Skip to content

Commit fc150cf

Browse files
committed
Implement slow-start budget for orphaned shard claims and enhance related logging
1 parent 9b9f920 commit fc150cf

File tree

9 files changed

+341
-23
lines changed

9 files changed

+341
-23
lines changed

src/Azure/Orleans.DurableJobs.AzureStorage/AzureStorageJobShardManager.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,13 @@ public AzureStorageJobShardManager(
6666
{
6767
}
6868

69-
public override async Task<List<Orleans.DurableJobs.IJobShard>> AssignJobShardsAsync(DateTimeOffset maxShardStartTime, CancellationToken cancellationToken)
69+
public override async Task<List<Orleans.DurableJobs.IJobShard>> AssignJobShardsAsync(DateTimeOffset maxShardStartTime, int maxNewClaims, CancellationToken cancellationToken)
7070
{
7171
await InitializeIfNeeded(cancellationToken);
7272
LogAssigningShards(_logger, SiloAddress, maxShardStartTime, _containerName);
7373

7474
var result = new List<Orleans.DurableJobs.IJobShard>();
75+
var newClaimCount = 0;
7576
await foreach (var blob in _client.GetBlobsAsync(traits: BlobTraits.Metadata, states: BlobStates.None, cancellationToken: cancellationToken, prefix: _blobPrefix))
7677
{
7778
// Get the owner and creator of the shard
@@ -125,6 +126,12 @@ public AzureStorageJobShardManager(
125126
// Determine if this is an adopted shard (taken from dead owner) vs orphaned (gracefully released)
126127
var isAdopted = owner is not null && ownerStatus == SiloStatus.Dead;
127128

129+
// Respect the slow-start budget: skip claiming if we've exhausted the budget
130+
if (newClaimCount >= maxNewClaims)
131+
{
132+
continue;
133+
}
134+
128135
// Try to claim orphaned or adopted shard
129136
LogClaimingShard(_logger, blob.Name, SiloAddress, owner);
130137
var blobClient = _client.GetAppendBlobClient(blob.Name);
@@ -142,6 +149,7 @@ public AzureStorageJobShardManager(
142149
_jobShardCache[blob.Name] = orphanedShard;
143150
LogShardAssigned(_logger, blob.Name, SiloAddress);
144151
result.Add(orphanedShard);
152+
newClaimCount++;
145153
}
146154

147155
LogAssignmentCompleted(_logger, result.Count, SiloAddress);

src/Orleans.DurableJobs/Hosting/DurableJobsOptions.cs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,51 @@ public sealed class DurableJobsOptions
8888
/// </remarks>
8989
public int MaxAdoptedCount { get; set; } = 3;
9090

91+
/// <summary>
92+
/// Gets or sets the maximum number of orphaned shards a silo may claim immediately
93+
/// after startup. The cumulative budget grows linearly from this value to
94+
/// <see cref="SlowStartMaxBudget"/> over <see cref="SlowStartRampUpDuration"/>,
95+
/// after which the limit is removed entirely.
96+
/// This prevents a freshly started silo from overwhelming itself by claiming all orphaned shards
97+
/// at once during disaster-recovery scenarios.
98+
/// Default: 2.
99+
/// </summary>
100+
/// <example>
101+
/// <code>
102+
/// options.SlowStartInitialBudget = 1;
103+
/// </code>
104+
/// </example>
105+
public int SlowStartInitialBudget { get; set; } = 2;
106+
107+
/// <summary>
108+
/// Gets or sets the total number of orphaned shards the silo is allowed to have claimed
109+
/// by the end of the slow-start ramp-up period. The cumulative budget is linearly
110+
/// interpolated between <see cref="SlowStartInitialBudget"/> at startup and this value
111+
/// at <see cref="SlowStartRampUpDuration"/>.
112+
/// Default: 20.
113+
/// </summary>
114+
/// <example>
115+
/// <code>
116+
/// options.SlowStartMaxBudget = 50;
117+
/// </code>
118+
/// </example>
119+
public int SlowStartMaxBudget { get; set; } = 20;
120+
121+
/// <summary>
122+
/// Gets or sets the duration of the slow-start ramp-up period after silo activation.
123+
/// While the silo has been running for less than this duration, the number of orphaned shards
124+
/// it may claim is limited by a linearly increasing budget. Once this period elapses the
125+
/// silo claims all available orphaned shards without limit.
126+
/// Set to <see cref="TimeSpan.Zero"/> to disable slow-start entirely.
127+
/// Default: 5 minutes.
128+
/// </summary>
129+
/// <example>
130+
/// <code>
131+
/// options.SlowStartRampUpDuration = TimeSpan.FromMinutes(10);
132+
/// </code>
133+
/// </example>
134+
public TimeSpan SlowStartRampUpDuration { get; set; } = TimeSpan.FromMinutes(5);
135+
91136
private static DateTimeOffset? DefaultShouldRetry(IJobRunContext jobContext, Exception ex)
92137
{
93138
// Default retry logic: retry up to 5 times with exponential backoff
@@ -141,6 +186,18 @@ public void ValidateConfiguration()
141186
{
142187
throw new OrleansConfigurationException("DurableJobsOptions.MaxAdoptedCount must be greater than or equal to zero.");
143188
}
189+
if (options.SlowStartInitialBudget < 0)
190+
{
191+
throw new OrleansConfigurationException("DurableJobsOptions.SlowStartInitialBudget must be non-negative.");
192+
}
193+
if (options.SlowStartMaxBudget < options.SlowStartInitialBudget)
194+
{
195+
throw new OrleansConfigurationException("DurableJobsOptions.SlowStartMaxBudget must be greater than or equal to SlowStartInitialBudget.");
196+
}
197+
if (options.SlowStartRampUpDuration < TimeSpan.Zero)
198+
{
199+
throw new OrleansConfigurationException("DurableJobsOptions.SlowStartRampUpDuration must be non-negative.");
200+
}
144201
_logger.LogInformation("DurableJobsOptions validated: ShardDuration={ShardDuration}", options.ShardDuration);
145202
}
146203
}

src/Orleans.DurableJobs/JobShardManager.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,14 @@ protected JobShardManager(SiloAddress siloAddress)
3131
/// Assigns orphaned job shards to this silo.
3232
/// </summary>
3333
/// <param name="maxDueTime">Maximum due time for shards to consider.</param>
34+
/// <param name="maxNewClaims">
35+
/// The maximum number of orphaned shards to claim in this call.
36+
/// Use <see cref="int.MaxValue"/> for unlimited.
37+
/// Shards already owned by this silo are always returned regardless of this limit.
38+
/// </param>
3439
/// <param name="cancellationToken">Cancellation token.</param>
3540
/// <returns>A list of job shards assigned to this silo.</returns>
36-
public abstract Task<List<IJobShard>> AssignJobShardsAsync(DateTimeOffset maxDueTime, CancellationToken cancellationToken);
41+
public abstract Task<List<IJobShard>> AssignJobShardsAsync(DateTimeOffset maxDueTime, int maxNewClaims, CancellationToken cancellationToken);
3742

3843
/// <summary>
3944
/// Creates a new job shard owned by this silo.
@@ -112,7 +117,7 @@ internal static async Task ClearAllShardsAsync()
112117
}
113118
}
114119

115-
public override async Task<List<IJobShard>> AssignJobShardsAsync(DateTimeOffset maxDueTime, CancellationToken cancellationToken)
120+
public override async Task<List<IJobShard>> AssignJobShardsAsync(DateTimeOffset maxDueTime, int maxNewClaims, CancellationToken cancellationToken)
116121
{
117122
var alreadyOwnedShards = new List<IJobShard>();
118123
var adoptedShards = new List<IJobShard>();
@@ -172,6 +177,12 @@ public override async Task<List<IJobShard>> AssignJobShardsAsync(DateTimeOffset
172177
}
173178
}
174179

180+
// Respect the slow-start budget: skip claiming if we've exhausted the budget
181+
if (adoptedShards.Count >= maxNewClaims)
182+
{
183+
continue;
184+
}
185+
175186
ownership.OwnerSiloAddress = SiloAddress.ToString();
176187
adoptedShards.Add(ownership.Shard);
177188
}

