xds: Allow FaultFilter's interceptor to be reused · grpc/grpc-java@b3db8c2
@@ -190,94 +190,102 @@ public ClientInterceptor buildClientInterceptor(
190190config = overrideConfig;
191191 }
192192FaultConfig faultConfig = (FaultConfig) config;
193-Long delayNanos = null;
194-Status abortStatus = null;
195-if (faultConfig.maxActiveFaults() == null
196- || activeFaultCounter.get() < faultConfig.maxActiveFaults()) {
197-Metadata headers = args.getHeaders();
198-if (faultConfig.faultDelay() != null) {
199-delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers);
200- }
201-if (faultConfig.faultAbort() != null) {
202-abortStatus = determineFaultAbortStatus(faultConfig.faultAbort(), headers);
203- }
204- }
205-if (delayNanos == null && abortStatus == null) {
206-return null;
207- }
208-final Long finalDelayNanos = delayNanos;
209-final Status finalAbortStatus = getAbortStatusWithDescription(abortStatus);
210193211194final class FaultInjectionInterceptor implements ClientInterceptor {
212195@Override
213196public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
214197final MethodDescriptor<ReqT, RespT> method, final CallOptions callOptions,
215198final Channel next) {
216-Executor callExecutor = callOptions.getExecutor();
217-if (callExecutor == null) { // This should never happen in practice because
218-// ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with
219-// a callExecutor.
220-// TODO(https://github.com/grpc/grpc-java/issues/7868)
221-callExecutor = MoreExecutors.directExecutor();
199+boolean checkFault = false;
200+if (faultConfig.maxActiveFaults() == null
201+ || activeFaultCounter.get() < faultConfig.maxActiveFaults()) {
202+checkFault = faultConfig.faultDelay() != null || faultConfig.faultAbort() != null;
222203 }
223-if (finalDelayNanos != null) {
224-Supplier<? extends ClientCall<ReqT, RespT>> callSupplier;
225-if (finalAbortStatus != null) {
226-callSupplier = Suppliers.ofInstance(
227-new FailingClientCall<ReqT, RespT>(finalAbortStatus, callExecutor));
228- } else {
229-callSupplier = new Supplier<ClientCall<ReqT, RespT>>() {
230-@Override
231-public ClientCall<ReqT, RespT> get() {
232-return next.newCall(method, callOptions);
233- }
234- };
204+if (!checkFault) {
205+return next.newCall(method, callOptions);
206+ }
207+final class DeadlineInsightForwardingCall extends ForwardingClientCall<ReqT, RespT> {
208+private ClientCall<ReqT, RespT> delegate;
209+210+@Override
211+protected ClientCall<ReqT, RespT> delegate() {
212+return delegate;
235213 }
236-final DelayInjectedCall<ReqT, RespT> delayInjectedCall = new DelayInjectedCall<>(
237-finalDelayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier);
238214239-final class DeadlineInsightForwardingCall extends ForwardingClientCall<ReqT, RespT> {
240-@Override
241-protected ClientCall<ReqT, RespT> delegate() {
242-return delayInjectedCall;
215+@Override
216+public void start(Listener<RespT> listener, Metadata headers) {
217+Executor callExecutor = callOptions.getExecutor();
218+if (callExecutor == null) { // This should never happen in practice because
219+// ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with
220+// a callExecutor.
221+// TODO(https://github.com/grpc/grpc-java/issues/7868)
222+callExecutor = MoreExecutors.directExecutor();
243223 }
244224245-@Override
246-public void start(Listener<RespT> listener, Metadata headers) {
247-Listener<RespT> finalListener =
248-new SimpleForwardingClientCallListener<RespT>(listener) {
249-@Override
250-public void onClose(Status status, Metadata trailers) {
251-if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) {
252-// TODO(zdapeng:) check effective deadline locally, and
253-// do the following only if the local deadline is exceeded.
254-// (If the server sends DEADLINE_EXCEEDED for its own deadline, then the
255-// injected delay does not contribute to the error, because the request is
256-// only sent out after the delay. There could be a race between local and
257-// remote, but it is rather rare.)
258-String description = String.format(
259-Locale.US,
260-"Deadline exceeded after up to %d ns of fault-injected delay",
261-finalDelayNanos);
262-if (status.getDescription() != null) {
263-description = description + ": " + status.getDescription();
264- }
265-status = Status.DEADLINE_EXCEEDED
266- .withDescription(description).withCause(status.getCause());
267-// Replace trailers to prevent mixing sources of status and trailers.
268-trailers = new Metadata();
225+Long delayNanos;
226+Status abortStatus = null;
227+if (faultConfig.faultDelay() != null) {
228+delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers);
229+ } else {
230+delayNanos = null;
231+ }
232+if (faultConfig.faultAbort() != null) {
233+abortStatus = getAbortStatusWithDescription(
234+determineFaultAbortStatus(faultConfig.faultAbort(), headers));
235+ }
236+237+Supplier<? extends ClientCall<ReqT, RespT>> callSupplier;
238+if (abortStatus != null) {
239+callSupplier = Suppliers.ofInstance(
240+new FailingClientCall<ReqT, RespT>(abortStatus, callExecutor));
241+ } else {
242+callSupplier = new Supplier<ClientCall<ReqT, RespT>>() {
243+@Override
244+public ClientCall<ReqT, RespT> get() {
245+return next.newCall(method, callOptions);
246+ }
247+ };
248+ }
249+if (delayNanos == null) {
250+delegate = callSupplier.get();
251+delegate().start(listener, headers);
252+return;
253+ }
254+255+delegate = new DelayInjectedCall<>(
256+delayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier);
257+258+Listener<RespT> finalListener =
259+new SimpleForwardingClientCallListener<RespT>(listener) {
260+@Override
261+public void onClose(Status status, Metadata trailers) {
262+if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) {
263+// TODO(zdapeng:) check effective deadline locally, and
264+// do the following only if the local deadline is exceeded.
265+// (If the server sends DEADLINE_EXCEEDED for its own deadline, then the
266+// injected delay does not contribute to the error, because the request is
267+// only sent out after the delay. There could be a race between local and
268+// remote, but it is rather rare.)
269+String description = String.format(
270+Locale.US,
271+"Deadline exceeded after up to %d ns of fault-injected delay",
272+delayNanos);
273+if (status.getDescription() != null) {
274+description = description + ": " + status.getDescription();
269275 }
270-delegate().onClose(status, trailers);
276+status = Status.DEADLINE_EXCEEDED
277+ .withDescription(description).withCause(status.getCause());
278+// Replace trailers to prevent mixing sources of status and trailers.
279+trailers = new Metadata();
271280 }
272- };
273-delegate().start(finalListener, headers);
274- }
281+delegate().onClose(status, trailers);
282+ }
283+ };
284+delegate().start(finalListener, headers);
275285 }
276-277-return new DeadlineInsightForwardingCall();
278- } else {
279-return new FailingClientCall<>(finalAbortStatus, callExecutor);
280286 }
287+288+return new DeadlineInsightForwardingCall();
281289 }
282290 }
283291