Skip to content

Commit 9367a10

Browse files
websocket implementation with postgres backplane. Includes:
1) support for messages with ack 2) allows messages to be sent on user reconnect 3) updates order count when order received notification read. 4) deletes old messages 5) handles websocket reconnect 6) aws sticky sessions
1 parent 34799a4 commit 9367a10

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+1971
-195
lines changed

api/Controllers/NotificationsController.cs

Lines changed: 0 additions & 41 deletions
This file was deleted.

api/Hubs/NotificationsHub.cs

Lines changed: 132 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
using System.Security.Claims;
21
using System;
32
using System.Linq;
43
using System.Threading.Tasks;
@@ -7,12 +6,29 @@
76
using Microsoft.Extensions.Configuration;
87
using Microsoft.Extensions.DependencyInjection;
98
using Microsoft.Extensions.Logging;
9+
using Scv.Api.Helpers.Extensions;
10+
using Scv.Api.SignalR;
11+
using Scv.Db.Repositories;
1012

1113
namespace Scv.Api.Hubs;
1214

1315
[Authorize]
1416
public class NotificationsHub : Hub
1517
{
18+
private readonly UserConnectionTracker _connectionTracker;
19+
20+
public NotificationsHub(UserConnectionTracker connectionTracker)
21+
{
22+
_connectionTracker = connectionTracker;
23+
}
24+
25+
/// <summary>
26+
/// Validates origin and user of this connection in order to safely send/receive notifications.
27+
/// </summary>
28+
/// <remarks>
29+
/// Uses server-side auth to validate that this user is authorized to ack this message.
30+
/// </remarks>
31+
/// <returns>A task representing the asynchronous operation.</returns>
1632
public override Task OnConnectedAsync()
1733
{
1834
var httpContext = Context.GetHttpContext();
@@ -55,7 +71,7 @@ public override Task OnConnectedAsync()
5571
}
5672
}
5773

