Skip to content

Commit be40edc

Browse files
Merged PR 31899: Avoid Redis pattern matching (#49938)
1 parent c153534 commit be40edc

File tree

3 files changed

+114
-0
lines changed

3 files changed

+114
-0
lines changed

src/SignalR/server/StackExchangeRedis/test/RedisEndToEnd.cs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,73 @@ public async Task CanSendAndReceiveUserMessagesWhenOneConnectionWithUserDisconne
144144
}
145145
}
146146

147+
[ConditionalTheory]
148+
[SkipIfDockerNotPresent]
149+
[MemberData(nameof(TransportTypesAndProtocolTypes))]
150+
public async Task HubConnectionCanSendAndReceiveGroupMessagesGroupNameWithPatternIsTreatedAsLiteral(HttpTransportType transportType, string protocolName)
151+
{
152+
using (StartVerifiableLog())
153+
{
154+
var protocol = HubProtocolHelpers.GetHubProtocol(protocolName);
155+
156+
var connection = CreateConnection(_serverFixture.FirstServer.Url + "/echo", transportType, protocol, LoggerFactory);
157+
var secondConnection = CreateConnection(_serverFixture.SecondServer.Url + "/echo", transportType, protocol, LoggerFactory);
158+
159+
var tcs = new TaskCompletionSource<string>();
160+
connection.On<string>("Echo", message => tcs.TrySetResult(message));
161+
var tcs2 = new TaskCompletionSource<string>();
162+
secondConnection.On<string>("Echo", message => tcs2.TrySetResult(message));
163+
164+
var groupName = $"TestGroup_{transportType}_{protocolName}_{Guid.NewGuid()}";
165+
166+
await secondConnection.StartAsync().DefaultTimeout();
167+
await connection.StartAsync().DefaultTimeout();
168+
await connection.InvokeAsync("AddSelfToGroup", "*").DefaultTimeout();
169+
await secondConnection.InvokeAsync("AddSelfToGroup", groupName).DefaultTimeout();
170+
await connection.InvokeAsync("EchoGroup", groupName, "Hello, World!").DefaultTimeout();
171+
172+
Assert.Equal("Hello, World!", await tcs2.Task.DefaultTimeout());
173+
Assert.False(tcs.Task.IsCompleted);
174+
175+
await connection.InvokeAsync("EchoGroup", "*", "Hello, World!").DefaultTimeout();
176+
Assert.Equal("Hello, World!", await tcs.Task.DefaultTimeout());
177+
178+
await connection.DisposeAsync().DefaultTimeout();
179+
}
180+
}
181+
182+
[ConditionalTheory]
183+
[SkipIfDockerNotPresent]
184+
[MemberData(nameof(TransportTypesAndProtocolTypes))]
185+
public async Task CanSendAndReceiveUserMessagesUserNameWithPatternIsTreatedAsLiteral(HttpTransportType transportType, string protocolName)
186+
{
187+
using (StartVerifiableLog())
188+
{
189+
var protocol = HubProtocolHelpers.GetHubProtocol(protocolName);
190+
191+
var connection = CreateConnection(_serverFixture.FirstServer.Url + "/echo", transportType, protocol, LoggerFactory, userName: "*");
192+
var secondConnection = CreateConnection(_serverFixture.SecondServer.Url + "/echo", transportType, protocol, LoggerFactory, userName: "userA");
193+
194+
var tcs = new TaskCompletionSource<string>();
195+
connection.On<string>("Echo", message => tcs.TrySetResult(message));
196+
var tcs2 = new TaskCompletionSource<string>();
197+
secondConnection.On<string>("Echo", message => tcs2.TrySetResult(message));
198+
199+
await secondConnection.StartAsync().DefaultTimeout();
200+
await connection.StartAsync().DefaultTimeout();
201+
await connection.InvokeAsync("EchoUser", "userA", "Hello, World!").DefaultTimeout();
202+
203+
Assert.Equal("Hello, World!", await tcs2.Task.DefaultTimeout());
204+
Assert.False(tcs.Task.IsCompleted);
205+
206+
await connection.InvokeAsync("EchoUser", "*", "Hello, World!").DefaultTimeout();
207+
Assert.Equal("Hello, World!", await tcs.Task.DefaultTimeout());
208+
209+
await connection.DisposeAsync().DefaultTimeout();
210+
await secondConnection.DisposeAsync().DefaultTimeout();
211+
}
212+
}
213+
147214
private static HubConnection CreateConnection(string url, HttpTransportType transportType, IHubProtocol protocol, ILoggerFactory loggerFactory, string userName = null)
148215
{
149216
var hubConnectionBuilder = new HubConnectionBuilder()

src/SignalR/server/StackExchangeRedis/test/RedisHubLifetimeManagerTests.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,26 @@ public async Task ErrorFromConnectionFactoryLogsAndAllowsDisconnect()
125125
Assert.Equal("throw from connect", logs[0].Exception.Message);
126126
}
127127