src/Orleans.DurableJobs/LocalDurableJobManager.Log.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,4 +131,22 @@ internal partial class LocalDurableJobManager
131131
Message = "Creating new shard for key {ShardKey}"
132132
)]
133133
private static partial void LogCreatingNewShard(ILogger logger, DateTimeOffset shardKey);
134+
135+
[LoggerMessage(
136+
Level = LogLevel.Information,
137+
Message = "Claimed {NewClaims} new orphaned shard(s) this cycle (total claimed: {TotalClaimed})"
138+
)]
139+
private static partial void LogOrphanedShardsClaimed(ILogger logger, int newClaims, int totalClaimed);
140+
141+
[LoggerMessage(
142+
Level = LogLevel.Debug,
143+
Message = "Shard claim budget: {Budget} (totalBudget={TotalBudget}, alreadyClaimed={AlreadyClaimed}, elapsed={Elapsed}, rampUpDuration={RampUpDuration})"
144+
)]
145+
private static partial void LogShardClaimBudget(ILogger logger, int budget, int totalBudget, int alreadyClaimed, TimeSpan elapsed, TimeSpan rampUpDuration);
146+
147+
[LoggerMessage(
148+
Level = LogLevel.Warning,
149+
Message = "Silo is overloaded, pausing all new shard claims"
150+
)]
151+
private static partial void LogOverloadPausingShardClaims(ILogger logger);
134152
}

