Detect transport executors with no remaining threads (#11503) · grpc/grpc-java@3a6be9c
@@ -83,9 +83,13 @@
8383import java.util.Locale;
8484import java.util.Map;
8585import java.util.Random;
86+import java.util.concurrent.BrokenBarrierException;
8687import java.util.concurrent.CountDownLatch;
88+import java.util.concurrent.CyclicBarrier;
8789import java.util.concurrent.Executor;
8890import java.util.concurrent.ScheduledExecutorService;
91+import java.util.concurrent.TimeUnit;
92+import java.util.concurrent.TimeoutException;
8993import java.util.logging.Level;
9094import java.util.logging.Logger;
9195import javax.annotation.Nullable;
@@ -499,8 +503,15 @@ public Runnable start(Listener listener) {
499503outboundFlow = new OutboundFlowController(this, frameWriter);
500504 }
501505final CountDownLatch latch = new CountDownLatch(1);
506+final CountDownLatch latchForExtraThread = new CountDownLatch(1);
507+// The transport needs up to two threads to function once started,
508+// but only needs one during handshaking. Start another thread during handshaking
509+// to make sure there's still a free thread available. If the number of threads is exhausted,
510+// it is better to kill the transport than for all the transports to hang unable to send.
511+CyclicBarrier barrier = new CyclicBarrier(2);
502512// Connecting in the serializingExecutor, so that some stream operations like synStream
503513// will be executed after connected.
514+504515serializingExecutor.execute(new Runnable() {
505516@Override
506517public void run() {
@@ -510,8 +521,14 @@ public void run() {
510521// initial preface.
511522try {
512523latch.await();
524+barrier.await(1000, TimeUnit.MILLISECONDS);
513525 } catch (InterruptedException e) {
514526Thread.currentThread().interrupt();
527+ } catch (TimeoutException | BrokenBarrierException e) {
528+startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE
529+ .withDescription("Timed out waiting for second handshake thread. "
530+ + "The transport executor pool may have run out of threads"));
531+return;
515532 }
516533// Use closed source on failure so that the reader immediately shuts down.
517534BufferedSource source = Okio.buffer(new Source() {
@@ -575,6 +592,7 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
575592return;
576593 } finally {
577594clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
595+latchForExtraThread.countDown();
578596 }
579597synchronized (lock) {
580598socket = Preconditions.checkNotNull(sock, "socket");
@@ -584,6 +602,21 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()
584602 }
585603 }
586604 });
605+606+executor.execute(new Runnable() {
607+@Override
608+public void run() {
609+try {
610+barrier.await(1000, TimeUnit.MILLISECONDS);
611+latchForExtraThread.await();
612+ } catch (BrokenBarrierException | TimeoutException e) {
613+// Something bad happened, maybe too few threads available!
614+// This will be handled in the handshake thread.
615+ } catch (InterruptedException e) {
616+Thread.currentThread().interrupt();
617+ }
618+ }
619+ });
587620// Schedule to send connection preface & settings before any other write.
588621try {
589622sendConnectionPrefaceAndSettings();