Skip to content

Commit c011303

Browse files
committed
- Add some broad coverage tests
- Handle cancellation while waiting within both message loops - Unwrap task from Task.Factory.StartNew (returned by SqlReceiver.Start) - Fix bug in ReadGroupCommand - Add option to configure allowed modes of message receiving
1 parent e4b9a38 commit c011303

File tree

9 files changed

+322
-19
lines changed

9 files changed

+322
-19
lines changed

IntelliTect.AspNetCore.SignalR.SqlServer.sln

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,13 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "demo", "demo", "{62241700-2
77
EndProject
88
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{21874FF3-BE97-4F3B-AD63-EC347FF757FF}"
99
EndProject
10-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "IntelliTect.AspNetCore.SignalR.SqlServer", "src\IntelliTect.AspNetCore.SignalR.SqlServer\IntelliTect.AspNetCore.SignalR.SqlServer.csproj", "{62E576F6-E0F9-43AC-848E-97E7E93E4EF0}"
10+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "IntelliTect.AspNetCore.SignalR.SqlServer", "src\IntelliTect.AspNetCore.SignalR.SqlServer\IntelliTect.AspNetCore.SignalR.SqlServer.csproj", "{62E576F6-E0F9-43AC-848E-97E7E93E4EF0}"
1111
EndProject
12-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DemoServer", "demo\DemoServer\DemoServer.csproj", "{FACCC87C-37E3-4095-9997-75EE3C54CD49}"
12+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DemoServer", "demo\DemoServer\DemoServer.csproj", "{FACCC87C-37E3-4095-9997-75EE3C54CD49}"
13+
EndProject
14+
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{CA676E97-11D2-4D19-96DA-2B66392A05D5}"
15+
EndProject
16+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "IntelliTect.AspNetCore.SignalR.SqlServer.Tests", "test\IntelliTect.AspNetCore.SignalR.SqlServer.Tests\IntelliTect.AspNetCore.SignalR.SqlServer.Tests.csproj", "{468B17AA-5509-481D-884E-145978311BBA}"
1317
EndProject
1418
Global
1519
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -25,13 +29,18 @@ Global
2529
{FACCC87C-37E3-4095-9997-75EE3C54CD49}.Debug|Any CPU.Build.0 = Debug|Any CPU
2630
{FACCC87C-37E3-4095-9997-75EE3C54CD49}.Release|Any CPU.ActiveCfg = Release|Any CPU
2731
{FACCC87C-37E3-4095-9997-75EE3C54CD49}.Release|Any CPU.Build.0 = Release|Any CPU
32+
{468B17AA-5509-481D-884E-145978311BBA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
33+
{468B17AA-5509-481D-884E-145978311BBA}.Debug|Any CPU.Build.0 = Debug|Any CPU
34+
{468B17AA-5509-481D-884E-145978311BBA}.Release|Any CPU.ActiveCfg = Release|Any CPU
35+
{468B17AA-5509-481D-884E-145978311BBA}.Release|Any CPU.Build.0 = Release|Any CPU
2836
EndGlobalSection
2937
GlobalSection(SolutionProperties) = preSolution
3038
HideSolutionNode = FALSE
3139
EndGlobalSection
3240
GlobalSection(NestedProjects) = preSolution
3341
{62E576F6-E0F9-43AC-848E-97E7E93E4EF0} = {21874FF3-BE97-4F3B-AD63-EC347FF757FF}
3442
{FACCC87C-37E3-4095-9997-75EE3C54CD49} = {62241700-24B6-4D13-A183-2502D628ED43}
43+
{468B17AA-5509-481D-884E-145978311BBA} = {CA676E97-11D2-4D19-96DA-2B66392A05D5}
3544
EndGlobalSection
3645
GlobalSection(ExtensibilityGlobals) = postSolution
3746
SolutionGuid = {41DEFC27-E139-4A35-989F-DA872E4B9F0A}

src/IntelliTect.AspNetCore.SignalR.SqlServer/Internal/SqlServer/SqlReceiver.cs

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ public Task Start(Func<long, byte[], Task> onReceived)
6464

6565
_onReceived = onReceived;
6666

67-
return Task.Factory.StartNew(StartLoop, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach);
67+
return Task.Factory
68+
.StartNew(StartLoop, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach)
69+
.Unwrap();
6870
}
6971

