-
-
Notifications
You must be signed in to change notification settings - Fork 295
Expand file tree
/
Copy pathClusterConfig.cs
More file actions
412 lines (359 loc) · 17.5 KB
/
ClusterConfig.cs
File metadata and controls
412 lines (359 loc) · 17.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
// -----------------------------------------------------------------------
// <copyright file="ClusterConfig.cs" company="Asynkron AB">
// Copyright (C) 2015-2025 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Collections.Immutable;
using System.Linq;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Proto.Cluster.Gossip;
using Proto.Cluster.Identity;
using Proto.Cluster.PubSub;
using Proto.Remote;
namespace Proto.Cluster;
[PublicAPI]
public record ClusterConfig
{
    /// <summary>
    /// Creates new instance of <see cref="ClusterConfig" /> and assigns the default value of every
    /// setting that is not initialized inline on its property.
    /// </summary>
    /// <param name="clusterName">
    /// A name of the cluster. Various clustering providers will use this name
    /// to distinguish between different clusters. The value should be the same for all members of the cluster.
    /// </param>
    /// <param name="clusterProvider"><see cref="IClusterProvider" /> to use for the cluster</param>
    /// <param name="identityLookup"><see cref="IIdentityLookup" /> implementation to use for the cluster</param>
    /// <exception cref="ArgumentNullException">Thrown when any of the arguments is <c>null</c>.</exception>
    private ClusterConfig(string clusterName, IClusterProvider clusterProvider, IIdentityLookup identityLookup)
    {
        // All three collaborators are mandatory; reject null up front so a misconfiguration
        // is reported here, with the offending parameter name, instead of at first use.
        ClusterName = clusterName ?? throw new ArgumentNullException(nameof(clusterName));
        ClusterProvider = clusterProvider ?? throw new ArgumentNullException(nameof(clusterProvider));
        IdentityLookup = identityLookup ?? throw new ArgumentNullException(nameof(identityLookup));

        // Actor request / activation defaults.
        ActorRequestTimeout = TimeSpan.FromSeconds(5);
        ActorSpawnVerificationTimeout = TimeSpan.FromSeconds(5);
        ActorActivationTimeout = TimeSpan.FromSeconds(5);

        // Request log throttling defaults: at most 3 events per 2 seconds.
        MaxNumberOfEventsInRequestLogThrottlePeriod = 3;
        RequestLogThrottlePeriod = TimeSpan.FromSeconds(2);

        // Gossip defaults.
        GossipInterval = TimeSpan.FromMilliseconds(300);
        GossipRequestTimeout = TimeSpan.FromMilliseconds(1500);
        GossipFanout = 3;
        GossipMaxSend = 50;

        // TimeSpan.Zero means heartbeat expiration is disabled unless explicitly configured.
        HeartbeatExpiration = TimeSpan.Zero;

        ClusterRequestDeDuplicationWindow = TimeSpan.FromMinutes(1);
        MemberStrategyBuilder = (_, _) => new SimpleMemberStrategy();

        // Remote PID cache defaults.
        RemotePidCacheTimeToLive = TimeSpan.FromMinutes(15);
        RemotePidCacheClearInterval = TimeSpan.FromSeconds(15);
    }

    /// <summary>
    /// A delegate that returns a <see cref="IMemberStrategy" /> for the given cluster kind.
    /// By default, <see cref="SimpleMemberStrategy" /> is used for all cluster kinds.
    /// </summary>
    [JsonIgnore]
    public Func<Cluster, string, IMemberStrategy> MemberStrategyBuilder { get; init; }

    /// <summary>
    /// A name of the cluster. Various clustering providers will use this name
    /// to distinguish between different clusters. The value should be the same for all members of the cluster.
    /// </summary>
    public string ClusterName { get; }

    /// <summary>
    /// Enables debug logging for the gossip protocol. Default is false.
    /// </summary>
    public bool GossipDebugLogging { get; set; } = false;