58-
var userId = Context.User?.FindFirstValue(ClaimTypes.NameIdentifier);
74+
var userId = Context.User?.UserId();
5975
if (string.IsNullOrWhiteSpace(userId))
6076
{
6177
if (logger != null)
@@ -74,6 +90,120 @@ public override Task OnConnectedAsync()
7490

7591
logger?.LogInformation("SignalR connection accepted for user {UserId}.", userId);
7692

93+
// Track users connected to this server, so that this server only attempts to send notifications to users with an active connection.
94+
_connectionTracker.AddConnection(userId, Context.ConnectionId);
95+
7796
return base.OnConnectedAsync();
7897
}
98+
99+
public override Task OnDisconnectedAsync(Exception exception)
100+
{
101+
var userId = Context.User?.UserId();
102+
if (!string.IsNullOrWhiteSpace(userId))
103+
{
104+
_connectionTracker.RemoveConnection(userId, Context.ConnectionId);
105+
}
106+
107+
return base.OnDisconnectedAsync(exception);
108+
}
109+
110+
/// <summary>
111+
/// Acknowledges a notification for the current user by marking it as read. Note that an ack just indicates the client JASPER app received the notification - the app is responsible for presenting
112+
/// notifications to users.
113+
/// </summary>
114+
/// <param name="ackGuid">The stable identifier of the notification to acknowledge.</param>
115+
/// <remarks>
116+
/// Uses server-side auth to validate that this user is authorized to ack this message.
117+
/// </remarks>
118+
/// <returns>A task representing the asynchronous operation.</returns>
119+
public async Task AckNotification(Guid ackGuid)
120+
{
121+
var logger = Context.GetHttpContext()
122+
?.RequestServices.GetService<ILogger<NotificationsHub>>();
123+
var userId = Context.User?.UserId();
124+
if (string.IsNullOrWhiteSpace(userId))
125+
{
126+
logger?.LogWarning(
127+
"SignalR ack failed due to missing user id. AckGuid={AckGuid}",
128+
ackGuid);
129+
return;
130+
}
131+
132+
var repository = Context.GetHttpContext()
133+
?.RequestServices.GetService<INotificationRepository>();
134+
if (repository == null)
135+
{
136+
logger?.LogWarning(
137+
"SignalR ack failed due to missing notification repository. AckGuid={AckGuid} UserId={UserId}",
138+
ackGuid,
139+
userId);
140+
return;
141+
}
142+
143+
var matches = await repository.FindAsync(message => message.AckGuid == ackGuid);
144+
var message = matches.FirstOrDefault();
145+
if (message == null)
146+
{
147+
logger?.LogWarning(
148+
"SignalR ack failed to find message. AckGuid={AckGuid} UserId={UserId}",
149+
ackGuid,
150+
userId);
151+
return;
152+
}
153+
154+
if (!IsAckAuthorized(message.UserId, userId, out var reason))
155+
{
156+
logger?.LogWarning(
157+
"SignalR ack rejected. AckGuid={AckGuid} UserId={UserId} Reason={Reason}",
158+
ackGuid,
159+
userId,
160+
reason);
161+
return;
162+
}
163+
164+
if (!message.AckRequired)
165+
{
166+
logger?.LogInformation(
167+
"SignalR ack skipped because ack is not required. AckGuid={AckGuid} UserId={UserId}",
168+
ackGuid,
169+
userId);
170+
return;
171+
}
172+
173+
if (message.AckedAt.HasValue)
174+
{
175+
logger?.LogInformation(
176+
"SignalR ack skipped because message already acked. AckGuid={AckGuid} UserId={UserId}",
177+
ackGuid,
178+
userId);
179+
return;
180+
}
181+
182+
message.AckedAt = DateTimeOffset.UtcNow;
183+
message.AckedBy = userId;
184+
await repository.UpdateAsync(message);
185+
186+
logger?.LogInformation(
187+
"SignalR ack accepted. AckGuid={AckGuid} UserId={UserId}",
188+
ackGuid,
189+
userId);
190+
}
191+
192+
private static bool IsAckAuthorized(string messageUserId, string userId, out string reason)
193+
{
194+
reason = string.Empty;
195+
if (string.IsNullOrWhiteSpace(messageUserId))
196+
{
197+
reason = "UserMissing";
198+
return false;
199+
}
200+
201+
if (!string.Equals(messageUserId, userId, StringComparison.OrdinalIgnoreCase))
202+
{
203+
reason = "UserMismatch";
204+
return false;
205+
}
206+
207+
return true;
208+
}
79209
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
namespace Scv.Api.Infrastructure.Options;
2+
3+
public sealed class JobsCleanupSignalRMessagesOptions
4+
{
5+
public string CronSchedule { get; set; } = "0 0 * * *"; // Every day at midnight
6+
}

api/Infrastructure/ServiceCollectionExtensions.cs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Linq;
3+
using System.Collections.Generic;
34
using System.Net.Http;
45
using System.Reflection;
56
using Amazon;
@@ -24,6 +25,7 @@
2425
using Microsoft.Extensions.Options;
2526
using Microsoft.Graph;
2627
using MongoDB.Driver;
28+
using PostgreSQL.ListenNotify.DependencyInjection;
2729
using Scv.Api.Documents;
2830
using Scv.Api.Documents.Parsers;
2931
using Scv.Api.Documents.Strategies;
@@ -39,6 +41,7 @@
3941
using Scv.Api.Models.AccessControlManagement;
4042
using Scv.Api.Processors;
4143
using Scv.Api.Repositories;
44+
using Scv.Api.SignalR;
4245
using Scv.Api.Services;
4346
using Scv.Api.Services.Files;
4447
using Scv.Db.Contexts;
@@ -158,6 +161,46 @@ public static IServiceCollection AddGraphService(this IServiceCollection service
158161
return services;
159162
}
160163

