xds: Allow FaultFilter's interceptor to be reused · grpc/grpc-java@b3db8c2

@@ -190,94 +190,102 @@ public ClientInterceptor buildClientInterceptor(

190190

config = overrideConfig;

191191

}

192192

FaultConfig faultConfig = (FaultConfig) config;

193-

Long delayNanos = null;

194-

Status abortStatus = null;

195-

if (faultConfig.maxActiveFaults() == null

196-

|| activeFaultCounter.get() < faultConfig.maxActiveFaults()) {

197-

Metadata headers = args.getHeaders();

198-

if (faultConfig.faultDelay() != null) {

199-

delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers);

200-

}

201-

if (faultConfig.faultAbort() != null) {

202-

abortStatus = determineFaultAbortStatus(faultConfig.faultAbort(), headers);

203-

}

204-

}

205-

if (delayNanos == null && abortStatus == null) {

206-

return null;

207-

}

208-

final Long finalDelayNanos = delayNanos;

209-

final Status finalAbortStatus = getAbortStatusWithDescription(abortStatus);

210193211194

final class FaultInjectionInterceptor implements ClientInterceptor {

212195

@Override

213196

public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(

214197

final MethodDescriptor<ReqT, RespT> method, final CallOptions callOptions,

215198

final Channel next) {

216-

Executor callExecutor = callOptions.getExecutor();

217-

if (callExecutor == null) { // This should never happen in practice because

218-

// ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with

219-

// a callExecutor.

220-

// TODO(https://github.com/grpc/grpc-java/issues/7868)

221-

callExecutor = MoreExecutors.directExecutor();

199+

boolean checkFault = false;

200+

if (faultConfig.maxActiveFaults() == null

201+

|| activeFaultCounter.get() < faultConfig.maxActiveFaults()) {

202+

checkFault = faultConfig.faultDelay() != null || faultConfig.faultAbort() != null;

222203

}

223-

if (finalDelayNanos != null) {

224-

Supplier<? extends ClientCall<ReqT, RespT>> callSupplier;

225-

if (finalAbortStatus != null) {

226-

callSupplier = Suppliers.ofInstance(

227-

new FailingClientCall<ReqT, RespT>(finalAbortStatus, callExecutor));

228-

} else {

229-

callSupplier = new Supplier<ClientCall<ReqT, RespT>>() {

230-

@Override

231-

public ClientCall<ReqT, RespT> get() {

232-

return next.newCall(method, callOptions);

233-

}

234-

};

204+

if (!checkFault) {

205+

return next.newCall(method, callOptions);

206+

}

207+

final class DeadlineInsightForwardingCall extends ForwardingClientCall<ReqT, RespT> {

208+

private ClientCall<ReqT, RespT> delegate;

209+210+

@Override

211+

protected ClientCall<ReqT, RespT> delegate() {

212+

return delegate;

235213

}

236-

final DelayInjectedCall<ReqT, RespT> delayInjectedCall = new DelayInjectedCall<>(

237-

finalDelayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier);

238214239-

final class DeadlineInsightForwardingCall extends ForwardingClientCall<ReqT, RespT> {

240-

@Override

241-

protected ClientCall<ReqT, RespT> delegate() {

242-

return delayInjectedCall;

215+

@Override

216+

public void start(Listener<RespT> listener, Metadata headers) {

217+

Executor callExecutor = callOptions.getExecutor();

218+

if (callExecutor == null) { // This should never happen in practice because

219+

// ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with

220+

// a callExecutor.

221+

// TODO(https://github.com/grpc/grpc-java/issues/7868)

222+

callExecutor = MoreExecutors.directExecutor();

243223

}

244224245-

@Override

246-

public void start(Listener<RespT> listener, Metadata headers) {

247-

Listener<RespT> finalListener =

248-

new SimpleForwardingClientCallListener<RespT>(listener) {

249-

@Override

250-

public void onClose(Status status, Metadata trailers) {

251-

if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) {

252-

// TODO(zdapeng:) check effective deadline locally, and

253-

// do the following only if the local deadline is exceeded.

254-

// (If the server sends DEADLINE_EXCEEDED for its own deadline, then the

255-

// injected delay does not contribute to the error, because the request is

256-

// only sent out after the delay. There could be a race between local and

257-

// remote, but it is rather rare.)

258-

String description = String.format(

259-

Locale.US,

260-

"Deadline exceeded after up to %d ns of fault-injected delay",

261-

finalDelayNanos);

262-

if (status.getDescription() != null) {

263-

description = description + ": " + status.getDescription();

264-

}

265-

status = Status.DEADLINE_EXCEEDED

266-

.withDescription(description).withCause(status.getCause());

267-

// Replace trailers to prevent mixing sources of status and trailers.

268-

trailers = new Metadata();

225+

Long delayNanos;

226+

Status abortStatus = null;

227+

if (faultConfig.faultDelay() != null) {

228+

delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers);

229+

} else {

230+

delayNanos = null;

231+

}

232+

if (faultConfig.faultAbort() != null) {

233+

abortStatus = getAbortStatusWithDescription(

234+

determineFaultAbortStatus(faultConfig.faultAbort(), headers));

235+

}

236+237+

Supplier<? extends ClientCall<ReqT, RespT>> callSupplier;

238+

if (abortStatus != null) {

239+

callSupplier = Suppliers.ofInstance(

240+

new FailingClientCall<ReqT, RespT>(abortStatus, callExecutor));

241+

} else {

242+

callSupplier = new Supplier<ClientCall<ReqT, RespT>>() {

243+

@Override

244+

public ClientCall<ReqT, RespT> get() {

245+

return next.newCall(method, callOptions);

246+

}

247+

};

248+

}

249+

if (delayNanos == null) {

250+

delegate = callSupplier.get();

251+

delegate().start(listener, headers);

252+

return;

253+

}

254+255+

delegate = new DelayInjectedCall<>(

256+

delayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier);

257+258+

Listener<RespT> finalListener =

259+

new SimpleForwardingClientCallListener<RespT>(listener) {

260+

@Override

261+

public void onClose(Status status, Metadata trailers) {

262+

if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) {

263+

// TODO(zdapeng:) check effective deadline locally, and

264+

// do the following only if the local deadline is exceeded.

265+

// (If the server sends DEADLINE_EXCEEDED for its own deadline, then the

266+

// injected delay does not contribute to the error, because the request is

267+

// only sent out after the delay. There could be a race between local and

268+

// remote, but it is rather rare.)

269+

String description = String.format(

270+

Locale.US,

271+

"Deadline exceeded after up to %d ns of fault-injected delay",

272+

delayNanos);

273+

if (status.getDescription() != null) {

274+

description = description + ": " + status.getDescription();

269275

}

270-

delegate().onClose(status, trailers);

276+

status = Status.DEADLINE_EXCEEDED

277+

.withDescription(description).withCause(status.getCause());

278+

// Replace trailers to prevent mixing sources of status and trailers.

279+

trailers = new Metadata();

271280

}

272-

};

273-

delegate().start(finalListener, headers);

274-

}

281+

delegate().onClose(status, trailers);

282+

}

283+

};

284+

delegate().start(finalListener, headers);

275285

}

276-277-

return new DeadlineInsightForwardingCall();

278-

} else {

279-

return new FailingClientCall<>(finalAbortStatus, callExecutor);

280286

}

287+288+

return new DeadlineInsightForwardingCall();

281289

}

282290

}

283291