Timeouts: hop out of queue processing as soon as we know no more are eligible for timeout by NickCraver · Pull Request #2217 · StackExchange/StackExchange.Redis
Expand Up
@@ -665,15 +665,24 @@ internal void OnBridgeHeartbeat()
{
// We only handle async timeouts here, synchronous timeouts are handled upstream.
// Those sync timeouts happen in ConnectionMultiplexer.ExecuteSyncImpl() via Monitor.Wait.
if (msg.ResultBoxIsAsync && msg.HasTimedOut(now, timeout, out var elapsed))
if (msg.HasTimedOut(now, timeout, out var elapsed))
{
bool haveDeltas = msg.TryGetPhysicalState(out _, out _, out long sentDelta, out var receivedDelta) && sentDelta >= 0 && receivedDelta >= 0;
var timeoutEx = ExceptionFactory.Timeout(multiplexer, haveDeltas
? $"Timeout awaiting response (outbound={sentDelta >> 10}KiB, inbound={receivedDelta >> 10}KiB, {elapsed}ms elapsed, timeout is {timeout}ms)"
: $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server);
multiplexer.OnMessageFaulted(msg, timeoutEx);
msg.SetExceptionAndComplete(timeoutEx, bridge); // tell the message that it is doomed
multiplexer.OnAsyncTimeout();
if (msg.ResultBoxIsAsync)
{
bool haveDeltas = msg.TryGetPhysicalState(out _, out _, out long sentDelta, out var receivedDelta) && sentDelta >= 0 && receivedDelta >= 0;
var timeoutEx = ExceptionFactory.Timeout(multiplexer, haveDeltas
? $"Timeout awaiting response (outbound={sentDelta >> 10}KiB, inbound={receivedDelta >> 10}KiB, {elapsed}ms elapsed, timeout is {timeout}ms)"
: $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server);
multiplexer.OnMessageFaulted(msg, timeoutEx);
msg.SetExceptionAndComplete(timeoutEx, bridge); // tell the message that it is doomed
multiplexer.OnAsyncTimeout();
}
}
else
{
// This is a head-of-line queue, which means the first thing we hit that *hasn't* timed out means no more will timeout
// and we can stop looping and release the lock early.
break;
}
// Note: it is important that we **do not** remove the message unless we're tearing down the socket; that
// would disrupt the chain for MatchResult; we just preemptively abort the message from the caller's
Expand Down