164+
public static IServiceCollection AddSignalRPostgresBackplane(this IServiceCollection services, IConfiguration configuration)
165+
{
166+
var connectionString = configuration.GetValue<string>("DatabaseConnectionString");
167+
if (string.IsNullOrWhiteSpace(connectionString))
168+
{
169+
throw new InvalidOperationException("DatabaseConnectionString is required to use the SignalR Postgres backplane.");
170+
}
171+
172+
var channel = configuration.GetValue<string>("SignalR:PostgresChannel") ?? "signalr";
173+
var instanceId = $"{Environment.MachineName}-{Guid.NewGuid()}";
174+
var maxPayloadBytes = configuration.GetValue<int?>("SignalR:PostgresMaxPayloadBytes") ?? 7000;
175+
var outboxPollSeconds = configuration.GetValue<int?>("SignalR:PostgresOutboxPollSeconds") ?? 10;
176+
var outboxBatchSize = configuration.GetValue<int?>("SignalR:PostgresOutboxBatchSize") ?? 100;
177+
var outboxRetentionMinutes = configuration.GetValue<int?>("SignalR:PostgresOutboxRetentionMinutes") ?? 1440;
178+
var outboxMinAgeSeconds = configuration.GetValue<int?>("SignalR:PostgresOutboxMinAgeSeconds") ?? 10;
179+
180+
services.AddPostgresNotifications(options =>
181+
{
182+
options.ConnectionString = connectionString;
183+
options.ListenChannels = new List<string> { channel };
184+
options.DefaultNotifyChannel = channel;
185+
options.ApplicationName = "Jasper.SignalR";
186+
});
187+
188+
services.AddSingleton(new PostgresSignalRBackplaneOptions(
189+
channel,
190+
instanceId,
191+
maxPayloadBytes,
192+
outboxPollSeconds,
193+
outboxBatchSize,
194+
outboxRetentionMinutes,
195+
outboxMinAgeSeconds));
196+
services.AddSingleton<UserConnectionTracker>();
197+
services.AddScoped<OrderReceivedAckNotification>();
198+
services.AddScoped<INotificationService, PostgresSignalRNotificationPublisher>();
199+
services.AddHostedService<PostgresSignalRNotificationBridge>();
200+
201+
return services;
202+
}
203+
161204
public static IServiceCollection AddAWSConfig(this IServiceCollection services, IConfiguration configuration)
162205
{
163206
var region = configuration.GetValue<string>("AWS_REGION");
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using LazyCache;
4+
using MapsterMapper;
5+
using Microsoft.Extensions.Configuration;
6+
using Microsoft.Extensions.Logging;
7+
using Microsoft.Extensions.Options;
8+
using Scv.Api.Infrastructure.Options;
9+
using Scv.Db.Repositories;
10+
11+
namespace Scv.Api.Jobs;
12+
13+
public class CleanupSignalRMessagesJob(
14+
IConfiguration configuration,
15+
IAppCache cache,
16+
IMapper mapper,
17+
ILogger<CleanupSignalRMessagesJob> logger,
18+
INotificationRepository notificationRepository,
19+
IOptions<JobsCleanupSignalRMessagesOptions> options)
20+
: RecurringJobBase<CleanupSignalRMessagesJob>(configuration, cache, mapper, logger)
21+
{
22+
private readonly INotificationRepository _notificationRepository = notificationRepository;
23+
private readonly JobsCleanupSignalRMessagesOptions _options = options.Value;
24+
25+
public override string JobName => nameof(CleanupSignalRMessagesJob);
26+
27+
public override string CronSchedule => _options.CronSchedule;
28+
29+
public override async Task Execute()
30+
{
31+
var retentionMinutes = Configuration.GetValue<int>("SignalR:PostgresOutboxRetentionMinutes");
32+
if (retentionMinutes <= 0)
33+
{
34+
Logger.LogInformation(
35+
"SignalR cleanup skipped because retention minutes is {RetentionMinutes}.",
36+
retentionMinutes);
37+
return;
38+
}
39+
40+
var cutoff = DateTimeOffset.UtcNow.AddMinutes(-retentionMinutes);
41+
var deleted = await _notificationRepository.DeleteOlderThanAsync(cutoff);
42+
43+
Logger.LogInformation(
44+
"SignalR cleanup deleted {DeletedCount} messages older than {Cutoff}.",
45+
deleted,
46+
cutoff);
47+
}
48+
}

0 commit comments

Comments
 (0)