Pub/Sub fixes for subscribe/re-subscribe by NickCraver · Pull Request #1947 · StackExchange/StackExchange.Redis

Expand Up @@ -8,7 +8,6 @@ using System.Threading.Channels; using System.Threading.Tasks; using static StackExchange.Redis.ConnectionMultiplexer; using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState; #if !NETCOREAPP using Pipelines.Sockets.Unofficial.Threading; using static Pipelines.Sockets.Unofficial.Threading.MutexSlim; Expand Down Expand Up @@ -102,7 +101,6 @@ public enum State : byte public void Dispose() { isDisposed = true; ShutdownSubscriptionQueue(); using (var tmp = physical) { physical = null; Expand Down Expand Up @@ -220,71 +218,6 @@ internal void GetCounters(ConnectionCounters counters) physical?.GetCounters(counters); }
private Channel<PendingSubscriptionState> _subscriptionBackgroundQueue; private static readonly UnboundedChannelOptions s_subscriptionQueueOptions = new UnboundedChannelOptions { AllowSynchronousContinuations = false, // we do *not* want the async work to end up on the caller's thread SingleReader = true, // only one reader will be started per channel SingleWriter = true, // writes will be synchronized, because order matters };
private Channel<PendingSubscriptionState> GetSubscriptionQueue() { var queue = _subscriptionBackgroundQueue; if (queue == null) { queue = Channel.CreateUnbounded<PendingSubscriptionState>(s_subscriptionQueueOptions); var existing = Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, queue, null);
if (existing != null) return existing; // we didn't win, but that's fine
// we won (_subqueue is now queue) // this means we have a new channel without a reader; let's fix that! Task.Run(() => ExecuteSubscriptionLoop()); } return queue; }
private void ShutdownSubscriptionQueue() { try { Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null)?.Writer.TryComplete(); } catch { } }
private async Task ExecuteSubscriptionLoop() // pushes items that have been enqueued over the bridge { // note: this will execute on the default pool rather than our dedicated pool; I'm... OK with this var queue = _subscriptionBackgroundQueue ?? Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null); // just to be sure we can read it! try { while (await queue.Reader.WaitToReadAsync().ForAwait() && queue.Reader.TryRead(out var next)) { try { // Treat these commands as background/handshake and do not allow queueing to backlog if ((await TryWriteAsync(next.Message, next.IsReplica).ForAwait()) != WriteResult.Success) { next.Abort(); } } catch (Exception ex) { next.Fail(ex); } } } catch (Exception ex) { Multiplexer.OnInternalError(ex, ServerEndPoint?.EndPoint, ConnectionType); } }
internal bool TryEnqueueBackgroundSubscriptionWrite(in PendingSubscriptionState state) => !isDisposed && (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state);
internal readonly struct BridgeStatus { /// <summary> Expand Down