Pub/Sub fixes for subscribe/re-subscribe by NickCraver · Pull Request #1947 · StackExchange/StackExchange.Redis
Expand Up
@@ -8,7 +8,6 @@
using System.Threading.Channels;
using System.Threading.Tasks;
using static StackExchange.Redis.ConnectionMultiplexer;
using PendingSubscriptionState = global::StackExchange.Redis.ConnectionMultiplexer.Subscription.PendingSubscriptionState;
#if !NETCOREAPP
using Pipelines.Sockets.Unofficial.Threading;
using static Pipelines.Sockets.Unofficial.Threading.MutexSlim;
Expand Down
Expand Up
@@ -102,7 +101,6 @@ public enum State : byte
public void Dispose()
{
isDisposed = true;
ShutdownSubscriptionQueue();
using (var tmp = physical)
{
physical = null;
Expand Down
Expand Up
@@ -220,71 +218,6 @@ internal void GetCounters(ConnectionCounters counters)
physical?.GetCounters(counters);
}
private Channel<PendingSubscriptionState> _subscriptionBackgroundQueue; private static readonly UnboundedChannelOptions s_subscriptionQueueOptions = new UnboundedChannelOptions { AllowSynchronousContinuations = false, // we do *not* want the async work to end up on the caller's thread SingleReader = true, // only one reader will be started per channel SingleWriter = true, // writes will be synchronized, because order matters };
private Channel<PendingSubscriptionState> GetSubscriptionQueue() { var queue = _subscriptionBackgroundQueue; if (queue == null) { queue = Channel.CreateUnbounded<PendingSubscriptionState>(s_subscriptionQueueOptions); var existing = Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, queue, null);
if (existing != null) return existing; // we didn't win, but that's fine
// we won (_subqueue is now queue) // this means we have a new channel without a reader; let's fix that! Task.Run(() => ExecuteSubscriptionLoop()); } return queue; }
private void ShutdownSubscriptionQueue() { try { Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null)?.Writer.TryComplete(); } catch { } }
private async Task ExecuteSubscriptionLoop() // pushes items that have been enqueued over the bridge { // note: this will execute on the default pool rather than our dedicated pool; I'm... OK with this var queue = _subscriptionBackgroundQueue ?? Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null); // just to be sure we can read it! try { while (await queue.Reader.WaitToReadAsync().ForAwait() && queue.Reader.TryRead(out var next)) { try { // Treat these commands as background/handshake and do not allow queueing to backlog if ((await TryWriteAsync(next.Message, next.IsReplica).ForAwait()) != WriteResult.Success) { next.Abort(); } } catch (Exception ex) { next.Fail(ex); } } } catch (Exception ex) { Multiplexer.OnInternalError(ex, ServerEndPoint?.EndPoint, ConnectionType); } }
internal bool TryEnqueueBackgroundSubscriptionWrite(in PendingSubscriptionState state) => !isDisposed && (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state);
internal readonly struct BridgeStatus { /// <summary> Expand Down
private Channel<PendingSubscriptionState> _subscriptionBackgroundQueue; private static readonly UnboundedChannelOptions s_subscriptionQueueOptions = new UnboundedChannelOptions { AllowSynchronousContinuations = false, // we do *not* want the async work to end up on the caller's thread SingleReader = true, // only one reader will be started per channel SingleWriter = true, // writes will be synchronized, because order matters };
private Channel<PendingSubscriptionState> GetSubscriptionQueue() { var queue = _subscriptionBackgroundQueue; if (queue == null) { queue = Channel.CreateUnbounded<PendingSubscriptionState>(s_subscriptionQueueOptions); var existing = Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, queue, null);
if (existing != null) return existing; // we didn't win, but that's fine
// we won (_subqueue is now queue) // this means we have a new channel without a reader; let's fix that! Task.Run(() => ExecuteSubscriptionLoop()); } return queue; }
private void ShutdownSubscriptionQueue() { try { Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null)?.Writer.TryComplete(); } catch { } }
private async Task ExecuteSubscriptionLoop() // pushes items that have been enqueued over the bridge { // note: this will execute on the default pool rather than our dedicated pool; I'm... OK with this var queue = _subscriptionBackgroundQueue ?? Interlocked.CompareExchange(ref _subscriptionBackgroundQueue, null, null); // just to be sure we can read it! try { while (await queue.Reader.WaitToReadAsync().ForAwait() && queue.Reader.TryRead(out var next)) { try { // Treat these commands as background/handshake and do not allow queueing to backlog if ((await TryWriteAsync(next.Message, next.IsReplica).ForAwait()) != WriteResult.Success) { next.Abort(); } } catch (Exception ex) { next.Fail(ex); } } } catch (Exception ex) { Multiplexer.OnInternalError(ex, ServerEndPoint?.EndPoint, ConnectionType); } }
internal bool TryEnqueueBackgroundSubscriptionWrite(in PendingSubscriptionState state) => !isDisposed && (_subscriptionBackgroundQueue ?? GetSubscriptionQueue()).Writer.TryWrite(state);
internal readonly struct BridgeStatus { /// <summary> Expand Down