src/Orleans.DurableJobs/LocalDurableJobManager.cs

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using Orleans.Internal;
1212
using Orleans.Runtime;
1313
using Orleans.Runtime.Internal;
14+
using Orleans.Runtime.Messaging;
1415

1516
namespace Orleans.DurableJobs;
1617

@@ -20,6 +21,8 @@ internal partial class LocalDurableJobManager : SystemTarget, ILocalDurableJobMa
2021
private readonly JobShardManager _shardManager;
2122
private readonly ShardExecutor _shardExecutor;
2223
private readonly IAsyncEnumerable<ClusterMembershipSnapshot> _clusterMembershipUpdates;
24+
private readonly IOverloadDetector _overloadDetector;
25+
private readonly TimeProvider _timeProvider;
2326
private readonly ILogger<LocalDurableJobManager> _logger;
2427
private readonly DurableJobsOptions _options;
2528
private readonly CancellationTokenSource _cts = new();
@@ -33,12 +36,18 @@ internal partial class LocalDurableJobManager : SystemTarget, ILocalDurableJobMa
3336
private readonly SemaphoreSlim _shardCreationLock = new(1, 1);
3437
private readonly SemaphoreSlim _shardCheckSignal = new(0);
3538

39+
// Slow-start state
40+
private long _startTimestamp;
41+
private int _totalClaimedOrphanedShards;
42+
3643
private static readonly IDictionary<string, string> EmptyMetadata = new Dictionary<string, string>();
3744