128+
// Smoke test that Debug.Asserts in TestSubscriber aren't hit
129+
[Fact]
130+
public async Task PatternGroupAndUser()
131+
{
132+
var server = new TestRedisServer();
133+
using (var client = new TestClient())
134+
{
135+
var manager = CreateLifetimeManager(server);
136+
137+
var connection = HubConnectionContextUtils.Create(client.Connection);
138+
connection.UserIdentifier = "*";
139+
140+
await manager.OnConnectedAsync(connection).DefaultTimeout();
141+
142+
var groupName = "*";
143+
144+
await manager.AddToGroupAsync(connection.ConnectionId, groupName).DefaultTimeout();
145+
}
146+
}
147+
128148
public override TestRedisServer CreateBackplane()
129149
{
130150
return new TestRedisServer();

src/SignalR/server/StackExchangeRedis/test/TestConnectionMultiplexer.cs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System;
55
using System.Collections.Concurrent;
66
using System.Collections.Generic;
7+
using System.Diagnostics;
78
using System.IO;
89
using System.Net;
910
using System.Reflection;
@@ -12,6 +13,7 @@
1213
using StackExchange.Redis;
1314
using StackExchange.Redis.Maintenance;
1415
using StackExchange.Redis.Profiling;
16+
using Xunit;
1517

1618
namespace Microsoft.AspNetCore.SignalR.Tests;
1719

@@ -244,6 +246,8 @@ public class TestRedisServer
244246

245247
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
246248
{
249+
AssertRedisChannel(channel);
250+
247251
if (_subscriptions.TryGetValue(channel, out var handlers))
248252
{
249253
lock (handlers)
@@ -260,6 +264,8 @@ public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags
260264

261265
public void Subscribe(ChannelMessageQueue messageQueue, int subscriberId, CommandFlags flags = CommandFlags.None)
262266
{
267+
AssertRedisChannel(messageQueue.Channel);
268+
263269
Action<RedisChannel, RedisValue> handler = (channel, value) =>
264270
{
265271
// Workaround for https://github.com/StackExchange/StackExchange.Redis/issues/969
@@ -280,6 +286,8 @@ public void Subscribe(ChannelMessageQueue messageQueue, int subscriberId, Comman
280286

281287
public void Unsubscribe(RedisChannel channel, int subscriberId, CommandFlags flags = CommandFlags.None)
282288
{
289+
AssertRedisChannel(channel);
290+
283291
if (_subscriptions.TryGetValue(channel, out var list))
284292
{
285293
lock (list)
@@ -288,6 +296,11 @@ public void Unsubscribe(RedisChannel channel, int subscriberId, CommandFlags fla
288296
}
289297
}
290298
}
299+
300+
internal static void AssertRedisChannel(RedisChannel channel)
301+
{
302+
Assert.False(channel.IsPattern);
303+
}
291304
}
292305

293306
public class TestSubscriber : ISubscriber
@@ -333,11 +346,15 @@ public Task<TimeSpan> PingAsync(CommandFlags flags = CommandFlags.None)
333346

334347
public long Publish(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
335348
{
349+
TestRedisServer.AssertRedisChannel(channel);
350+
336351
return _server.Publish(channel, message, flags);
337352
}
338353

339354
public async Task<long> PublishAsync(RedisChannel channel, RedisValue message, CommandFlags flags = CommandFlags.None)
340355
{
356+
TestRedisServer.AssertRedisChannel(channel);
357+
341358
await Task.Yield();
342359
return Publish(channel, message, flags);
343360
}
@@ -349,6 +366,8 @@ public void Subscribe(RedisChannel channel, Action<RedisChannel, RedisValue> han
349366

350367
public Task SubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler, CommandFlags flags = CommandFlags.None)
351368
{
369+
TestRedisServer.AssertRedisChannel(channel);
370+
352371
Subscribe(channel, handler, flags);
353372
return Task.CompletedTask;
354373
}
@@ -365,6 +384,8 @@ public bool TryWait(Task task)
365384

366385
public void Unsubscribe(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None)
367386
{
387+
TestRedisServer.AssertRedisChannel(channel);
388+
368389
_server.Unsubscribe(channel, _id, flags);
369390
}
370391

@@ -380,6 +401,8 @@ public Task UnsubscribeAllAsync(CommandFlags flags = CommandFlags.None)
380401

381402
public Task UnsubscribeAsync(RedisChannel channel, Action<RedisChannel, RedisValue> handler = null, CommandFlags flags = CommandFlags.None)
382403
{
404+
TestRedisServer.AssertRedisChannel(channel);
405+
383406
Unsubscribe(channel, handler, flags);
384407
return Task.CompletedTask;
385408
}
@@ -401,6 +424,8 @@ public void WaitAll(params Task[] tasks)
401424

402425
public ChannelMessageQueue Subscribe(RedisChannel channel, CommandFlags flags = CommandFlags.None)
403426
{
427+
TestRedisServer.AssertRedisChannel(channel);
428+
404429
// Workaround for https://github.com/StackExchange/StackExchange.Redis/issues/969
405430
var redisSubscriberType = typeof(RedisChannel).Assembly.GetType("StackExchange.Redis.RedisSubscriber");
406431
var ctor = typeof(ChannelMessageQueue).GetConstructor(BindingFlags.Instance | BindingFlags.NonPublic,
@@ -414,6 +439,8 @@ public ChannelMessageQueue Subscribe(RedisChannel channel, CommandFlags flags =
414439

415440
public Task<ChannelMessageQueue> SubscribeAsync(RedisChannel channel, CommandFlags flags = CommandFlags.None)
416441
{
442+
TestRedisServer.AssertRedisChannel(channel);
443+
417444
var t = Subscribe(channel, flags);
418445
return Task.FromResult(t);
419446
}

0 commit comments

Comments
 (0)