Allow configuration of the queued byte threshold at which a Stream is… · grpc/grpc-java@2c83ef0

@@ -77,6 +77,19 @@ public final void flush() {

7777

}

7878

}

797980+

/**

81+

* A hint to the stream that specifies how many bytes must be queued before

82+

* {@link #isReady()} will return false. A stream may ignore this property if

83+

* unsupported. This may only be set during stream initialization before

84+

* any messages are set.

85+

*

86+

* @param numBytes The number of bytes that must be queued. Must be a

87+

* positive integer.

88+

*/

89+

protected void setOnReadyThreshold(int numBytes) {

90+

transportState().setOnReadyThreshold(numBytes);

91+

}

92+8093

/**

8194

* Closes the underlying framer. Should be called when the outgoing stream is gracefully closed

8295

* (half closure on client; closure on server).

@@ -143,6 +156,9 @@ public abstract static class TransportState

143156

@GuardedBy("onReadyLock")

144157

private boolean deallocated;

145158159+

@GuardedBy("onReadyLock")

160+

private int onReadyThreshold;

161+146162

protected TransportState(

147163

int maxMessageSize,

148164

StatsTraceContext statsTraceCtx,

@@ -157,6 +173,7 @@ protected TransportState(

157173

transportTracer);

158174

// TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break.

159175

deframer = rawDeframer;

176+

onReadyThreshold = DEFAULT_ONREADY_THRESHOLD;

160177

}

161178162179

final void optimizeForDirectExecutor() {

@@ -178,6 +195,20 @@ final void setMaxInboundMessageSize(int maxSize) {

178195

*/

179196

protected abstract StreamListener listener();

180197198+

/**

199+

* A hint to the stream that specifies how many bytes must be queued before

200+

* {@link #isReady()} will return false. A stream may ignore this property if

201+

* unsupported. This may only be set before any messages are sent.

202+

*

203+

* @param numBytes The number of bytes that must be queued. Must be a

204+

* positive integer.

205+

*/

206+

void setOnReadyThreshold(int numBytes) {

207+

synchronized (onReadyLock) {

208+

this.onReadyThreshold = numBytes;

209+

}

210+

}

211+181212

@Override

182213

public void messagesAvailable(StreamListener.MessageProducer producer) {

183214

listener().messagesAvailable(producer);

@@ -259,7 +290,7 @@ protected final void setDecompressor(Decompressor decompressor) {

259290260291

private boolean isReady() {

261292

synchronized (onReadyLock) {

262-

return allocated && numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD && !deallocated;

293+

return allocated && numSentBytesQueued < onReadyThreshold && !deallocated;

263294

}

264295

}

265296

@@ -316,9 +347,9 @@ public final void onSentBytes(int numBytes) {

316347

synchronized (onReadyLock) {

317348

checkState(allocated,

318349

"onStreamAllocated was not called, but it seems the stream is active");

319-

boolean belowThresholdBefore = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD;

350+

boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold;

320351

numSentBytesQueued -= numBytes;

321-

boolean belowThresholdAfter = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD;

352+

boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold;

322353

doNotify = !belowThresholdBefore && belowThresholdAfter;

323354

}

324355

if (doNotify) {