3845
public LocalDurableJobManager(
3946
JobShardManager shardManager,
4047
ShardExecutor shardExecutor,
4148
IClusterMembershipService clusterMembership,
49+
IOverloadDetector overloadDetector,
50+
TimeProvider timeProvider,
4251
IOptions<DurableJobsOptions> options,
4352
SystemTargetShared shared,
4453
ILogger<LocalDurableJobManager> logger)
@@ -47,6 +56,8 @@ public LocalDurableJobManager(
4756
_shardManager = shardManager;
4857
_shardExecutor = shardExecutor;
4958
_clusterMembershipUpdates = clusterMembership.MembershipUpdates;
59+
_overloadDetector = overloadDetector;
60+
_timeProvider = timeProvider;
5061
_logger = logger;
5162
_options = options.Value;
5263
}
@@ -115,6 +126,8 @@ private Task Start(CancellationToken ct)
115126
{
116127
LogStarting(_logger);
117128

129+
_startTimestamp = _timeProvider.GetTimestamp();
130+
118131
using (var _ = new ExecutionContextSuppressor())
119132
{
120133
_listenForClusterChangesTask = Task.Factory.StartNew(
@@ -247,14 +260,23 @@ private async Task PeriodicShardCheck()
247260
}
248261
}
249262

263+
// Compute the slow-start budget for this cycle
264+
var budget = ComputeClaimBudget();
265+
250266
// Query ShardManager for assigned shards (source of truth)
251-
var shards = await _shardManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), _cts.Token);
267+
var shards = await _shardManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), budget, _cts.Token);
268+
269+
// Count newly claimed shards (those not already in our cache)
270+
var newClaimsThisCycle = 0;
252271
if (shards.Count > 0)
253272
{
254273
LogAssignedShards(_logger, shards.Count);
255274
foreach (var shard in shards)
256275
{
257-
_shardCache.TryAdd(shard.Id, shard);
276+
if (_shardCache.TryAdd(shard.Id, shard))
277+
{
278+
newClaimsThisCycle++;
279+
}
258280

259281
if (!_runningShards.ContainsKey(shard.Id))
260282
{
@@ -266,6 +288,12 @@ private async Task PeriodicShardCheck()
266288
{
267289
LogNoShardsToAssign(_logger);
268290
}
291+
292+
if (newClaimsThisCycle > 0)
293+
{
294+
_totalClaimedOrphanedShards += newClaimsThisCycle;
295+
LogOrphanedShardsClaimed(_logger, newClaimsThisCycle, _totalClaimedOrphanedShards);
296+
}
269297
}
270298
catch (OperationCanceledException)
271299
{
@@ -279,6 +307,45 @@ private async Task PeriodicShardCheck()
279307
}
280308
}
281309

310+
/// <summary>
311+
/// Computes the maximum number of orphaned shards this silo may claim in the current check cycle.
312+
/// Returns <see cref="int.MaxValue"/> when unlimited (ramp-up complete or disabled).
313+
/// Returns <c>0</c> when the silo is overloaded, pausing all new claims.
314+
/// </summary>
315+
private int ComputeClaimBudget()
316+
{
317+
// If overloaded, claim nothing new regardless of ramp-up phase
318+
if (_overloadDetector.IsOverloaded)
319+
{
320+
LogOverloadPausingShardClaims(_logger);
321+
return 0;
322+
}
323+
324+
// Slow-start disabled
325+
if (_options.SlowStartRampUpDuration <= TimeSpan.Zero)
326+
{
327+
return int.MaxValue;
328+
}
329+
330+
var elapsed = _timeProvider.GetElapsedTime(_startTimestamp);
331+
332+
// After ramp-up period, no limit
333+
if (elapsed >= _options.SlowStartRampUpDuration)
334+
{
335+
return int.MaxValue;
336+
}
337+
338+
// Linear interpolation: InitialBudget at t=0, MaxBudget at t=RampUpDuration
339+
var progress = elapsed.TotalMilliseconds / _options.SlowStartRampUpDuration.TotalMilliseconds;
340+
var totalBudget = _options.SlowStartInitialBudget
341+
+ (int)(progress * (_options.SlowStartMaxBudget - _options.SlowStartInitialBudget));
342+
var remaining = totalBudget - _totalClaimedOrphanedShards;
343+
var budget = Math.Max(0, remaining);
344+
345+
LogShardClaimBudget(_logger, budget, totalBudget, _totalClaimedOrphanedShards, elapsed, _options.SlowStartRampUpDuration);
346+
return budget;
347+
}
348+
282349
private void TryActivateShard(IJobShard shard)
283350
{
284351
// Only start if not already running

test/Extensions/Orleans.Azure.Tests/DurableJobs/AzureStorageJobShardBatchingTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public async Task AzureStorageJobShard_MultipleOperationsBatched()
119119
SetSiloStatus(newSiloAddress, SiloStatus.Active);
120120

121121
var newManager = CreateManager(newSiloAddress);
122-
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken);
122+
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), maxNewClaims: int.MaxValue, cancellationToken);
123123
Assert.Single(shards);
124124

