Support sharded pubsub commands by mgravell · Pull Request #2887 · StackExchange/StackExchange.Redis

Expand Up @@ -29,7 +29,7 @@ internal sealed partial class PhysicalConnection : IDisposable
private const int DefaultRedisDatabaseCount = 16;
private static readonly CommandBytes message = "message", pmessage = "pmessage"; private static readonly CommandBytes message = "message", pmessage = "pmessage", smessage = "smessage";
private static readonly Message[] ReusableChangeDatabaseCommands = Enumerable.Range(0, DefaultRedisDatabaseCount).Select( i => Message.Create(i, CommandFlags.FireAndForget, RedisCommand.SELECT)).ToArray(); Expand Down Expand Up @@ -276,7 +276,11 @@ private enum ReadMode : byte private RedisProtocol _protocol; // note starts at **zero**, not RESP2 public RedisProtocol? Protocol => _protocol == 0 ? null : _protocol;
internal void SetProtocol(RedisProtocol value) => _protocol = value; internal void SetProtocol(RedisProtocol value) { _protocol = value; BridgeCouldBeNull?.SetProtocol(value); }
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2202:Do not dispose objects multiple times", Justification = "Trust me yo")] internal void Shutdown() Expand Down Expand Up @@ -384,7 +388,7 @@ public void RecordConnectionFailed( bool isInitialConnect = false, IDuplexPipe? connectingPipe = null) { bool weAskedForThis = false; bool weAskedForThis; Exception? outerException = innerException; IdentifyFailureType(innerException, ref failureType); var bridge = BridgeCouldBeNull; Expand Down Expand Up @@ -1644,9 +1648,9 @@ private void MatchResult(in RawResult result)
// out of band message does not match to a queued message var items = result.GetItems(); if (items.Length >= 3 && items[0].IsEqual(message)) if (items.Length >= 3 && (items[0].IsEqual(message) || items[0].IsEqual(smessage))) { _readStatus = ReadStatus.PubSubMessage; _readStatus = items[0].IsEqual(message) ? ReadStatus.PubSubMessage : ReadStatus.PubSubSMessage;
// special-case the configuration change broadcasts (we don't keep that in the usual pub/sub registry) var configChanged = muxer.ConfigurationChangedChannel; Expand All @@ -1668,8 +1672,17 @@ private void MatchResult(in RawResult result) }
// invoke the handlers var channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal); Trace("MESSAGE: " + channel); RedisChannel channel; if (items[0].IsEqual(message)) { channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.None); Trace("MESSAGE: " + channel); } else // see check on outer-if that restricts to message / smessage { channel = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Sharded); Trace("SMESSAGE: " + channel); } if (!channel.IsNull) { if (TryGetPubSubPayload(items[2], out var payload)) Expand All @@ -1690,27 +1703,30 @@ private void MatchResult(in RawResult result) { _readStatus = ReadStatus.PubSubPMessage;
var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Literal); var channel = items[2].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
Trace("PMESSAGE: " + channel); if (!channel.IsNull) { if (TryGetPubSubPayload(items[3], out var payload)) { var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern); var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
_readStatus = ReadStatus.InvokePubSub; muxer.OnMessage(sub, channel, payload); } else if (TryGetMultiPubSubPayload(items[3], out var payloads)) { var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.PatternMode.Pattern); var sub = items[1].AsRedisChannel(ChannelPrefix, RedisChannel.RedisChannelOptions.Pattern);
_readStatus = ReadStatus.InvokePubSub; muxer.OnMessage(sub, channel, payloads); } } return; // AND STOP PROCESSING! }
// if it didn't look like "[p]message", then we still need to process the pending queue // if it didn't look like "[p|s]message", then we still need to process the pending queue } Trace("Matching result...");
Expand Down Expand Up @@ -2110,6 +2126,7 @@ internal enum ReadStatus MatchResult, PubSubMessage, PubSubPMessage, PubSubSMessage, Reconfigure, InvokePubSub, ResponseSequenceCheck, // high-integrity mode only Expand Down