Fix retry race condition that can lead to double decrementing inFlightSubStreams · grpc/grpc-java@bdb6230

@@ -149,11 +149,10 @@ public void uncaughtException(Thread t, Throwable e) {

149149

this.throttle = throttle;

150150

}

151151152-

@SuppressWarnings("GuardedBy")

152+

@SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==FutureCanceller.lock so ok

153153

@Nullable // null if already committed

154154

@CheckReturnValue

155155

private Runnable commit(final Substream winningSubstream) {

156-157156

synchronized (lock) {

158157

if (state.winningSubstream != null) {

159158

return null;

@@ -165,10 +164,9 @@ private Runnable commit(final Substream winningSubstream) {

165164

// subtract the share of this RPC from channelBufferUsed.

166165

channelBufferUsed.addAndGet(-perRpcBufferUsed);

167166167+

final boolean wasCancelled = (scheduledRetry != null) ? scheduledRetry.isCancelled() : false;

168168

final Future<?> retryFuture;

169169

if (scheduledRetry != null) {

170-

// TODO(b/145386688): This access should be guarded by 'this.scheduledRetry.lock'; instead

171-

// found: 'this.lock'

172170

retryFuture = scheduledRetry.markCancelled();

173171

scheduledRetry = null;

174172

} else {

@@ -177,8 +175,6 @@ private Runnable commit(final Substream winningSubstream) {

177175

// cancel the scheduled hedging if it is scheduled prior to the commitment

178176

final Future<?> hedgingFuture;

179177

if (scheduledHedging != null) {

180-

// TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead

181-

// found: 'this.lock'

182178

hedgingFuture = scheduledHedging.markCancelled();

183179

scheduledHedging = null;

184180

} else {

@@ -196,7 +192,21 @@ public void run() {

196192

}

197193

if (retryFuture != null) {

198194

retryFuture.cancel(false);

195+

if (!wasCancelled && inFlightSubStreams.decrementAndGet() == Integer.MIN_VALUE) {

196+

assert savedCloseMasterListenerReason != null;

197+

listenerSerializeExecutor.execute(

198+

new Runnable() {

199+

@Override

200+

public void run() {

201+

isClosed = true;

202+

masterListener.closed(savedCloseMasterListenerReason.status,

203+

savedCloseMasterListenerReason.progress,

204+

savedCloseMasterListenerReason.metadata);

205+

}

206+

});

207+

}

199208

}

209+200210

if (hedgingFuture != null) {

201211

hedgingFuture.cancel(false);

202212

}

@@ -415,7 +425,7 @@ public final void start(ClientStreamListener listener) {

415425

drain(substream);

416426

}

417427418-

@SuppressWarnings("GuardedBy")

428+

@SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==FutureCanceller.lock so ok

419429

private void pushbackHedging(@Nullable Integer delayMillis) {

420430

if (delayMillis == null) {

421431

return;

@@ -434,8 +444,6 @@ private void pushbackHedging(@Nullable Integer delayMillis) {

434444

return;

435445

}

436446437-

// TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead

438-

// found: 'this.lock'

439447

futureToBeCancelled = scheduledHedging.markCancelled();

440448

scheduledHedging = future = new FutureCanceller(lock);

441449

}

@@ -469,16 +477,13 @@ public void run() {

469477

}

470478

callExecutor.execute(

471479

new Runnable() {

472-

@SuppressWarnings("GuardedBy")

480+

@SuppressWarnings("GuardedBy") // TODO(b/145386688) lock==FutureCanceller.lock so ok

473481

@Override

474482

public void run() {

475483

boolean cancelled = false;

476484

FutureCanceller future = null;

477485478486

synchronized (lock) {

479-

// TODO(b/145386688): This access should be guarded by

480-

// 'HedgingRunnable.this.scheduledHedgingRef.lock'; instead found:

481-

// 'RetriableStream.this.lock'

482487

if (scheduledHedgingRef.isCancelled()) {

483488

cancelled = true;

484489

} else {

@@ -810,13 +815,11 @@ private boolean hasPotentialHedging(State state) {

810815

&& !state.hedgingFrozen;

811816

}

812817813-

@SuppressWarnings("GuardedBy")

818+

@SuppressWarnings("GuardedBy") // TODO(b/145386688) this.lock==FutureCanceller.lock so ok

814819

private void freezeHedging() {

815820

Future<?> futureToBeCancelled = null;

816821

synchronized (lock) {

817822

if (scheduledHedging != null) {

818-

// TODO(b/145386688): This access should be guarded by 'this.scheduledHedging.lock'; instead

819-

// found: 'this.lock'

820823

futureToBeCancelled = scheduledHedging.markCancelled();

821824

scheduledHedging = null;

822825

}

@@ -999,9 +1002,19 @@ public void run() {

9991002

synchronized (lock) {

10001003

scheduledRetry = scheduledRetryCopy = new FutureCanceller(lock);

10011004

}

1005+10021006

class RetryBackoffRunnable implements Runnable {

10031007

@Override

1008+

@SuppressWarnings("FutureReturnValueIgnored")

10041009

public void run() {

1010+

synchronized (scheduledRetryCopy.lock) {

1011+

if (scheduledRetryCopy.isCancelled()) {

1012+

return;

1013+

} else {

1014+

scheduledRetryCopy.markCancelled();

1015+

}

1016+

}

1017+10051018

callExecutor.execute(

10061019

new Runnable() {

10071020

@Override

@@ -1563,11 +1576,16 @@ private static final class FutureCanceller {

15631576

}

1564157715651578

void setFuture(Future<?> future) {

1579+

boolean wasCancelled;

15661580

synchronized (lock) {

1567-

if (!cancelled) {

1581+

wasCancelled = cancelled;

1582+

if (!wasCancelled) {

15681583

this.future = future;

15691584

}

15701585

}

1586+

if (wasCancelled) {

1587+

future.cancel(false);

1588+

}

15711589

}

1572159015731591

@GuardedBy("lock")