Skip to content

Commit 6dc83a8

Browse files
committed
Implement slow-start budget for orphaned shard claims and enhance related logging
1 parent 9b9f920 commit 6dc83a8

File tree

9 files changed

+339
-23
lines changed

9 files changed

+339
-23
lines changed

src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,13 @@ public AzureStorageJobShardManager(
6666
{
6767
}
6868

69-
public override async Task<List<Orleans.DurableJobs.IJobShard>> AssignJobShardsAsync(DateTimeOffset maxShardStartTime, CancellationToken cancellationToken)
69+
public override async Task<List<Orleans.DurableJobs.IJobShard>> AssignJobShardsAsync(DateTimeOffset maxShardStartTime, int maxNewClaims, CancellationToken cancellationToken)
7070
{
7171
await InitializeIfNeeded(cancellationToken);
7272
LogAssigningShards(_logger, SiloAddress, maxShardStartTime, _containerName);
7373

7474
var result = new List<Orleans.DurableJobs.IJobShard>();
75+
var newClaimCount = 0;
7576
await foreach (var blob in _client.GetBlobsAsync(traits: BlobTraits.Metadata, states: BlobStates.None, cancellationToken: cancellationToken, prefix: _blobPrefix))
7677
{
7778
// Get the owner and creator of the shard
@@ -125,6 +126,12 @@ public AzureStorageJobShardManager(
125126
// Determine if this is an adopted shard (taken from dead owner) vs orphaned (gracefully released)
126127
var isAdopted = owner is not null && ownerStatus == SiloStatus.Dead;
127128

129+
// Respect the slow-start budget: once it is exhausted, skip this claim but keep scanning the remaining blobs (continue, not break)
130+
if (newClaimCount >= maxNewClaims)
131+
{
132+
continue;
133+
}
134+
128135
// Try to claim orphaned or adopted shard
129136
LogClaimingShard(_logger, blob.Name, SiloAddress, owner);
130137
var blobClient = _client.GetAppendBlobClient(blob.Name);
@@ -142,6 +149,7 @@ public AzureStorageJobShardManager(
142149
_jobShardCache[blob.Name] = orphanedShard;
143150
LogShardAssigned(_logger, blob.Name, SiloAddress);
144151
result.Add(orphanedShard);
152+
newClaimCount++;
145153
}
146154

147155
LogAssignmentCompleted(_logger, result.Count, SiloAddress);

src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,51 @@ public sealed class DurableJobsOptions
8888
/// </remarks>
8989
public int MaxAdoptedCount { get; set; } = 3;
9090

91+
/// <summary>
92+
/// Gets or sets the maximum number of orphaned shards a silo may claim immediately
93+
/// after startup. The cumulative budget grows linearly from this value to
94+
/// <see cref="SlowStartMaxBudget"/> over <see cref="SlowStartRampUpDuration"/>,
95+
/// after which the limit is removed entirely (new claims are still paused while the silo reports itself as overloaded).
96+
/// This prevents a freshly started silo from overwhelming itself by claiming all orphaned shards
97+
/// at once during disaster-recovery scenarios.
98+
/// Default: 2.
99+
/// </summary>
100+
/// <example>
101+
/// <code>
102+
/// options.SlowStartInitialBudget = 1;
103+
/// </code>
104+
/// </example>
105+
public int SlowStartInitialBudget { get; set; } = 2;
106+
107+
/// <summary>
108+
/// Gets or sets the total number of orphaned shards the silo is allowed to have claimed
109+
/// by the end of the slow-start ramp-up period. The cumulative budget is linearly
110+
/// interpolated between <see cref="SlowStartInitialBudget"/> at startup and this value
111+
/// at <see cref="SlowStartRampUpDuration"/>.
112+
/// Default: 20.
113+
/// </summary>
114+
/// <example>
115+
/// <code>
116+
/// options.SlowStartMaxBudget = 50;
117+
/// </code>
118+
/// </example>
119+
public int SlowStartMaxBudget { get; set; } = 20;
120+
121+
/// <summary>
122+
/// Gets or sets the duration of the slow-start ramp-up period after silo activation.
123+
/// While the silo has been running for less than this duration, the number of orphaned shards
124+
/// it may claim is limited by a linearly increasing budget. Once this period elapses the
125+
/// silo claims all available orphaned shards without limit, except while it is overloaded, in which case no new shards are claimed.
126+
/// Set to <see cref="TimeSpan.Zero"/> to disable slow-start entirely.
127+
/// Default: 5 minutes.
128+
/// </summary>
129+
/// <example>
130+
/// <code>
131+
/// options.SlowStartRampUpDuration = TimeSpan.FromMinutes(10);
132+
/// </code>
133+
/// </example>
134+
public TimeSpan SlowStartRampUpDuration { get; set; } = TimeSpan.FromMinutes(5);
135+
91136
private static DateTimeOffset? DefaultShouldRetry(IJobRunContext jobContext, Exception ex)
92137
{
93138
// Default retry logic: retry up to 5 times with exponential backoff
@@ -141,6 +186,18 @@ public void ValidateConfiguration()
141186
{
142187
throw new OrleansConfigurationException("DurableJobsOptions.MaxAdoptedCount must be greater than or equal to zero.");
143188
}
189+
if (options.SlowStartInitialBudget < 0)
190+
{
191+
throw new OrleansConfigurationException("DurableJobsOptions.SlowStartInitialBudget must be non-negative.");
192+
}
193+
if (options.SlowStartMaxBudget < options.SlowStartInitialBudget)
194+
{
195+
throw new OrleansConfigurationException("DurableJobsOptions.SlowStartMaxBudget must be greater than or equal to SlowStartInitialBudget.");
196+
}
197+
if (options.SlowStartRampUpDuration < TimeSpan.Zero)
198+
{
199+
throw new OrleansConfigurationException("DurableJobsOptions.SlowStartRampUpDuration must be non-negative.");
200+
}
144201
_logger.LogInformation("DurableJobsOptions validated: ShardDuration={ShardDuration}", options.ShardDuration);
145202
}
146203
}

src/Orleans.DurableJobs/JobShardManager.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,14 @@ protected JobShardManager(SiloAddress siloAddress)
3131
/// Assigns orphaned job shards to this silo.
3232
/// </summary>
3333
/// <param name="maxDueTime">Maximum due time for shards to consider.</param>
34+
/// <param name="maxNewClaims">
35+
/// The maximum number of orphaned shards to claim in this call.
36+
/// Use <see cref="int.MaxValue"/> for unlimited.
37+
/// Shards already owned by this silo are always returned regardless of this limit.
38+
/// </param>
3439
/// <param name="cancellationToken">Cancellation token.</param>
3540
/// <returns>A list of job shards assigned to this silo.</returns>
36-
public abstract Task<List<IJobShard>> AssignJobShardsAsync(DateTimeOffset maxDueTime, CancellationToken cancellationToken);
41+
public abstract Task<List<IJobShard>> AssignJobShardsAsync(DateTimeOffset maxDueTime, int maxNewClaims, CancellationToken cancellationToken);
3742

3843
/// <summary>
3944
/// Creates a new job shard owned by this silo.
@@ -112,7 +117,7 @@ internal static async Task ClearAllShardsAsync()
112117
}
113118
}
114119

115-
public override async Task<List<IJobShard>> AssignJobShardsAsync(DateTimeOffset maxDueTime, CancellationToken cancellationToken)
120+
public override async Task<List<IJobShard>> AssignJobShardsAsync(DateTimeOffset maxDueTime, int maxNewClaims, CancellationToken cancellationToken)
116121
{
117122
var alreadyOwnedShards = new List<IJobShard>();
118123
var adoptedShards = new List<IJobShard>();
@@ -172,6 +177,12 @@ public override async Task<List<IJobShard>> AssignJobShardsAsync(DateTimeOffset
172177
}
173178
}
174179

180+
// Respect the slow-start budget: once it is exhausted, skip this claim but keep scanning the remaining shards (continue, not break)
181+
if (adoptedShards.Count >= maxNewClaims)
182+
{
183+
continue;
184+
}
185+
175186
ownership.OwnerSiloAddress = SiloAddress.ToString();
176187
adoptedShards.Add(ownership.Shard);
177188
}

src/Orleans.DurableJobs/LocalDurableJobManager.Log.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,4 +131,22 @@ internal partial class LocalDurableJobManager
131131
Message = "Creating new shard for key {ShardKey}"
132132
)]
133133
private static partial void LogCreatingNewShard(ILogger logger, DateTimeOffset shardKey);
134+
135+
[LoggerMessage(
136+
Level = LogLevel.Information,
137+
Message = "Claimed {NewClaims} new orphaned shard(s) this cycle (total claimed: {TotalClaimed})"
138+
)]
139+
private static partial void LogOrphanedShardsClaimed(ILogger logger, int newClaims, int totalClaimed);
140+
141+
[LoggerMessage(
142+
Level = LogLevel.Debug,
143+
Message = "Shard claim budget: {Budget} (totalBudget={TotalBudget}, alreadyClaimed={AlreadyClaimed}, elapsed={Elapsed}, rampUpDuration={RampUpDuration})"
144+
)]
145+
private static partial void LogShardClaimBudget(ILogger logger, int budget, int totalBudget, int alreadyClaimed, TimeSpan elapsed, TimeSpan rampUpDuration);
146+
147+
[LoggerMessage(
148+
Level = LogLevel.Warning,
149+
Message = "Silo is overloaded, pausing all new shard claims"
150+
)]
151+
private static partial void LogOverloadPausingShardClaims(ILogger logger);
134152
}

src/Orleans.DurableJobs/LocalDurableJobManager.cs

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Concurrent;
33
using System.Collections.Generic;
4+
using System.Diagnostics;
45
using System.Linq;
56
using System.Runtime.CompilerServices;
67
using System.Threading;
@@ -11,6 +12,7 @@
1112
using Orleans.Internal;
1213
using Orleans.Runtime;
1314
using Orleans.Runtime.Internal;
15+
using Orleans.Runtime.Messaging;
1416

1517
namespace Orleans.DurableJobs;
1618

@@ -20,6 +22,7 @@ internal partial class LocalDurableJobManager : SystemTarget, ILocalDurableJobMa
2022
private readonly JobShardManager _shardManager;
2123
private readonly ShardExecutor _shardExecutor;
2224
private readonly IAsyncEnumerable<ClusterMembershipSnapshot> _clusterMembershipUpdates;
25+
private readonly IOverloadDetector _overloadDetector;
2326
private readonly ILogger<LocalDurableJobManager> _logger;
2427
private readonly DurableJobsOptions _options;
2528
private readonly CancellationTokenSource _cts = new();
@@ -33,12 +36,17 @@ internal partial class LocalDurableJobManager : SystemTarget, ILocalDurableJobMa
3336
private readonly SemaphoreSlim _shardCreationLock = new(1, 1);
3437
private readonly SemaphoreSlim _shardCheckSignal = new(0);
3538

39+
// Slow-start state
40+
private long _startTimestamp;
41+
private int _totalClaimedOrphanedShards;
42+
3643
private static readonly IDictionary<string, string> EmptyMetadata = new Dictionary<string, string>();
3744

3845
public LocalDurableJobManager(
3946
JobShardManager shardManager,
4047
ShardExecutor shardExecutor,
4148
IClusterMembershipService clusterMembership,
49+
IOverloadDetector overloadDetector,
4250
IOptions<DurableJobsOptions> options,
4351
SystemTargetShared shared,
4452
ILogger<LocalDurableJobManager> logger)
@@ -47,6 +55,7 @@ public LocalDurableJobManager(
4755
_shardManager = shardManager;
4856
_shardExecutor = shardExecutor;
4957
_clusterMembershipUpdates = clusterMembership.MembershipUpdates;
58+
_overloadDetector = overloadDetector;
5059
_logger = logger;
5160
_options = options.Value;
5261
}
@@ -115,6 +124,8 @@ private Task Start(CancellationToken ct)
115124
{
116125
LogStarting(_logger);
117126

127+
_startTimestamp = Stopwatch.GetTimestamp();
128+
118129
using (var _ = new ExecutionContextSuppressor())
119130
{
120131
_listenForClusterChangesTask = Task.Factory.StartNew(
@@ -247,14 +258,23 @@ private async Task PeriodicShardCheck()
247258
}
248259
}
249260

261+
// Compute the slow-start budget for this cycle
262+
var budget = ComputeClaimBudget();
263+
250264
// Query ShardManager for assigned shards (source of truth)
251-
var shards = await _shardManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), _cts.Token);
265+
var shards = await _shardManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), budget, _cts.Token);
266+
267+
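// Count newly claimed shards (those not already in our cache). NOTE(review): any returned shard missing from the cache is counted, including shards this silo already owned — confirm this cannot inflate the slow-start total.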
268+
var newClaimsThisCycle = 0;
252269
if (shards.Count > 0)
253270
{
254271
LogAssignedShards(_logger, shards.Count);
255272
foreach (var shard in shards)
256273
{
257-
_shardCache.TryAdd(shard.Id, shard);
274+
if (_shardCache.TryAdd(shard.Id, shard))
275+
{
276+
newClaimsThisCycle++;
277+
}
258278

259279
if (!_runningShards.ContainsKey(shard.Id))
260280
{
@@ -266,6 +286,12 @@ private async Task PeriodicShardCheck()
266286
{
267287
LogNoShardsToAssign(_logger);
268288
}
289+
290+
if (newClaimsThisCycle > 0)
291+
{
292+
_totalClaimedOrphanedShards += newClaimsThisCycle;
293+
LogOrphanedShardsClaimed(_logger, newClaimsThisCycle, _totalClaimedOrphanedShards);
294+
}
269295
}
270296
catch (OperationCanceledException)
271297
{
@@ -279,6 +305,45 @@ private async Task PeriodicShardCheck()
279305
}
280306
}
281307

308+
/// <summary>
309+
/// Computes the maximum number of orphaned shards this silo may claim in the current check cycle.
310+
/// Returns <see cref="int.MaxValue"/> when unlimited (ramp-up complete or disabled).
311+
/// Returns <c>0</c> when the silo is overloaded, pausing all new claims.
312+
/// </summary>
313+
private int ComputeClaimBudget()
314+
{
315+
// If overloaded, claim nothing new regardless of ramp-up phase
316+
if (_overloadDetector.IsOverloaded)
317+
{
318+
LogOverloadPausingShardClaims(_logger);
319+
return 0;
320+
}
321+
322+
// Slow-start disabled
323+
if (_options.SlowStartRampUpDuration <= TimeSpan.Zero)
324+
{
325+
return int.MaxValue;
326+
}
327+
328+
var elapsed = Stopwatch.GetElapsedTime(_startTimestamp);
329+
330+
// After ramp-up period, no limit
331+
if (elapsed >= _options.SlowStartRampUpDuration)
332+
{
333+
return int.MaxValue;
334+
}
335+
336+
// Linear interpolation: InitialBudget at t=0, MaxBudget at t=RampUpDuration
337+
var progress = elapsed.TotalMilliseconds / _options.SlowStartRampUpDuration.TotalMilliseconds;
338+
var totalBudget = _options.SlowStartInitialBudget
339+
+ (int)(progress * (_options.SlowStartMaxBudget - _options.SlowStartInitialBudget));
340+
var remaining = totalBudget - _totalClaimedOrphanedShards;
341+
var budget = Math.Max(0, remaining);
342+
343+
LogShardClaimBudget(_logger, budget, totalBudget, _totalClaimedOrphanedShards, elapsed, _options.SlowStartRampUpDuration);
344+
return budget;
345+
}
346+
282347
private void TryActivateShard(IJobShard shard)
283348
{
284349
// Only start if not already running

test/Extensions/Orleans.Azure.Tests/DurableJobs/AzureStorageJobShardBatchingTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public async Task AzureStorageJobShard_MultipleOperationsBatched()
119119
SetSiloStatus(newSiloAddress, SiloStatus.Active);
120120

121121
var newManager = CreateManager(newSiloAddress);
122-
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken);
122+
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), maxNewClaims: int.MaxValue, cancellationToken);
123123
Assert.Single(shards);
124124

125125
var consumedJobs = new List<string>();
@@ -168,7 +168,7 @@ public async Task AzureStorageJobShard_PartialBatchFlushesOnTimeout()
168168
SetSiloStatus(newSiloAddress, SiloStatus.Active);
169169

170170
var newManager = CreateManager(newSiloAddress);
171-
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken);
171+
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), maxNewClaims: int.MaxValue, cancellationToken);
172172
Assert.Single(shards);
173173

174174
var consumedJobs = new List<string>();
@@ -222,7 +222,7 @@ public async Task AzureStorageJobShard_MaxBatchSizeEnforced()
222222
SetSiloStatus(newSiloAddress, SiloStatus.Active);
223223

224224
var newManager = CreateManager(newSiloAddress);
225-
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken);
225+
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), maxNewClaims: int.MaxValue, cancellationToken);
226226
Assert.Single(shards);
227227

228228
var consumedJobs = new List<string>();
@@ -290,7 +290,7 @@ public async Task AzureStorageJobShard_MetadataOperationsBreakBatches()
290290
StorageOptions.Value.BatchFlushInterval = TimeSpan.FromMilliseconds(100);
291291

292292
var newManager = CreateManager(newSiloAddress);
293-
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken);
293+
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), maxNewClaims: int.MaxValue, cancellationToken);
294294
Assert.Single(shards);
295295

296296
var consumedJobs = new List<string>();

test/Extensions/Orleans.Azure.Tests/DurableJobs/AzureStorageJobShardManagerTests.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,4 +179,37 @@ public async Task AzureStorageJobShardManager_UnregisterShard_WithJobsRemaining(
179179
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2));
180180
await _runner.UnregisterShard_WithJobsRemaining(cts.Token);
181181
}
182+
183+
/// <summary>
184+
/// Tests that maxNewClaims limits the number of orphaned shards claimed.
185+
/// This test is delegated to the runner for reuse across providers.
186+
/// </summary>
187+
[SkippableFact, TestCategory("Azure"), TestCategory("Functional")]
188+
public async Task AzureStorageJobShardManager_SlowStart_LimitsOrphanedShardClaims()
189+
{
190+
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2));
191+
await _runner.SlowStart_LimitsOrphanedShardClaims(cts.Token);
192+
}
193+
194+
/// <summary>
195+
/// Tests that maxNewClaims = 0 prevents claiming orphaned shards but returns owned shards.
196+
/// This test is delegated to the runner for reuse across providers.
197+
/// </summary>
198+
[SkippableFact, TestCategory("Azure"), TestCategory("Functional")]
199+
public async Task AzureStorageJobShardManager_SlowStart_ZeroBudgetClaimsNothing()
200+
{
201+
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2));
202+
await _runner.SlowStart_ZeroBudgetClaimsNothing(cts.Token);
203+
}
204+
205+
/// <summary>
206+
/// Tests that maxNewClaims = int.MaxValue (unlimited) claims all orphaned shards.
207+
/// This test is delegated to the runner for reuse across providers.
208+
/// </summary>
209+
[SkippableFact, TestCategory("Azure"), TestCategory("Functional")]
210+
public async Task AzureStorageJobShardManager_SlowStart_UnlimitedBudgetClaimsAll()
211+
{
212+
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2));
213+
await _runner.SlowStart_UnlimitedBudgetClaimsAll(cts.Token);
214+
}
182215
}

0 commit comments

Comments
 (0)