7072
private async Task StartLoop()
@@ -78,19 +80,26 @@ private async Task StartLoop()
7880

7981
if (_cts.IsCancellationRequested) return;
8082

81-
Func<CancellationToken, Task> loop = StartSqlDependencyListener()
82-
? NotificationLoop
83-
: PollingLoop;
84-
85-
await loop(_cts.Token);
83+
if (_options.Mode.HasFlag(SqlServerMessageMode.ServiceBroker) && StartSqlDependencyListener())
84+
{
85+
await NotificationLoop(_cts.Token);
86+
}
87+
else if (_options.Mode.HasFlag(SqlServerMessageMode.Polling))
88+
{
89+
await PollingLoop(_cts.Token);
90+
}
91+
else
92+
{
93+
throw new InvalidOperationException("None of the configured SqlServerMessageMode are suitable for use.");
94+
}
8695
}
8796

8897
/// <summary>
8998
/// Loop until cancelled, using SQL service broker notifications to watch for new rows.
9099
/// </summary>
91100
private async Task NotificationLoop(CancellationToken cancellationToken)
92101
{
93-
while (StartSqlDependencyListener())
102+
while (!_notificationsDisabled)
94103
{
95104
if (cancellationToken.IsCancellationRequested) return;
96105

@@ -99,6 +108,7 @@ private async Task NotificationLoop(CancellationToken cancellationToken)
99108
_logger.LogDebug("{0}Setting up SQL notification", _tracePrefix);
100109

101110
var tcs = new TaskCompletionSource<SqlNotificationEventArgs>();
111+
var cancelReg = cancellationToken.Register((t) => ((TaskCompletionSource<SqlNotificationEventArgs>)t!).TrySetCanceled(), tcs);
102112
var recordCount = await ReadRows(command =>
103113
{
104114
var dependency = new SqlDependency(command, null, (int)_dependencyTimeout.TotalSeconds);
@@ -121,6 +131,7 @@ private async Task NotificationLoop(CancellationToken cancellationToken)
121131
_logger.LogTrace("{0}No records received while setting up SQL notification", _tracePrefix);
122132

123133
var depResult = await tcs.Task;
134+
cancelReg.Dispose();
124135
if (cancellationToken.IsCancellationRequested) return;
125136

126137
// Check notification args for issues
@@ -166,6 +177,7 @@ private async Task NotificationLoop(CancellationToken cancellationToken)
166177
break;
167178
}
168179
}
180+
catch (TaskCanceledException) { return; }
169181
catch (Exception ex)
170182
{
171183
_logger.LogError(ex, "{0}Error in SQL notification loop", _tracePrefix);
@@ -195,18 +207,19 @@ private async Task PollingLoop(CancellationToken cancellationToken)
195207
{
196208
if (cancellationToken.IsCancellationRequested) return;
197209

198-
if (retryDelay > 0)
199-
{
200-
_logger.LogTrace("{0}Waiting {1}ms before checking for messages again", _tracePrefix, retryDelay);
201-
202-
await Task.Delay(retryDelay, cancellationToken);
203-
}
204-
205210
var recordCount = 0;
206211
try
207212
{
213+
if (retryDelay > 0)
214+
{
215+
_logger.LogTrace("{0}Waiting {1}ms before checking for messages again", _tracePrefix, retryDelay);
216+
217+
await Task.Delay(retryDelay, cancellationToken);
218+
}
219+
208220
recordCount = await ReadRows(null);
209221
}
222+
catch (TaskCanceledException) { return; }
210223
catch (Exception ex)
211224
{
212225
_logger.LogError(ex, "{0}Error in SQL polling loop", _tracePrefix);

src/IntelliTect.AspNetCore.SignalR.SqlServer/Internal/SqlServerProtocol.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ private void WriteInvocationCore(ref MessagePackWriter writer, string methodName
113113
WriteHubMessage(ref writer, new InvocationMessage(methodName, args));
114114
}
115115

116-
public SqlServerInvocation ReadInvocation(ReadOnlyMemory<byte> data)
116+
public SqlServerInvocation ReadInvocationAll(ReadOnlyMemory<byte> data)
117117
{
118118
// See WriteInvocation for the format
119119
var reader = new MessagePackReader(data);
@@ -230,10 +230,11 @@ public SqlServerGroupCommand ReadGroupCommand(ReadOnlyMemory<byte> data)
230230
{
231231
var reader = new MessagePackReader(data);
232232

233+
reader.ReadByte(); // Skip header
234+
233235
// See WriteGroupCommand for format.
234236
ValidateArraySize(ref reader, 5, "GroupCommand");
235237

236-
reader.ReadByte(); // Skip header
237238
var id = reader.ReadInt32();
238239
var serverName = reader.ReadString();
239240
var action = (GroupAction)reader.ReadByte();

src/IntelliTect.AspNetCore.SignalR.SqlServer/SqlServerHubLifetimeManager.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
using Microsoft.Extensions.Logging;
1616
using Microsoft.Extensions.Options;
1717
using Microsoft.AspNetCore.SignalR;
18+
using System.Runtime.CompilerServices;
19+
20+
[assembly: InternalsVisibleTo("IntelliTect.AspNetCore.SignalR.SqlServer.Tests")]
1821

1922
namespace IntelliTect.AspNetCore.SignalR.SqlServer
2023
{
@@ -408,6 +411,8 @@ void StartReceiving(int streamIndex)
408411
{
409412
if (_disposed) return;
410413

414+
_logger.LogError(t.Exception, "{0}The SQL Server SignalR message receiver encountered an error.", tracePrefix);
415+
411416
// Try again in a little bit
412417
await Task.Delay(2000);
413418
StartReceiving(streamIndex);
@@ -437,7 +442,7 @@ private async Task OnReceived(byte[] message)
437442
{
438443
case MessageType.InvocationAll:
439444

440-
invocation = _protocol.ReadInvocation(message);
445+
invocation = _protocol.ReadInvocationAll(message);
441446
connections = _connections;
442447
goto multiInvocation;
443448

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
3+
4+
using System;
5+
using System.ComponentModel.DataAnnotations;
6+
using System.IO;
7+
using System.Net;
8+
using System.Threading.Tasks;
9+
10+
namespace IntelliTect.AspNetCore.SignalR.SqlServer
11+
{
12+
13+
public enum SqlServerMessageMode
14+
{
15+
ServiceBroker = 1 << 0,
16+
Polling = 1 << 1,
17+
18+
Auto = ServiceBroker | Polling,
19+
}
20+
}

src/IntelliTect.AspNetCore.SignalR.SqlServer/SqlServerOptions.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,7 @@ public class SqlServerOptions
4545
/// "ALTER DATABASE [DatabaseName] SET ENABLE_BROKER". It requires an exclusive lock on the database.
4646
/// </summary>
4747
public bool AutoEnableServiceBroker { get; set; } = false;
48+
49+
public SqlServerMessageMode Mode { get; set; } = SqlServerMessageMode.Auto;
4850
}
4951
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net5.0</TargetFramework>
5+
6+
<IsPackable>false</IsPackable>
7+
</PropertyGroup>
8+
9+
<ItemGroup>
10+
<PackageReference Include="Microsoft.AspNetCore.SignalR.Core" Version="1.1.0" />
11+
<PackageReference Include="Microsoft.AspNetCore.SignalR.Protocols.MessagePack" Version="5.0.10" />
12+
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.11.0" />
13+
<PackageReference Include="xunit" Version="2.4.1" />
14+
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
15+
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
16+
<PrivateAssets>all</PrivateAssets>
17+
</PackageReference>
18+
<PackageReference Include="coverlet.collector" Version="3.1.0">
19+
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
20+
<PrivateAssets>all</PrivateAssets>
21+
</PackageReference>
22+
<PackageReference Include="Xunit.SkippableFact" Version="1.4.13" />
23+
</ItemGroup>
24+
25+
<ItemGroup>
26+
<ProjectReference Include="..\..\src\IntelliTect.AspNetCore.SignalR.SqlServer\IntelliTect.AspNetCore.SignalR.SqlServer.csproj" />
27+
</ItemGroup>
28+
29+
</Project>
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
using IntelliTect.AspNetCore.SignalR.SqlServer.Internal;
2+
using Microsoft.Data.SqlClient;
3+
using Microsoft.Extensions.Logging.Abstractions;
4+
using System;
5+
using System.Collections.Concurrent;
6+
using System.Collections.Generic;
7+
using System.Linq;
8+
using System.Text;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
using Xunit;
12+
13+
namespace IntelliTect.AspNetCore.SignalR.SqlServer.Tests
14+
{
15+
public class SqlServerEndToEndTests
16+
{
17+
private const string databaseName = "SignalRUnitTestsDb";
18+
private const string connectionString =
19+
"Server=localhost;Database=" + databaseName + ";Trusted_Connection=True;";
20+
21+
[SkippableFact]
22+
public async Task CanSendAndReceivePayloads_WithServiceBroker()
23+
{
24+
await CreateDatabaseAsync();
25+
26+
var options = new SqlServerOptions
27+
{
28+
ConnectionString = connectionString,
29+
AutoEnableServiceBroker = true,
30+
Mode = SqlServerMessageMode.ServiceBroker
31+
};
32+
33+
var prefix = nameof(CanSendAndReceivePayloads_WithServiceBroker);
34+
await RunCore(options, prefix);
35+
}
36+
37+
[SkippableFact]
38+
public async Task CanSendAndReceivePayloads_WithPolling()
39+
{
40+
await CreateDatabaseAsync();
41+
42+
var options = new SqlServerOptions
43+
{
44+
ConnectionString = connectionString,
45+
Mode = SqlServerMessageMode.Polling
46+
};
47+
48+
var prefix = nameof(CanSendAndReceivePayloads_WithPolling);
49+
await RunCore(options, prefix);
50+
}
51+
52+
private static async Task RunCore(SqlServerOptions options, string prefix)
53+
{
54+
var installer = new SqlInstaller(options, NullLogger.Instance, prefix);
55+
var sender = new SqlSender(options, NullLogger.Instance, prefix + "_0");
56+
var receiver = new SqlReceiver(options, NullLogger.Instance, prefix + "_0", "");
57+
58+
var receivedMessages = new ConcurrentBag<byte[]>();
59+
var receivedEvent = new SemaphoreSlim(0);
60+
await installer.Install();
61+
var receiverTask = receiver.Start((_, message) =>
62+
{
63+
receivedMessages.Add(message);
64+
receivedEvent.Release();
65+
return Task.CompletedTask;
66+
});
67+
// Give the receiver time to reach a steady state (waiting).
68+
await Task.Delay(150);
69+
70+
var payload = new byte[255];
71+
new Random().NextBytes(payload);
72+
await sender.Send(payload);
73+
74+
await receivedEvent.WaitAsync();
75+
var message = Assert.Single(receivedMessages);
76+
Assert.Equal(payload, message);
77+
78+
// Give the receiver time to reach a steady state (waiting).
79+
await Task.Delay(50);
80+
81+
receiver.Dispose();
82+
await receiverTask;
83+
}
84+
85+
private static async Task CreateDatabaseAsync()
86+
{
87+
try
88+
{
89+
using var connection = new SqlConnection(connectionString.Replace(databaseName, "master"));
90+
await connection.OpenAsync();
91+
using var command = connection.CreateCommand();
92+
command.CommandText = $@"IF NOT EXISTS (SELECT * FROM sys.databases WHERE name = '{databaseName}')
93+
BEGIN CREATE DATABASE {databaseName}; END";
94+
await command.ExecuteNonQueryAsync();
95+
}
96+
catch (SqlException ex) when (ex.Number == 53)
97+
{
98+
Skip.If(true, "SQL Server not available on localhost");
99+
}
100+
}
101+
}
102+
}

0 commit comments

Comments
 (0)