Fix retry race condition that can lead to double decrementing inFligh… · grpc/grpc-java@bdb6230
@@ -149,11 +149,10 @@ public void uncaughtException(Thread t, Throwable e) {
149149this.throttle = throttle;
150150 }
151151152-@SuppressWarnings("GuardedBy")
152+@SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
153153@Nullable // null if already committed
154154@CheckReturnValue
155155private Runnable commit(final Substream winningSubstream) {
156-157156synchronized (lock) {
158157if (state.winningSubstream != null) {
159158return null;
@@ -165,10 +164,9 @@ private Runnable commit(final Substream winningSubstream) {
165164// subtract the share of this RPC from channelBufferUsed.
166165channelBufferUsed.addAndGet(-perRpcBufferUsed);
167166167+final boolean wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false;
168168final Future<?> retryFuture;
169169if (scheduledRetry != null) {
170-// TODO(b/145386688): This access should be guarded by 'this.scheduledRetry.lock'; instead
171-// found: 'this.lock'
172170retryFuture = scheduledRetry.markCancelled();
173171scheduledRetry = null;
174172 } else {
@@ -177,8 +175,6 @@ private Runnable commit(final Substream winningSubstream) {
177175// cancel the scheduled hedging if it is scheduled prior to the commitment
178176final Future<?> hedgingFuture;
179177if (scheduledHedging != null) {
180-// TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
181-// found: 'this.lock'
182178hedgingFuture = scheduledHedging.markCancelled();
183179scheduledHedging = null;
184180 } else {
@@ -196,7 +192,21 @@ public void run() {
196192 }
197193if (retryFuture != null) {
198194retryFuture.cancel(false);
195+if (!wasCancelled && inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {
196+assert savedCloseMasterListenerReason != null;
197+listenerSerializeExecutor.execute(
198+new Runnable() {
199+@Override
200+public void run() {
201+isClosed = true;
202+masterListener.closed(savedCloseMasterListenerReason.status,
203+savedCloseMasterListenerReason.progress,
204+savedCloseMasterListenerReason.metadata);
205+ }
206+ });
207+ }
199208 }
209+200210if (hedgingFuture != null) {
201211hedgingFuture.cancel(false);
202212 }
@@ -415,7 +425,7 @@ public final void start(ClientStreamListener listener) {
415425drain(substream);
416426 }
417427418-@SuppressWarnings("GuardedBy")
428+@SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
419429private void pushbackHedging(@Nullable Integer delayMillis) {
420430if (delayMillis == null) {
421431return;
@@ -434,8 +444,6 @@ private void pushbackHedging(@Nullable Integer delayMillis) {
434444return;
435445 }
436446437-// TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
438-// found: 'this.lock'
439447futureToBeCancelled = scheduledHedging.markCancelled();
440448scheduledHedging = future = new FutureCanceller(lock);
441449 }
@@ -469,16 +477,13 @@ public void run() {
469477 }
470478callExecutor.execute(
471479new Runnable() {
472-@SuppressWarnings("GuardedBy")
480+@SuppressWarnings("GuardedBy") //TODO(b/145386688) lock==ScheduledCancellor.lock so ok
473481@Override
474482public void run() {
475483boolean cancelled = false;
476484FutureCanceller future = null;
477485478486synchronized (lock) {
479-// TODO(b/145386688): This access should be guarded by
480-// 'HedgingRunnable.this.scheduledHedgingRef.lock'; instead found:
481-// 'RetriableStream.this.lock'
482487if (scheduledHedgingRef.isCancelled()) {
483488cancelled = true;
484489 } else {
@@ -810,13 +815,11 @@ private boolean hasPotentialHedging(State state) {
810815&& !state.hedgingFrozen;
811816 }
812817813-@SuppressWarnings("GuardedBy")
818+@SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==ScheduledCancellor.lock so ok
814819private void freezeHedging() {
815820Future<?> futureToBeCancelled = null;
816821synchronized (lock) {
817822if (scheduledHedging != null) {
818-// TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead
819-// found: 'this.lock'
820823futureToBeCancelled = scheduledHedging.markCancelled();
821824scheduledHedging = null;
822825 }
@@ -999,9 +1002,19 @@ public void run() {
9991002synchronized (lock) {
10001003scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);
10011004 }
1005+10021006class RetryBackoffRunnable implements Runnable {
10031007@Override
1008+@SuppressWarnings("FutureReturnValueIgnored")
10041009public void run() {
1010+synchronized (scheduledRetryCopy.lock) {
1011+if (scheduledRetryCopy.isCancelled()) {
1012+return;
1013+ } else {
1014+scheduledRetryCopy.markCancelled();
1015+ }
1016+ }
1017+10051018callExecutor.execute(
10061019new Runnable() {
10071020@Override
@@ -1563,11 +1576,16 @@ private static final class FutureCanceller {
15631576 }
1564157715651578void setFuture(Future<?> future) {
1579+boolean wasCancelled;
15661580synchronized (lock) {
1567-if (!cancelled) {
1581+wasCancelled = cancelled;
1582+if (!wasCancelled) {
15681583this.future = future;
15691584 }
15701585 }
1586+if (wasCancelled) {
1587+future.cancel(false);
1588+ }
15711589 }
1572159015731591@GuardedBy("lock")