    /// <summary>
    /// Cluster kinds define types of the virtual actors supported by this member. Empty by default.
    /// </summary>
    public ImmutableList<ClusterKind> ClusterKinds { get; init; } = ImmutableList<ClusterKind>.Empty;

    /// <summary>
    /// <see cref="IClusterProvider" /> to use for the cluster.
    /// </summary>
    [JsonIgnore]
    public IClusterProvider ClusterProvider { get; }

    /// <summary>
    /// Interval between gossip updates. Default is 300ms.
    /// </summary>
    public TimeSpan GossipInterval { get; init; }

    /// <summary>
    /// The timeout for sending the gossip state to another member. Default is 1500ms.
    /// </summary>
    public TimeSpan GossipRequestTimeout { get; init; }

    /// <summary>
    /// Gossip heartbeat timeout. If the member does not update its heartbeat within this period, it will be added to the
    /// <see cref="BlockList" />.
    /// Default is <see cref="TimeSpan.Zero" />, which disables heartbeat expiration.
    /// </summary>
    public TimeSpan HeartbeatExpiration { get; set; }

    /// <summary>
    /// Timeout for single retry of actor request. Default is 5s.
    /// Overall timeout for the request is controlled by the cancellation token on
    /// <see cref="IClusterContext.RequestAsync{T}(ClusterIdentity, object, ISenderContext, CancellationToken)" />
    /// </summary>
    public TimeSpan ActorRequestTimeout { get; init; }

    /// <summary>
    /// Timeout for running the <see cref="ClusterKind.CanSpawnIdentity" /> check. Default is 5s.
    /// </summary>
    public TimeSpan ActorSpawnVerificationTimeout { get; init; }

    /// <summary>
    /// Timeout for activating an actor. Exact usage varies depending on <see cref="IIdentityLookup" /> used. Default is
    /// 5s.
    /// </summary>
    public TimeSpan ActorActivationTimeout { get; init; }

    /// <summary>
    /// Throttle maximum events logged from cluster requests in a period of time. Specify period in this property. The
    /// default is 2s.
    /// </summary>
    public TimeSpan RequestLogThrottlePeriod { get; init; }

    /// <summary>
    /// Throttle maximum events logged from cluster requests in a period of time. Specify number of events in this
    /// property. The default is 3.
    /// </summary>
    public int MaxNumberOfEventsInRequestLogThrottlePeriod { get; init; }

    /// <summary>
    /// The number of random members the gossip will be sent to during each gossip update. Default is 3.
    /// </summary>
    public int GossipFanout { get; init; }

    /// <summary>
    /// Maximum number of member states to be sent to each member receiving gossip. Default is 50.
    /// </summary>
    public int GossipMaxSend { get; init; }

    /// <summary>
    /// The <see cref="IIdentityLookup" /> to use for the cluster
    /// </summary>
    [JsonIgnore]
    public IIdentityLookup IdentityLookup { get; }

    /// <summary>
    /// Default window size for cluster deduplication (<see cref="Extensions.WithClusterRequestDeduplication" />). Default
    /// is 1 minute.
    /// </summary>
    public TimeSpan ClusterRequestDeDuplicationWindow { get; init; }

    /// <summary>
    /// TTL for remote PID cache. Default is 15min. Set to <see cref="TimeSpan.Zero" /> to disable.
    /// </summary>
    public TimeSpan RemotePidCacheTimeToLive { get; set; }

    /// <summary>
    /// How often to check for stale PIDs in the remote PID cache. Default is 15s. Set to <see cref="TimeSpan.Zero" /> to
    /// disable.
    /// </summary>
    public TimeSpan RemotePidCacheClearInterval { get; set; }

    /// <summary>
    /// Creates the <see cref="IClusterContext" />. The default implementation creates an instance of
    /// <see cref="DefaultClusterContext" />
    /// </summary>
    [JsonIgnore]
    public Func<Cluster, IClusterContext> ClusterContextProducer { get; init; } = c => new DefaultClusterContext(c);

    /// <summary>
    /// Configuration for the PubSub extension. Defaults to the result of <see cref="PubSub.PubSubConfig.Setup" />.
    /// </summary>
    public PubSubConfig PubSubConfig { get; init; } = PubSubConfig.Setup();