125125
var consumedJobs = new List<string>();
@@ -168,7 +168,7 @@ public async Task AzureStorageJobShard_PartialBatchFlushesOnTimeout()
168168
SetSiloStatus(newSiloAddress, SiloStatus.Active);
169169

170170
var newManager = CreateManager(newSiloAddress);
171-
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken);
171+
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), maxNewClaims: int.MaxValue, cancellationToken);
172172
Assert.Single(shards);
173173

174174
var consumedJobs = new List<string>();
@@ -222,7 +222,7 @@ public async Task AzureStorageJobShard_MaxBatchSizeEnforced()
222222
SetSiloStatus(newSiloAddress, SiloStatus.Active);
223223

224224
var newManager = CreateManager(newSiloAddress);
225-
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken);
225+
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), maxNewClaims: int.MaxValue, cancellationToken);
226226
Assert.Single(shards);
227227

228228
var consumedJobs = new List<string>();
@@ -290,7 +290,7 @@ public async Task AzureStorageJobShard_MetadataOperationsBreakBatches()
290290
StorageOptions.Value.BatchFlushInterval = TimeSpan.FromMilliseconds(100);
291291

292292
var newManager = CreateManager(newSiloAddress);
293-
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), cancellationToken);
293+
var shards = await newManager.AssignJobShardsAsync(DateTime.UtcNow.AddHours(1), maxNewClaims: int.MaxValue, cancellationToken);
294294
Assert.Single(shards);
295295

296296
var consumedJobs = new List<string>();

test/Extensions/Orleans.Azure.Tests/DurableJobs/AzureStorageJobShardManagerTests.cs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,4 +179,37 @@ public async Task AzureStorageJobShardManager_UnregisterShard_WithJobsRemaining(
179179
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2));
180180
await _runner.UnregisterShard_WithJobsRemaining(cts.Token);
181181
}
182+
183+
/// <summary>
184+
/// Tests that maxNewClaims limits the number of orphaned shards claimed.
185+
/// This test is delegated to the runner for reuse across providers.
186+
/// </summary>
187+
[SkippableFact, TestCategory("Azure"), TestCategory("Functional")]
188+
public async Task AzureStorageJobShardManager_SlowStart_LimitsOrphanedShardClaims()
189+
{
190+
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2));
191+
await _runner.SlowStart_LimitsOrphanedShardClaims(cts.Token);
192+
}
193+
194+
/// <summary>
195+
/// Tests that maxNewClaims = 0 prevents claiming orphaned shards but returns owned shards.
196+
/// This test is delegated to the runner for reuse across providers.
197+
/// </summary>
198+
[SkippableFact, TestCategory("Azure"), TestCategory("Functional")]
199+
public async Task AzureStorageJobShardManager_SlowStart_ZeroBudgetClaimsNothing()
200+
{
201+
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2));
202+
await _runner.SlowStart_ZeroBudgetClaimsNothing(cts.Token);
203+
}
204+
205+
/// <summary>
206+
/// Tests that maxNewClaims = int.MaxValue (unlimited) claims all orphaned shards.
207+
/// This test is delegated to the runner for reuse across providers.
208+
/// </summary>
209+
[SkippableFact, TestCategory("Azure"), TestCategory("Functional")]
210+
public async Task AzureStorageJobShardManager_SlowStart_UnlimitedBudgetClaimsAll()
211+
{
212+
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2));
213+
await _runner.SlowStart_UnlimitedBudgetClaimsAll(cts.Token);
214+
}
182215
}

0 commit comments

Comments
 (0)