Allow configuration of the queued byte threshold at which a Stream is… · grpc/grpc-java@2c83ef0
@@ -77,6 +77,19 @@ public final void flush() {
7777 }
7878 }
797980+/**
81+ * A hint to the stream that specifies how many bytes must be queued before
82+ * {@link #isReady()} will return false. A stream may ignore this property if
83+ * unsupported. This may only be set during stream initialization before
84+ * any messages are set.
85+ *
86+ * @param numBytes The number of bytes that must be queued. Must be a
87+ * positive integer.
88+ */
89+protected void setOnReadyThreshold(int numBytes) {
90+transportState().setOnReadyThreshold(numBytes);
91+ }
92+8093/**
8194 * Closes the underlying framer. Should be called when the outgoing stream is gracefully closed
8295 * (half closure on client; closure on server).
@@ -143,6 +156,9 @@ public abstract static class TransportState
143156@GuardedBy("onReadyLock")
144157private boolean deallocated;
145158159+@GuardedBy("onReadyLock")
160+private int onReadyThreshold;
161+146162protected TransportState(
147163int maxMessageSize,
148164StatsTraceContext statsTraceCtx,
@@ -157,6 +173,7 @@ protected TransportState(
157173transportTracer);
158174// TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break.
159175deframer = rawDeframer;
176+onReadyThreshold = DEFAULT_ONREADY_THRESHOLD;
160177 }
161178162179final void optimizeForDirectExecutor() {
@@ -178,6 +195,20 @@ final void setMaxInboundMessageSize(int maxSize) {
178195 */
179196protected abstract StreamListener listener();
180197198+/**
199+ * A hint to the stream that specifies how many bytes must be queued before
200+ * {@link #isReady()} will return false. A stream may ignore this property if
201+ * unsupported. This may only be set before any messages are sent.
202+ *
203+ * @param numBytes The number of bytes that must be queued. Must be a
204+ * positive integer.
205+ */
206+void setOnReadyThreshold(int numBytes) {
207+synchronized (onReadyLock) {
208+this.onReadyThreshold = numBytes;
209+ }
210+ }
211+181212@Override
182213public void messagesAvailable(StreamListener.MessageProducer producer) {
183214listener().messagesAvailable(producer);
@@ -259,7 +290,7 @@ protected final void setDecompressor(Decompressor decompressor) {
259290260291private boolean isReady() {
261292synchronized (onReadyLock) {
262-return allocated && numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD && !deallocated;
293+return allocated && numSentBytesQueued < onReadyThreshold && !deallocated;
263294 }
264295 }
265296@@ -316,9 +347,9 @@ public final void onSentBytes(int numBytes) {
316347synchronized (onReadyLock) {
317348checkState(allocated,
318349"onStreamAllocated was not called, but it seems the stream is active");
319-boolean belowThresholdBefore = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD;
350+boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold;
320351numSentBytesQueued -= numBytes;
321-boolean belowThresholdAfter = numSentBytesQueued < DEFAULT_ONREADY_THRESHOLD;
352+boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold;
322353doNotify = !belowThresholdBefore && belowThresholdAfter;
323354 }
324355if (doNotify) {