    /// <summary>
    /// Backwards compatibility. Set to true to have timed out requests return default(TResponse) instead of throwing
    /// <see cref="TimeoutException" />
    /// Default is false.
    /// </summary>
    public bool LegacyRequestTimeoutBehavior { get; init; }

    /// <summary>
    /// Exit the application process when the cluster is terminated. Default is false.
    /// </summary>
    public bool ExitOnShutdown { get; set; } = false;

    /// <summary>
    /// Timeout for single retry of actor request. Default is 5s.
    /// Overall timeout for the request is controlled by the cancellation token on
    /// <see cref="IClusterContext.RequestAsync{T}(ClusterIdentity, object, ISenderContext, CancellationToken)" />
    /// </summary>
    /// <param name="timeSpan">New value for <see cref="ActorRequestTimeout" /></param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithActorRequestTimeout(TimeSpan timeSpan) => this with { ActorRequestTimeout = timeSpan };

    /// <summary>
    /// Timeout for running the <see cref="ClusterKind.CanSpawnIdentity" /> check. Default is 5s.
    /// </summary>
    /// <param name="timeSpan">New value for <see cref="ActorSpawnVerificationTimeout" /></param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithActorSpawnVerificationTimeout(TimeSpan timeSpan) =>
        this with { ActorSpawnVerificationTimeout = timeSpan };

    /// <summary>
    /// Timeout for activating an actor. Exact usage varies depending on <see cref="IIdentityLookup" /> used. Default is
    /// 5s.
    /// </summary>
    /// <param name="timeSpan">New value for <see cref="ActorActivationTimeout" /></param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithActorActivationTimeout(TimeSpan timeSpan) =>
        this with { ActorActivationTimeout = timeSpan };

    /// <summary>
    /// Throttle maximum events logged from cluster requests in a period of time. Specify period in this property. The
    /// default is 2s.
    /// </summary>
    /// <param name="timeSpan">New value for <see cref="RequestLogThrottlePeriod" /></param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithRequestLogThrottlePeriod(TimeSpan timeSpan) =>
        this with { RequestLogThrottlePeriod = timeSpan };

    /// <summary>
    /// Throttle maximum events logged from cluster requests in a period of time. Specify number of events in this
    /// property. The default is 3.
    /// </summary>
    /// <param name="max">New value for <see cref="MaxNumberOfEventsInRequestLogThrottlePeriod" /></param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithMaxNumberOfEventsInRequestLogThrottlePeriod(int max) =>
        this with { MaxNumberOfEventsInRequestLogThrottlePeriod = max };

    /// <summary>
    /// Adds a <see cref="ClusterKind" /> to this member
    /// </summary>
    /// <param name="kind">Kind name</param>
    /// <param name="prop">Props to spawn an actor of this kind</param>
    /// <returns>A copy of this configuration with the kind appended to <see cref="ClusterKinds" /></returns>
    public ClusterConfig WithClusterKind(string kind, Props prop) => WithClusterKind(new ClusterKind(kind, prop));

    /// <summary>
    /// Adds a <see cref="ClusterKind" /> to this member
    /// </summary>
    /// <param name="kind">Kind name</param>
    /// <param name="prop">Props to spawn an actor of this kind</param>
    /// <param name="strategyBuilder">Specifies <see cref="IMemberStrategy" /> for this cluster kind</param>
    /// <returns>A copy of this configuration with the kind appended to <see cref="ClusterKinds" /></returns>
    public ClusterConfig WithClusterKind(string kind, Props prop, Func<Cluster, IMemberStrategy> strategyBuilder) =>
        WithClusterKind(new ClusterKind(kind, prop) { StrategyBuilder = strategyBuilder });

