Potential issue with virtual threads #172

@natan-abolafya

Description

Describe the bug
Sorry, this will be a bit vague, as I'm having a hard time understanding it myself.
Related: #157
Version: 2.10.1

We are using org.newsclub.net.unix.AFUNIXSocketFactory. I have been investigating an issue with JDBC connection pools where the application would hang for no clear reason and the JDBC pool would start throwing errors about the pool being busy, even though there was no activity. It looked like some sort of connection leak, but nothing showed up in the thread dumps whenever this happened.
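For context, routing a PostgreSQL JDBC connection through junixsocket is usually done with pgjdbc's socketFactory and socketFactoryArg connection properties. The sketch below only builds such a URL. It assumes the AFUNIXSocketFactory$FactoryArg variant (the issue does not say which variant is in use), and the database name, the socket path and the jdbcUrl helper are hypothetical.

```java
// Sketch only: builds a pgjdbc URL that routes connections through junixsocket.
// Assumptions: the AFUNIXSocketFactory$FactoryArg variant, a hypothetical database
// name and a hypothetical socket path. No connection is opened here.
public class PgUnixSocketUrl {

  // Hypothetical helper: socketFactory names the junixsocket factory class,
  // socketFactoryArg passes the Unix socket path to that factory's constructor.
  static String jdbcUrl(String database, String socketPath) {
    return "jdbc:postgresql://localhost/" + database
        + "?socketFactory=org.newsclub.net.unix.AFUNIXSocketFactory$FactoryArg"
        + "&socketFactoryArg=" + socketPath;
  }

  public static void main(String[] args) {
    // /var/run/postgresql/.s.PGSQL.5432 is a common default location; assumed here.
    System.out.println(jdbcUrl("mydb", "/var/run/postgresql/.s.PGSQL.5432"));
  }
}
```

Every pooled connection created from such a URL then goes through AFUNIXSocketFactory instead of a TCP socket, so the pool's behaviour depends on junixsocket in this setup.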

The problem went away as soon as I disabled virtual threads in Jetty. I don't have clear evidence that junixsocket caused the problem, but it seemed like a good suspect. Unfortunately, it happens only in one specific environment and I can't reproduce it locally. In that environment it is also hard to switch the application to PostgreSQL over a TCP socket instead of the Unix socket to rule junixsocket out.

So I asked an AI to write a minimal test. The result is not clear-cut.

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test; // assumption: JUnit 5 (org.junit.Test from JUnit 4 works too)
import org.newsclub.net.unix.AFUNIXServerSocket;
import org.newsclub.net.unix.AFUNIXSocket;
import org.newsclub.net.unix.AFUNIXSocketAddress;

  @Test
  public void blockingRead_onVirtualThreads_completesForAllClients() throws Exception {
    int clients = Integer.getInteger("benchmark.junixsocket.clients", 64);
    int serverDelayMillis = Integer.getInteger("benchmark.junixsocket.server-delay-ms", 20);
    int timeoutSeconds = Integer.getInteger("benchmark.junixsocket.timeout-seconds", 60);
    byte payload = 0x2A;

    // Reserve a unique temp path, then delete it so that bind() can create the socket file.
    Path socketPath = Files.createTempFile("junixsocket-vt-", ".sock");
    Files.deleteIfExists(socketPath);

    var failures = new AtomicInteger(0);
    var completed = new CountDownLatch(clients);
    var address = AFUNIXSocketAddress.of(socketPath.toFile());

    try (var server = AFUNIXServerSocket.newInstance()) {
      server.bind(address);

      // Server side runs on platform threads only.
      var acceptor = Executors.newSingleThreadExecutor();
      var handlers = Executors.newCachedThreadPool();

      try {
        // Accept exactly `clients` connections; each handler sleeps, writes one byte, closes.
        acceptor.submit(() -> {
          for (int i = 0; i < clients; i++) {
            try {
              var clientSocket = server.accept();
              handlers.submit(() -> {
                try (clientSocket) {
                  Thread.sleep(serverDelayMillis);
                  clientSocket.getOutputStream().write(payload);
                  clientSocket.getOutputStream().flush();
                } catch (Throwable t) {
                  failures.incrementAndGet();
                }
              });
            } catch (IOException e) {
              failures.incrementAndGet();
            }
          }
        });

        // Client side: one virtual thread per client, each connecting and blocking on read().
        long startNanos = System.nanoTime();
        try (var vexec = Executors.newVirtualThreadPerTaskExecutor()) {
          for (int i = 0; i < clients; i++) {
            vexec.submit(() -> {
              try (var socket = AFUNIXSocket.newInstance()) {
                socket.connect(address);
                int result = socket.getInputStream().read();
                if (result != payload) {
                  failures.incrementAndGet();
                }
              } catch (Throwable t) {
                // The exception itself is discarded; only the count is kept.
                failures.incrementAndGet();
              } finally {
                completed.countDown();
              }
            });
          }
        } // close() waits until all client tasks have finished

        boolean doneInTime = completed.await(timeoutSeconds, TimeUnit.SECONDS);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        System.out.printf("%njunixsocket VT test: clients=%d, delayMs=%d, elapsedMs=%d, doneInTime=%s, failures=%d%n%n",
                          clients, serverDelayMillis, elapsedMs, doneInTime, failures.get());

        assertThat(doneInTime).isTrue();
        assertThat(failures.get()).isZero();
      } finally {
        // If some client never connected, the acceptor is still blocked in accept() at this point.
        acceptor.shutdownNow();
        handlers.shutdownNow();
        acceptor.awaitTermination(5, TimeUnit.SECONDS);
        handlers.awaitTermination(5, TimeUnit.SECONDS);
      }
    } finally {
      Files.deleteIfExists(socketPath);
    }
  }

This test hangs for 5 seconds (in awaitTermination) and then reports a few failures.
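The 5-second pause is probably a cleanup effect rather than the bug itself: the acceptor loops until it has accepted `clients` connections, so if some clients fail before their connection is accepted, it stays blocked in server.accept(). If accept() does not react to the interrupt from shutdownNow(), awaitTermination waits the full 5 seconds, and the server socket is only closed afterwards. A minimal sketch of the alternative ordering, closing the server socket first so that accept() throws and the acceptor exits. It uses a plain JDK ServerSocket on loopback as a stand-in for AFUNIXServerSocket (an assumption), and it does not check how junixsocket's accept() handles interrupts.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AcceptorShutdown {

  /**
   * Blocks an acceptor in accept() with no client ever connecting, then shuts it down by
   * closing the server socket before awaiting termination. Returns whether it terminated.
   */
  static boolean shutdownByClosingSocket() throws IOException, InterruptedException {
    ExecutorService acceptor = Executors.newSingleThreadExecutor();
    // Loopback TCP ServerSocket used as a stand-in for AFUNIXServerSocket (assumption).
    try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
      CountDownLatch started = new CountDownLatch(1);
      acceptor.submit(() -> {
        started.countDown();
        try {
          server.accept().close(); // blocks: no client ever connects
        } catch (IOException e) {
          // Expected: closing the server socket makes accept() throw a SocketException.
        }
      });
      started.await();         // the acceptor task is now running
      server.close();          // 1. unblock accept() by closing the socket
      acceptor.shutdownNow();  // 2. then stop the executor
      return acceptor.awaitTermination(5, TimeUnit.SECONDS);
    } finally {
      acceptor.shutdownNow(); // no-op if already shut down
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("acceptor terminated: " + shutdownByClosingSocket());
  }
}
```

Applied to the test, this would mean closing the AFUNIXServerSocket before the awaitTermination calls instead of relying on try-with-resources afterwards; the failures themselves would remain and are the more interesting signal.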
If I replace newVirtualThreadPerTaskExecutor with newCachedThreadPool, the test finishes immediately without any failures.
I can't say exactly what that means either, but I was hoping you might be able to. If this isn't helpful, feel free to close this issue; I don't have time to provide more right now and will go ahead with disabling virtual threads anyway.
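One way to narrow this down would be a control run of the same workload against the JDK's built-in Unix domain socket channels (UnixDomainSocketAddress, JDK 16+), switching only the client executor between virtual and platform threads. The sketch below is a hypothetical control test, not part of junixsocket or of the original report; the class and method names are made up, and it assumes JDK 21+ for virtual threads and ExecutorService.close(). Unlike the original test, it keeps the exceptions instead of only counting them, so the failing call becomes visible.

```java
import java.io.IOException;
import java.net.StandardProtocolFamily;
import java.net.UnixDomainSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class JdkUnixSocketBaseline {
  static final byte PAYLOAD = 0x2A;

  /** Runs the connect-then-read-one-byte workload and returns every exception recorded. */
  static List<Throwable> run(int clients, int serverDelayMillis,
      Supplier<ExecutorService> clientExecutor) throws IOException, InterruptedException {
    Path socketPath = Files.createTempFile("jdk-uds-", ".sock");
    Files.deleteIfExists(socketPath); // bind() requires that the path does not exist yet
    var address = UnixDomainSocketAddress.of(socketPath);
    var errors = new ConcurrentLinkedQueue<Throwable>();
    ExecutorService handlers = Executors.newCachedThreadPool();

    try (var server = ServerSocketChannel.open(StandardProtocolFamily.UNIX)) {
      server.bind(address);
      // Server side on platform threads, mirroring the original test.
      Thread acceptor = new Thread(() -> {
        for (int i = 0; i < clients; i++) {
          try {
            SocketChannel ch = server.accept();
            handlers.submit(() -> {
              try (ch) {
                Thread.sleep(serverDelayMillis);
                ch.write(ByteBuffer.wrap(new byte[] {PAYLOAD}));
              } catch (Exception e) {
                errors.add(e);
              }
            });
          } catch (IOException e) {
            errors.add(e); // includes the close-induced exception if a client never connected
            return;
          }
        }
      });
      acceptor.start();

      // Client side: the caller chooses virtual or platform threads.
      try (ExecutorService exec = clientExecutor.get()) {
        for (int i = 0; i < clients; i++) {
          exec.submit(() -> {
            try (var ch = SocketChannel.open(StandardProtocolFamily.UNIX)) {
              ch.connect(address);
              int b = Channels.newInputStream(ch).read();
              if (b != PAYLOAD) {
                errors.add(new IllegalStateException("unexpected byte " + b));
              }
            } catch (Throwable t) {
              errors.add(t); // keep the exception instead of only counting it
            }
          });
        }
      } // close() waits for all client tasks

      server.close(); // unblocks accept() if some client never connected
      acceptor.join();
    } finally {
      handlers.shutdown();
      handlers.awaitTermination(5, TimeUnit.SECONDS);
      Files.deleteIfExists(socketPath);
    }
    return new ArrayList<>(errors);
  }

  public static void main(String[] args) throws Exception {
    for (String mode : List.of("virtual", "cached")) {
      Supplier<ExecutorService> factory = mode.equals("virtual")
          ? Executors::newVirtualThreadPerTaskExecutor
          : Executors::newCachedThreadPool;
      List<Throwable> errors = run(64, 20, factory);
      System.out.printf("%s: %d error(s)%n", mode, errors.size());
      errors.stream().limit(3).forEach(Throwable::printStackTrace);
    }
  }
}
```

If this JDK-only variant passes on virtual threads with the same client count while the junixsocket test fails, that would point towards junixsocket rather than the test itself; the printed stack traces would show whether connect() or read() is the failing call.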

Metadata

Labels

bug (The issue describes a bug in the code)
verify (The issue is considered fixed/done, and reassigned to the originator to verify.)

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests