Timeouts: hop out of queue processing as soon as we know no more are eligible for timeout by NickCraver · Pull Request #2217 · StackExchange/StackExchange.Redis

Expand Up @@ -665,15 +665,24 @@ internal void OnBridgeHeartbeat() { // We only handle async timeouts here, synchronous timeouts are handled upstream. // Those sync timeouts happen in ConnectionMultiplexer.ExecuteSyncImpl() via Monitor.Wait. if (msg.ResultBoxIsAsync && msg.HasTimedOut(now, timeout, out var elapsed)) if (msg.HasTimedOut(now, timeout, out var elapsed)) { bool haveDeltas = msg.TryGetPhysicalState(out _, out _, out long sentDelta, out var receivedDelta) && sentDelta >= 0 && receivedDelta >= 0; var timeoutEx = ExceptionFactory.Timeout(multiplexer, haveDeltas ? $"Timeout awaiting response (outbound={sentDelta >> 10}KiB, inbound={receivedDelta >> 10}KiB, {elapsed}ms elapsed, timeout is {timeout}ms)" : $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server); multiplexer.OnMessageFaulted(msg, timeoutEx); msg.SetExceptionAndComplete(timeoutEx, bridge); // tell the message that it is doomed multiplexer.OnAsyncTimeout(); if (msg.ResultBoxIsAsync) { bool haveDeltas = msg.TryGetPhysicalState(out _, out _, out long sentDelta, out var receivedDelta) && sentDelta >= 0 && receivedDelta >= 0; var timeoutEx = ExceptionFactory.Timeout(multiplexer, haveDeltas ? $"Timeout awaiting response (outbound={sentDelta >> 10}KiB, inbound={receivedDelta >> 10}KiB, {elapsed}ms elapsed, timeout is {timeout}ms)" : $"Timeout awaiting response ({elapsed}ms elapsed, timeout is {timeout}ms)", msg, server); multiplexer.OnMessageFaulted(msg, timeoutEx); msg.SetExceptionAndComplete(timeoutEx, bridge); // tell the message that it is doomed multiplexer.OnAsyncTimeout(); } } else { // This is a head-of-line queue, which means the first thing we hit that *hasn't* timed out means no more will timeout // and we can stop looping and release the lock early. break; } // Note: it is important that we **do not** remove the message unless we're tearing down the socket; that // would disrupt the chain for MatchResult; we just preemptively abort the message from the caller's Expand Down