    /// <summary>
    /// Adds <see cref="ClusterKind" /> list to this member
    /// </summary>
    /// <param name="knownKinds">List of tuples of (kind name, props to spawn an actor of this kind)</param>
    /// <returns>A copy of this configuration with the kinds appended to <see cref="ClusterKinds" /></returns>
    public ClusterConfig WithClusterKinds(params (string kind, Props prop)[] knownKinds) =>
        WithClusterKinds(knownKinds.Select(k => new ClusterKind(k.kind, k.prop)).ToArray());

    /// <summary>
    /// Adds <see cref="ClusterKind" /> list to this member
    /// </summary>
    /// <param name="knownKinds">
    /// List of tuples of (kind name, props to spawn an actor of this kind,
    /// <see cref="IMemberStrategy" /> for this kind)
    /// </param>
    /// <returns>A copy of this configuration with the kinds appended to <see cref="ClusterKinds" /></returns>
    public ClusterConfig WithClusterKinds(
        params (string kind, Props prop, Func<Cluster, IMemberStrategy> strategyBuilder)[] knownKinds) =>
        WithClusterKinds(knownKinds
            .Select(k => new ClusterKind(k.kind, k.prop) { StrategyBuilder = k.strategyBuilder })
            .ToArray());

    /// <summary>
    /// Adds a <see cref="ClusterKind" /> to this member
    /// </summary>
    /// <param name="clusterKind">The cluster kind to add</param>
    /// <returns>A copy of this configuration with the kind appended to <see cref="ClusterKinds" /></returns>
    public ClusterConfig WithClusterKind(ClusterKind clusterKind) => WithClusterKinds(clusterKind);

    /// <summary>
    /// Adds <see cref="ClusterKind" /> list to this member. The kinds are appended to the ones already
    /// configured; existing entries are kept.
    /// </summary>
    /// <param name="clusterKinds">The cluster kinds to add</param>
    /// <returns>A copy of this configuration with the kinds appended to <see cref="ClusterKinds" /></returns>
    public ClusterConfig WithClusterKinds(params ClusterKind[] clusterKinds) =>
        this with { ClusterKinds = ClusterKinds.AddRange(clusterKinds) };

    /// <summary>
    /// Sets the default <see cref="IMemberStrategy" /> builder for the cluster kinds
    /// </summary>
    /// <param name="builder">New value for <see cref="MemberStrategyBuilder" /></param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithMemberStrategyBuilder(Func<Cluster, string, IMemberStrategy> builder) =>
        this with { MemberStrategyBuilder = builder };

    /// <summary>
    /// Sets the delegate that creates the <see cref="IClusterContext" />. The default implementation creates an instance
    /// of <see cref="DefaultClusterContext" />
    /// </summary>
    /// <param name="producer">New value for <see cref="ClusterContextProducer" /></param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithClusterContextProducer(Func<Cluster, IClusterContext> producer) =>
        this with { ClusterContextProducer = producer };

    /// <summary>
    /// Interval between gossip updates. Default is 300ms.
    /// </summary>
    /// <param name="interval">New value for <see cref="GossipInterval" /></param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithGossipInterval(TimeSpan interval) => this with { GossipInterval = interval };

    /// <summary>
    /// The number of random members the gossip will be sent to during each gossip update. Default is 3.
    /// </summary>
    /// <param name="fanout">New value for <see cref="GossipFanout" /></param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithGossipFanOut(int fanout) => this with { GossipFanout = fanout };

    /// <summary>
    /// Maximum number of member states to be sent to each member receiving gossip. Default is 50.
    /// </summary>
    /// <param name="maxSend">New value for <see cref="GossipMaxSend" /></param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithGossipMaxSend(int maxSend) => this with { GossipMaxSend = maxSend };

    /// <summary>
    /// The timeout for sending the gossip state to another member. Default is 1500ms.
    /// </summary>
    /// <param name="timeout">New value for <see cref="GossipRequestTimeout" /></param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithGossipRequestTimeout(TimeSpan timeout) => this with { GossipRequestTimeout = timeout };

    /// <summary>
    /// Enables debug logging for the gossip protocol.
    /// </summary>
    /// <param name="enabled">New value for <see cref="GossipDebugLogging" />; true when omitted</param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithGossipDebugLogging(bool enabled = true) => this with { GossipDebugLogging = enabled };

