Skip to content

Commit b256a2b

Browse files
committed
Add UserConfigurationSiloConfigurator and refactor SignalR grains for improved observer management
1 parent 3bf19a3 commit b256a2b

File tree

10 files changed

+785
-143
lines changed

10 files changed

+785
-143
lines changed
Lines changed: 74 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
using System;
12
using System.Collections.Generic;
3+
using System.Linq;
24
using System.Threading;
35
using System.Threading.Tasks;
46
using ManagedCode.Orleans.SignalR.Core.Config;
@@ -12,56 +14,69 @@
1214
using Orleans;
1315
using Orleans.Concurrency;
1416
using Orleans.Runtime;
15-
using Orleans.Utilities;
1617

1718
namespace ManagedCode.Orleans.SignalR.Server;
1819

1920
[Reentrant]
2021
[GrainType($"ManagedCode.{nameof(SignalRConnectionHolderGrain)}")]
21-
public class SignalRConnectionHolderGrain : Grain, ISignalRConnectionHolderGrain
22+
public class SignalRConnectionHolderGrain : SignalRObserverGrainBase<SignalRConnectionHolderGrain>, ISignalRConnectionHolderGrain
2223
{
23-
private readonly ILogger<SignalRConnectionHolderGrain> _logger;
24-
private readonly ObserverManager<ISignalRObserver> _observerManager;
2524
private readonly IPersistentState<ConnectionState> _stateStorage;
2625

27-
public SignalRConnectionHolderGrain(ILogger<SignalRConnectionHolderGrain> logger,
28-
IOptions<OrleansSignalROptions> orleansSignalOptions, IOptions<HubOptions> hubOptions,
26+
public SignalRConnectionHolderGrain(
27+
ILogger<SignalRConnectionHolderGrain> logger,
28+
IOptions<OrleansSignalROptions> orleansSignalOptions,
29+
IOptions<HubOptions> hubOptions,
2930
[PersistentState(nameof(SignalRConnectionHolderGrain), OrleansSignalROptions.OrleansSignalRStorage)]
3031
IPersistentState<ConnectionState> stateStorage)
32+
: base(logger, orleansSignalOptions, hubOptions)
3133
{
32-
_logger = logger;
3334
_stateStorage = stateStorage;
34-
35-
var timeSpan = TimeIntervalHelper.GetClientTimeoutInterval(orleansSignalOptions, hubOptions);
36-
var expiration = TimeIntervalHelper.GetObserverExpiration(orleansSignalOptions, timeSpan);
37-
_observerManager = new ObserverManager<ISignalRObserver>(expiration, _logger);
3835
}
3936

37+
protected override int TrackedConnectionCount => _stateStorage.State.ConnectionIds.Count;
38+
4039
public Task AddConnection(string connectionId, ISignalRObserver observer)
4140
{
42-
Logs.AddConnection(_logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString(), connectionId);
43-
_observerManager.Subscribe(observer, observer);
44-
_stateStorage.State.ConnectionIds.Add(connectionId, observer.GetPrimaryKeyString());
41+
Logs.AddConnection(Logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString(), connectionId);
42+
_stateStorage.State.ConnectionIds[connectionId] = observer.GetPrimaryKeyString();
43+
TrackConnection(connectionId, observer);
4544
return Task.CompletedTask;
4645
}
4746

4847
public Task RemoveConnection(string connectionId, ISignalRObserver observer)
4948
{
50-
Logs.RemoveConnection(_logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString(), connectionId);
51-
_observerManager.Unsubscribe(observer);
49+
Logs.RemoveConnection(Logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString(), connectionId);
5250
_stateStorage.State.ConnectionIds.Remove(connectionId);
51+
UntrackConnection(connectionId, observer);
5352
return Task.CompletedTask;
5453
}
5554

5655
public async Task SendToAll(HubMessage message)
5756
{
58-
Logs.SendToAll(_logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString());
59-
await Task.Run(() => _observerManager.Notify(s => s.OnNextAsync(message)));
57+
Logs.SendToAll(Logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString());
58+
59+
if (!KeepEachConnectionAlive && LiveObservers.Count > 0)
60+
{
61+
DispatchToLiveObservers(LiveObservers.Values, message);
62+
return;
63+
}
64+
65+
await Task.Run(() => ObserverManager.Notify(s => s.OnNextAsync(message)));
6066
}
6167

6268
public async Task SendToAllExcept(HubMessage message, string[] excludedConnectionIds)
6369
{
64-
Logs.SendToAllExcept(_logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString(), excludedConnectionIds);
70+
Logs.SendToAllExcept(Logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString(), excludedConnectionIds);
71+
72+
if (!KeepEachConnectionAlive && LiveObservers.Count > 0)
73+
{
74+
var excluded = new HashSet<string>(excludedConnectionIds, StringComparer.Ordinal);
75+
var targets = LiveObservers.Where(kvp => !excluded.Contains(kvp.Key)).Select(kvp => kvp.Value);
76+
DispatchToLiveObservers(targets, message);
77+
return;
78+
}
79+
6580
var hashSet = new HashSet<string>();
6681
foreach (var connectionId in excludedConnectionIds)
6782
{
@@ -71,28 +86,53 @@ public async Task SendToAllExcept(HubMessage message, string[] excludedConnectio
7186
}
7287
}
7388

74-
await Task.Run(() => _observerManager.Notify(s => s.OnNextAsync(message),
89+
await Task.Run(() => ObserverManager.Notify(s => s.OnNextAsync(message),
7590
connection => !hashSet.Contains(connection.GetPrimaryKeyString())));
7691
}
7792

7893
public async Task<bool> SendToConnection(HubMessage message, string connectionId)
7994
{
80-
Logs.SendToConnection(_logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString(), connectionId);
95+
Logs.SendToConnection(Logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString(), connectionId);
8196

8297
if (!_stateStorage.State.ConnectionIds.TryGetValue(connectionId, out var observer))
8398
{
8499
return false;
85100
}
86101

87-
await Task.Run(() => _observerManager.Notify(s => s.OnNextAsync(message),
102+
if (TryGetLiveObserver(connectionId, out var liveObserver))
103+
{
104+
_ = liveObserver.OnNextAsync(message);
105+
return true;
106+
}
107+
108+
await Task.Run(() => ObserverManager.Notify(s => s.OnNextAsync(message),
88109
connection => connection.GetPrimaryKeyString() == observer));
89110

90111
return true;
91112
}
92113

93114
public async Task SendToConnections(HubMessage message, string[] connectionIds)
94115
{
95-
Logs.SendToConnections(_logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString(), connectionIds);
116+
Logs.SendToConnections(Logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString(), connectionIds);
117+
118+
if (!KeepEachConnectionAlive && LiveObservers.Count > 0)
119+
{
120+
List<ISignalRObserver>? targets = null;
121+
foreach (var connectionId in connectionIds)
122+
{
123+
if (TryGetLiveObserver(connectionId, out var observer))
124+
{
125+
targets ??= new List<ISignalRObserver>();
126+
targets.Add(observer);
127+
}
128+
}
129+
130+
if (targets is not null)
131+
{
132+
DispatchToLiveObservers(targets, message);
133+
return;
134+
}
135+
}
96136

97137
var hashSet = new HashSet<string>();
98138
foreach (var connectionId in connectionIds)
@@ -103,23 +143,23 @@ public async Task SendToConnections(HubMessage message, string[] connectionIds)
103143
}
104144
}
105145

106-
await Task.Run(() => _observerManager.Notify(s => s.OnNextAsync(message),
146+
await Task.Run(() => ObserverManager.Notify(s => s.OnNextAsync(message),
107147
connection => hashSet.Contains(connection.GetPrimaryKeyString())));
108148
}
109149

110150
public Task Ping(ISignalRObserver observer)
111151
{
112-
Logs.Ping(_logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString());
113-
_observerManager.Subscribe(observer, observer);
152+
Logs.Ping(Logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString());
153+
TouchObserver(observer);
114154
return Task.CompletedTask;
115155
}
116156

117157
public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
118158
{
119-
Logs.OnDeactivateAsync(_logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString());
120-
_observerManager.ClearExpired();
159+
Logs.OnDeactivateAsync(Logger, nameof(SignalRConnectionHolderGrain), this.GetPrimaryKeyString());
160+
ClearObserverTracking();
121161

122-
if (_observerManager.Count == 0 || _stateStorage.State.ConnectionIds.Count == 0)
162+
if (ObserverManager.Count == 0 || _stateStorage.State.ConnectionIds.Count == 0)
123163
{
124164
await _stateStorage.ClearStateAsync(cancellationToken);
125165
}
@@ -128,4 +168,9 @@ public override async Task OnDeactivateAsync(DeactivationReason reason, Cancella
128168
await _stateStorage.WriteStateAsync(cancellationToken);
129169
}
130170
}
171+
172+
protected override void OnLiveObserverDispatchFailure(Exception exception)
173+
{
174+
Logger.LogWarning(exception, "Live observer send failed for holder {Holder}.", this.GetPrimaryKeyString());
175+
}
131176
}
Lines changed: 73 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
using System;
12
using System.Collections.Generic;
3+
using System.Linq;
24
using System.Threading;
35
using System.Threading.Tasks;
46
using ManagedCode.Orleans.SignalR.Core.Config;
@@ -12,58 +14,71 @@
1214
using Orleans;
1315
using Orleans.Concurrency;
1416
using Orleans.Runtime;
15-
using Orleans.Utilities;
1617

1718
namespace ManagedCode.Orleans.SignalR.Server;
1819

1920
[Reentrant]
2021
[GrainType($"ManagedCode.{nameof(SignalRConnectionPartitionGrain)}")]
21-
public class SignalRConnectionPartitionGrain : Grain, ISignalRConnectionPartitionGrain
22+
public class SignalRConnectionPartitionGrain : SignalRObserverGrainBase<SignalRConnectionPartitionGrain>, ISignalRConnectionPartitionGrain
2223
{
23-
private readonly ILogger<SignalRConnectionPartitionGrain> _logger;
24-
private readonly ObserverManager<ISignalRObserver> _observerManager;
2524
private readonly IPersistentState<ConnectionState> _stateStorage;
2625

27-
public SignalRConnectionPartitionGrain(ILogger<SignalRConnectionPartitionGrain> logger,
28-
IOptions<OrleansSignalROptions> orleansSignalOptions, IOptions<HubOptions> hubOptions,
26+
public SignalRConnectionPartitionGrain(
27+
ILogger<SignalRConnectionPartitionGrain> logger,
28+
IOptions<OrleansSignalROptions> orleansSignalOptions,
29+
IOptions<HubOptions> hubOptions,
2930
[PersistentState(nameof(SignalRConnectionPartitionGrain), OrleansSignalROptions.OrleansSignalRStorage)]
3031
IPersistentState<ConnectionState> stateStorage)
32+
: base(logger, orleansSignalOptions, hubOptions)
3133
{
32-
_logger = logger;
3334
_stateStorage = stateStorage;
3435

35-
var timeSpan = TimeIntervalHelper.GetClientTimeoutInterval(orleansSignalOptions, hubOptions);
36-
var expiration = TimeIntervalHelper.GetObserverExpiration(orleansSignalOptions, timeSpan);
37-
_observerManager = new ObserverManager<ISignalRObserver>(expiration, _logger);
38-
3936
_stateStorage.State ??= new ConnectionState();
4037
}
4138

39+
protected override int TrackedConnectionCount => _stateStorage.State.ConnectionIds.Count;
40+
4241
public Task AddConnection(string connectionId, ISignalRObserver observer)
4342
{
44-
Logs.AddConnection(_logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString(), connectionId);
45-
_observerManager.Subscribe(observer, observer);
43+
Logs.AddConnection(Logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString(), connectionId);
4644
_stateStorage.State.ConnectionIds.Add(connectionId, observer.GetPrimaryKeyString());
45+
TrackConnection(connectionId, observer);
4746
return Task.CompletedTask;
4847
}
4948

5049
public Task RemoveConnection(string connectionId, ISignalRObserver observer)
5150
{
52-
Logs.RemoveConnection(_logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString(), connectionId);
53-
_observerManager.Unsubscribe(observer);
51+
Logs.RemoveConnection(Logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString(), connectionId);
5452
_stateStorage.State.ConnectionIds.Remove(connectionId);
53+
UntrackConnection(connectionId, observer);
5554
return Task.CompletedTask;
5655
}
5756

5857
public async Task SendToPartition(HubMessage message)
5958
{
60-
Logs.SendToAll(_logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString());
61-
await Task.Run(() => _observerManager.Notify(s => s.OnNextAsync(message)));
59+
Logs.SendToAll(Logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString());
60+
61+
if (!KeepEachConnectionAlive && LiveObservers.Count > 0)
62+
{
63+
DispatchToLiveObservers(LiveObservers.Values, message);
64+
return;
65+
}
66+
67+
await Task.Run(() => ObserverManager.Notify(s => s.OnNextAsync(message)));
6268
}
6369

6470
public async Task SendToPartitionExcept(HubMessage message, string[] excludedConnectionIds)
6571
{
66-
Logs.SendToAllExcept(_logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString(), excludedConnectionIds);
72+
Logs.SendToAllExcept(Logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString(), excludedConnectionIds);
73+
74+
if (!KeepEachConnectionAlive && LiveObservers.Count > 0)
75+
{
76+
var excluded = new HashSet<string>(excludedConnectionIds, StringComparer.Ordinal);
77+
var targets = LiveObservers.Where(kvp => !excluded.Contains(kvp.Key)).Select(kvp => kvp.Value);
78+
DispatchToLiveObservers(targets, message);
79+
return;
80+
}
81+
6782
var hashSet = new HashSet<string>();
6883
foreach (var connectionId in excludedConnectionIds)
6984
{
@@ -73,28 +88,53 @@ public async Task SendToPartitionExcept(HubMessage message, string[] excludedCon
7388
}
7489
}
7590

76-
await Task.Run(() => _observerManager.Notify(s => s.OnNextAsync(message),
91+
await Task.Run(() => ObserverManager.Notify(s => s.OnNextAsync(message),
7792
connection => !hashSet.Contains(connection.GetPrimaryKeyString())));
7893
}
7994

8095
public async Task<bool> SendToConnection(HubMessage message, string connectionId)
8196
{
82-
Logs.SendToConnection(_logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString(), connectionId);
97+
Logs.SendToConnection(Logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString(), connectionId);
8398

8499
if (!_stateStorage.State.ConnectionIds.TryGetValue(connectionId, out var observer))
85100
{
86101
return false;
87102
}
88103

89-
await Task.Run(() => _observerManager.Notify(s => s.OnNextAsync(message),
104+
if (TryGetLiveObserver(connectionId, out var live))
105+
{
106+
_ = live.OnNextAsync(message);
107+
return true;
108+
}
109+
110+
await Task.Run(() => ObserverManager.Notify(s => s.OnNextAsync(message),
90111
connection => connection.GetPrimaryKeyString() == observer));
91112

92113
return true;
93114
}
94115

95116
public async Task SendToConnections(HubMessage message, string[] connectionIds)
96117
{
97-
Logs.SendToConnections(_logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString(), connectionIds);
118+
Logs.SendToConnections(Logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString(), connectionIds);
119+
120+
if (!KeepEachConnectionAlive && LiveObservers.Count > 0)
121+
{
122+
List<ISignalRObserver>? targets = null;
123+
foreach (var connectionId in connectionIds)
124+
{
125+
if (TryGetLiveObserver(connectionId, out var observer))
126+
{
127+
targets ??= new List<ISignalRObserver>();
128+
targets.Add(observer);
129+
}
130+
}
131+
132+
if (targets is not null)
133+
{
134+
DispatchToLiveObservers(targets, message);
135+
return;
136+
}
137+
}
98138

99139
var hashSet = new HashSet<string>();
100140
foreach (var connectionId in connectionIds)
@@ -105,23 +145,23 @@ public async Task SendToConnections(HubMessage message, string[] connectionIds)
105145
}
106146
}
107147

108-
await Task.Run(() => _observerManager.Notify(s => s.OnNextAsync(message),
148+
await Task.Run(() => ObserverManager.Notify(s => s.OnNextAsync(message),
109149
connection => hashSet.Contains(connection.GetPrimaryKeyString())));
110150
}
111151

112152
public Task Ping(ISignalRObserver observer)
113153
{
114-
Logs.Ping(_logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString());
115-
_observerManager.Subscribe(observer, observer);
154+
Logs.Ping(Logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString());
155+
TouchObserver(observer);
116156
return Task.CompletedTask;
117157
}
118158

119159
public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
120160
{
121-
Logs.OnDeactivateAsync(_logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString());
122-
_observerManager.ClearExpired();
161+
Logs.OnDeactivateAsync(Logger, nameof(SignalRConnectionPartitionGrain), this.GetPrimaryKeyLong().ToString());
162+
ClearObserverTracking();
123163

124-
if (_observerManager.Count == 0 || _stateStorage.State.ConnectionIds.Count == 0)
164+
if (ObserverManager.Count == 0 || _stateStorage.State.ConnectionIds.Count == 0)
125165
{
126166
await _stateStorage.ClearStateAsync(cancellationToken);
127167
}
@@ -130,4 +170,9 @@ public override async Task OnDeactivateAsync(DeactivationReason reason, Cancella
130170
await _stateStorage.WriteStateAsync(cancellationToken);
131171
}
132172
}
173+
174+
protected override void OnLiveObserverDispatchFailure(Exception exception)
175+
{
176+
Logger.LogWarning(exception, "Live observer send failed for partition {PartitionId}.", this.GetPrimaryKeyLong());
177+
}
133178
}

0 commit comments

Comments
 (0)