Detect transport executors with no remaining threads (#11503) · grpc/grpc-java@3a6be9c

@@ -83,9 +83,13 @@

8383

import java.util.Locale;

8484

import java.util.Map;

8585

import java.util.Random;

86+

import java.util.concurrent.BrokenBarrierException;

8687

import java.util.concurrent.CountDownLatch;

88+

import java.util.concurrent.CyclicBarrier;

8789

import java.util.concurrent.Executor;

8890

import java.util.concurrent.ScheduledExecutorService;

91+

import java.util.concurrent.TimeUnit;

92+

import java.util.concurrent.TimeoutException;

8993

import java.util.logging.Level;

9094

import java.util.logging.Logger;

9195

import javax.annotation.Nullable;

@@ -499,8 +503,15 @@ public Runnable start(Listener listener) {

499503

outboundFlow = new OutboundFlowController(this, frameWriter);

500504

}

501505

final CountDownLatch latch = new CountDownLatch(1);

506+

final CountDownLatch latchForExtraThread = new CountDownLatch(1);

507+

// The transport needs up to two threads to function once started,

508+

// but only needs one during handshaking. Start another thread during handshaking

509+

// to make sure there's still a free thread available. If the number of threads is exhausted,

510+

// it is better to kill the transport than for all the transports to hang unable to send.

511+

CyclicBarrier barrier = new CyclicBarrier(2);

502512

// Connecting in the serializingExecutor, so that some stream operations like synStream

503513

// will be executed after connected.

514+504515

serializingExecutor.execute(new Runnable() {

505516

@Override

506517

public void run() {

@@ -510,8 +521,14 @@ public void run() {

510521

// initial preface.

511522

try {

512523

latch.await();

524+

barrier.await(1000, TimeUnit.MILLISECONDS);

513525

} catch (InterruptedException e) {

514526

Thread.currentThread().interrupt();

527+

} catch (TimeoutException | BrokenBarrierException e) {

528+

startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE

529+

.withDescription("Timed out waiting for second handshake thread. "

530+

+ "The transport executor pool may have run out of threads"));

531+

return;

515532

}

516533

// Use closed source on failure so that the reader immediately shuts down.

517534

BufferedSource source = Okio.buffer(new Source() {

@@ -575,6 +592,7 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()

575592

return;

576593

} finally {

577594

clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));

595+

latchForExtraThread.countDown();

578596

}

579597

synchronized (lock) {

580598

socket = Preconditions.checkNotNull(sock, "socket");

@@ -584,6 +602,21 @@ sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort()

584602

}

585603

}

586604

});

605+606+

executor.execute(new Runnable() {

607+

@Override

608+

public void run() {

609+

try {

610+

barrier.await(1000, TimeUnit.MILLISECONDS);

611+

latchForExtraThread.await();

612+

} catch (BrokenBarrierException | TimeoutException e) {

613+

// Something bad happened, maybe too few threads available!

614+

// This will be handled in the handshake thread.

615+

} catch (InterruptedException e) {

616+

Thread.currentThread().interrupt();

617+

}

618+

}

619+

});

587620

// Schedule to send connection preface & settings before any other write.

588621

try {

589622

sendConnectionPrefaceAndSettings();