    /// <summary>
    /// TTL for remote PID cache. Default is 15min. Set to <see cref="TimeSpan.Zero" /> to disable.
    /// </summary>
    /// <param name="timeout">New value for <see cref="RemotePidCacheTimeToLive" /></param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithRemotePidCacheTimeToLive(TimeSpan timeout) =>
        this with { RemotePidCacheTimeToLive = timeout };

    /// <summary>
    /// Gossip heartbeat timeout. If the member does not update its heartbeat within this period, it will be added to the
    /// <see cref="BlockList" />.
    /// Default is <see cref="TimeSpan.Zero" />, which disables heartbeat expiration.
    /// </summary>
    /// <param name="expiration">New value for <see cref="HeartbeatExpiration" /></param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithHeartbeatExpiration(TimeSpan expiration) => this with { HeartbeatExpiration = expiration };

    /// <summary>
    /// Disables gossip heartbeat expiration by setting <see cref="HeartbeatExpiration" /> to
    /// <see cref="TimeSpan.Zero" />.
    /// </summary>
    /// <returns>A copy of this configuration with heartbeat expiration disabled</returns>
    public ClusterConfig WithHeartbeatExpirationDisabled() => this with { HeartbeatExpiration = TimeSpan.Zero };

    /// <summary>
    /// Configuration for the PubSub extension.
    /// </summary>
    /// <param name="config">New value for <see cref="PubSubConfig" /></param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithPubSubConfig(PubSubConfig config) => this with { PubSubConfig = config };

    /// <summary>
    /// Backwards compatibility. Set to true to have timed out requests return default(TResponse) instead of throwing
    /// <see cref="TimeoutException" />
    /// Default is false.
    /// </summary>
    /// <param name="enabled">New value for <see cref="LegacyRequestTimeoutBehavior" />; true when omitted</param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithLegacyRequestTimeoutBehavior(bool enabled = true) =>
        this with { LegacyRequestTimeoutBehavior = enabled };

    /// <summary>
    /// Exit the application process when the cluster is shut down.
    /// </summary>
    /// <param name="enabled">New value for <see cref="ExitOnShutdown" />; true when omitted</param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithExitOnShutdown(bool enabled = true) =>
        this with { ExitOnShutdown = enabled };

    /// <summary>
    /// Creates a new <see cref="ClusterConfig" />
    /// </summary>
    /// <param name="clusterName">
    /// A name of the cluster. Various clustering providers will use this name
    /// to distinguish between different clusters. The value should be the same for all members of the cluster.
    /// </param>
    /// <param name="clusterProvider"><see cref="IClusterProvider" /> to use for the cluster</param>
    /// <param name="identityLookup"><see cref="IIdentityLookup" /> implementation to use for the cluster</param>
    /// <returns>A new configuration with every other setting at its default value</returns>
    /// <exception cref="ArgumentNullException">Thrown when any of the arguments is <c>null</c>.</exception>
    public static ClusterConfig Setup(
        string clusterName,
        IClusterProvider clusterProvider,
        IIdentityLookup identityLookup
    ) =>
        new(clusterName, clusterProvider, identityLookup);

    /// <summary>
    /// The code to run when a member is expired from the cluster. Defaults to
    /// <see cref="GossipDefaults.BlockExpiredMembers" />.
    /// </summary>
    [JsonIgnore]
    public Func<Cluster, Task> HeartbeatExpirationHandler { get; init; } = GossipDefaults.BlockExpiredMembers;

    /// <summary>
    /// Configures the code to run when a member is expired from the cluster.
    /// </summary>
    /// <param name="heartbeatExpirationHandler">New value for <see cref="HeartbeatExpirationHandler" /></param>
    /// <returns>A copy of this configuration with the new value applied</returns>
    public ClusterConfig WithHeartbeatExpirationHandler(Func<Cluster, Task> heartbeatExpirationHandler) =>
        this with { HeartbeatExpirationHandler = heartbeatExpirationHandler };
}