Propagate timeout errors to callback. by vbabanin · Pull Request #1761 · mongodb/mongo-java-driver

Expand Up @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.internal.async.AsyncRunnable.beginAsync; import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -88,7 +89,7 @@ protected void setChannel(final ExtendedAsynchronousByteChannel channel) {
@Override public void writeAsync(final List<ByteBuf> buffers, final OperationContext operationContext, final AsyncCompletionHandler<Void> handler) { final AsyncCompletionHandler<Void> handler) { AsyncWritableByteChannelAdapter byteChannel = new AsyncWritableByteChannelAdapter(); Iterator<ByteBuf> iter = buffers.iterator(); pipeOneBuffer(byteChannel, iter.next(), operationContext, new AsyncCompletionHandler<Void>() { Expand Down Expand Up @@ -189,8 +190,11 @@ public void failed(final Throwable t) {
private class AsyncWritableByteChannelAdapter { void write(final ByteBuffer src, final OperationContext operationContext, final AsyncCompletionHandler<Void> handler) { getChannel().write(src, operationContext.getTimeoutContext().getWriteTimeoutMS(), MILLISECONDS, null, new AsyncWritableByteChannelAdapter.WriteCompletionHandler(handler)); beginAsync().thenRun((c) -> { long writeTimeoutMS = operationContext.getTimeoutContext().getWriteTimeoutMS(); getChannel().write(src, writeTimeoutMS, MILLISECONDS, null, new AsyncWritableByteChannelAdapter.WriteCompletionHandler(c.asHandler())); }).finish(handler.asCallback()); }
private class WriteCompletionHandler extends BaseCompletionHandler<Void, Integer, Object> { Expand Down Expand Up @@ -222,7 +226,7 @@ private final class BasicCompletionHandler extends BaseCompletionHandler<ByteBuf private final OperationContext operationContext;
private BasicCompletionHandler(final ByteBuf dst, final OperationContext operationContext, final AsyncCompletionHandler<ByteBuf> handler) { final AsyncCompletionHandler<ByteBuf> handler) { super(handler); this.byteBufReference = new AtomicReference<>(dst); this.operationContext = operationContext; Expand All @@ -231,17 +235,20 @@ private BasicCompletionHandler(final ByteBuf dst, final OperationContext operati @Override public void completed(final Integer result, final Void attachment) { AsyncCompletionHandler<ByteBuf> localHandler = getHandlerAndClear(); ByteBuf localByteBuf = byteBufReference.getAndSet(null); if (result == -1) { localByteBuf.release(); localHandler.failed(new MongoSocketReadException("Prematurely reached end of stream", serverAddress)); } else if (!localByteBuf.hasRemaining()) { localByteBuf.flip(); localHandler.completed(localByteBuf); } else { getChannel().read(localByteBuf.asNIO(), operationContext.getTimeoutContext().getReadTimeoutMS(), MILLISECONDS, null, new BasicCompletionHandler(localByteBuf, operationContext, localHandler)); } beginAsync().<ByteBuf>thenSupply((c) -> { ByteBuf localByteBuf = byteBufReference.getAndSet(null); if (result == -1) { localByteBuf.release(); throw new MongoSocketReadException("Prematurely reached end of stream", serverAddress); } else if (!localByteBuf.hasRemaining()) { localByteBuf.flip(); c.complete(localByteBuf); } else { long readTimeoutMS = operationContext.getTimeoutContext().getReadTimeoutMS(); getChannel().read(localByteBuf.asNIO(), readTimeoutMS, MILLISECONDS, null, new BasicCompletionHandler(localByteBuf, operationContext, c.asHandler())); } }).finish(localHandler.asCallback()); }
@Override Expand Down