Skip to content

Commit bea0ced

Browse files
committed
Fixed the scheduler to handle inline tasks and to not create deadlock on threads of TickerQ
1 parent a0596fb commit bea0ced

File tree

5 files changed

+83
-55
lines changed

5 files changed

+83
-55
lines changed

src/TickerQ.EntityFrameworkCore/Infrastructure/BasePersistenceProvider.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,16 +68,16 @@ public async IAsyncEnumerable<TimeTickerEntity> QueueTimedOutTimeTickers([Enumer
6868
await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);;
6969
var context = dbContext.Set<TTimeTicker>();
7070
var now = _clock.UtcNow;
71-
var fallbackThreshold = now.AddMilliseconds(-100); // Fallback picks up tasks overdue by > 100ms
71+
var fallbackThreshold = now.AddSeconds(-1); // Fallback picks up tasks older than main 1-second window
7272

7373
var timeTickersToUpdate = await context
7474
.AsNoTracking()
7575
.Where(x => x.ExecutionTime != null)
7676
.Where(x => x.Status == TickerStatus.Idle || x.Status == TickerStatus.Queued)
77-
.Where(x => x.ExecutionTime <= fallbackThreshold) // Only tasks overdue by more than 100ms
77+
.Where(x => x.ExecutionTime <= fallbackThreshold) // Only tasks older than 1 second
7878
.Include(x => x.Children.Where(y => y.ExecutionTime == null))
7979
.Select(MappingExtensions.ForQueueTimeTickers<TTimeTicker>())
80-
.ToArrayAsync(cancellationToken).ConfigureAwait(false);;
80+
.ToArrayAsync(cancellationToken).ConfigureAwait(false);
8181

8282
foreach (var timeTicker in timeTickersToUpdate)
8383
{
@@ -273,7 +273,7 @@ await dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
273273
public async IAsyncEnumerable<CronTickerOccurrenceEntity<TCronTicker>> QueueTimedOutCronTickerOccurrences([EnumeratorCancellation] CancellationToken cancellationToken = default)
274274
{
275275
var now = _clock.UtcNow;
276-
var fallbackThreshold = now.AddMilliseconds(-100); // Fallback picks up tasks overdue by > 100ms
276+
var fallbackThreshold = now.AddSeconds(-1); // Fallback picks up tasks older than main 1-second window
277277

278278
await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);;
279279
var context = dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>();
@@ -282,7 +282,7 @@ public async IAsyncEnumerable<CronTickerOccurrenceEntity<TCronTicker>> QueueTime
282282
.AsNoTracking()
283283
.Include(x => x.CronTicker)
284284
.Where(x => x.Status == TickerStatus.Idle || x.Status == TickerStatus.Queued)
285-
.Where(x => x.ExecutionTime <= fallbackThreshold) // Only tasks overdue by more than 100ms
285+
.Where(x => x.ExecutionTime <= fallbackThreshold) // Only tasks older than 1 second
286286
.Select(MappingExtensions.ForQueueCronTickerOccurrence<CronTickerOccurrenceEntity<TCronTicker>, TCronTicker>())
287287
.ToArrayAsync(cancellationToken).ConfigureAwait(false);
288288

@@ -438,13 +438,13 @@ public async IAsyncEnumerable<CronTickerOccurrenceEntity<TCronTicker>> QueueCron
438438
public async Task<CronTickerOccurrenceEntity<TCronTicker>> GetEarliestAvailableCronOccurrence(Guid[] ids, CancellationToken cancellationToken = default)
439439
{
440440
var now = _clock.UtcNow;
441-
var mainSchedulerThreshold = now.AddMilliseconds(-now.Millisecond);
441+
var mainSchedulerThreshold = now.AddSeconds(-1);
442442
await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);;
443443
return await dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
444444
.AsNoTracking()
445445
.Include(x => x.CronTicker)
446446
.Where(x => ids.Contains(x.CronTickerId))
447-
.Where(x => x.ExecutionTime >= mainSchedulerThreshold) // Only recent/upcoming tasks (not heavily overdue)
447+
.Where(x => x.ExecutionTime >= mainSchedulerThreshold) // Only items within the 1-second main scheduler window
448448
.WhereCanAcquire(_lockHolder)
449449
.OrderBy(x => x.ExecutionTime)
450450
.Select(MappingExtensions.ForLatestQueuedCronTickerOccurrence<CronTickerOccurrenceEntity<TCronTicker>, TCronTicker>())
@@ -475,4 +475,4 @@ await dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
475475
}
476476

477477
#endregion
478-
}
478+
}

src/TickerQ/Exceptions/TerminateExecutionException.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ internal class TerminateExecutionException : Exception
77
{
88
internal readonly TickerStatus Status = TickerStatus.Skipped;
99
public TerminateExecutionException(string message) : base(message) { }
10-
public TerminateExecutionException(TickerStatus tickerType, string message) : base(message) {}
11-
public TerminateExecutionException(string message, Exception innerException) : base(message, innerException) {}
10+
public TerminateExecutionException(TickerStatus tickerType, string message) : base(message)
11+
=> Status = tickerType;
12+
public TerminateExecutionException(string message, Exception innerException) : base(message, innerException) { }
1213
public TerminateExecutionException(TickerStatus tickerType, string message, Exception innerException) : base(message, innerException)
1314
=> Status = tickerType;
1415
}

src/TickerQ/Src/BackgroundServices/TickerQSchedulerBackgroundService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ private async Task RunTickerQSchedulerAsync(CancellationToken stoppingToken, Can
106106
await _internalTickerManager.SetTickersInProgress(_executionContext.Functions, cancellationToken);
107107

108108
foreach (var function in _executionContext.Functions.OrderBy(x => x.CachedPriority))
109-
await _taskScheduler.QueueAsync(async ct => await _taskHandler.ExecuteTaskAsync(function,false, ct), function.CachedPriority, stoppingToken);
109+
_ = _taskScheduler.QueueAsync(async ct => await _taskHandler.ExecuteTaskAsync(function,false, ct), function.CachedPriority, stoppingToken);
110110
}
111111

112112
var (timeRemaining, functions) =

src/TickerQ/Src/Provider/TickerInMemoryPersistenceProvider.cs

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,13 @@ public async IAsyncEnumerable<TimeTickerEntity> QueueTimeTickers(TimeTickerEntit
8181
public async IAsyncEnumerable<TimeTickerEntity> QueueTimedOutTimeTickers([EnumeratorCancellation] CancellationToken cancellationToken = default)
8282
{
8383
var now = _clock.UtcNow;
84-
var fallbackThreshold = now.AddMilliseconds(-100); // Fallback picks up tasks overdue by > 100ms
84+
var fallbackThreshold = now.AddSeconds(-1); // Fallback picks up tasks older than main 1-second window
8585

8686
// First, get the time tickers that need to be updated (matching EF query)
8787
var timeTickersToUpdate = TimeTickers.Values
8888
.Where(x => x.ExecutionTime != null)
8989
.Where(x => x.Status == TickerStatus.Idle || x.Status == TickerStatus.Queued)
90-
.Where(x => x.ExecutionTime <= fallbackThreshold) // Only tasks overdue by more than 100ms
90+
.Where(x => x.ExecutionTime <= fallbackThreshold) // Only tasks older than 1 second
9191
.Select(x => ForQueueTimeTickers(x)) // Map to TimeTickerEntity with children, matching EF's Select
9292
.ToArray();
9393

@@ -149,13 +149,15 @@ public Task ReleaseAcquiredTimeTickers(Guid[] timeTickerIds, CancellationToken c
149149
public Task<TimeTickerEntity[]> GetEarliestTimeTickers(CancellationToken cancellationToken = default)
150150
{
151151
var now = _clock.UtcNow;
152-
var mainSchedulerThreshold = now.AddMilliseconds(-100); // Main scheduler handles tasks up to 100ms overdue
152+
153+
// Define the window: ignore anything older than 1 second ago
154+
var oneSecondAgo = now.AddSeconds(-1);
153155

154156
// Build base query matching EF Core's approach
155157
var baseQuery = TimeTickers.Values
156158
.Where(x => x.ExecutionTime != null)
157159
.Where(CanAcquire)
158-
.Where(x => x.ExecutionTime >= mainSchedulerThreshold); // Only recent/upcoming tasks (not heavily overdue)
160+
.Where(x => x.ExecutionTime >= oneSecondAgo); // Ignore old tickers (fallback handles them)
159161

160162
// Get minimum execution time (matching EF's approach)
161163
var minExecutionTime = baseQuery
@@ -166,12 +168,22 @@ public Task<TimeTickerEntity[]> GetEarliestTimeTickers(CancellationToken cancell
166168
if (minExecutionTime == null)
167169
return Task.FromResult(Array.Empty<TimeTickerEntity>());
168170

169-
// Get tasks within 50ms window of the earliest task for batching efficiency
170-
var batchWindow = minExecutionTime.Value.AddMilliseconds(50);
171+
// Round the minimum execution time down to its second
172+
var minSecond = new DateTime(
173+
minExecutionTime.Value.Year,
174+
minExecutionTime.Value.Month,
175+
minExecutionTime.Value.Day,
176+
minExecutionTime.Value.Hour,
177+
minExecutionTime.Value.Minute,
178+
minExecutionTime.Value.Second,
179+
DateTimeKind.Utc);
180+
181+
// Fetch all tickers within that complete second (this ensures we get all tickers in the same second)
182+
var maxExecutionTime = minSecond.AddSeconds(1);
171183

172-
// Final query with mapping (matching EF's approach)
173184
var result = baseQuery
174-
.Where(x => x.ExecutionTime.Value <= batchWindow)
185+
.Where(x => x.ExecutionTime >= minSecond && x.ExecutionTime < maxExecutionTime)
186+
.OrderBy(x => x.ExecutionTime)
175187
.Select(ForQueueTimeTickers) // Use same mapping as EF Core
176188
.ToArray();
177189

@@ -598,7 +610,7 @@ public Task<int> RemoveCronTickers(Guid[] cronTickerIds, CancellationToken cance
598610
public Task<CronTickerOccurrenceEntity<TCronTicker>> GetEarliestAvailableCronOccurrence(Guid[] ids, CancellationToken cancellationToken = default)
599611
{
600612
var now = _clock.UtcNow;
601-
var mainSchedulerThreshold = now.AddMilliseconds(-100); // Main scheduler handles tasks up to 100ms overdue
613+
var mainSchedulerThreshold = now.AddSeconds(-1); // Main scheduler handles items within the 1-second window
602614

603615
var query = CronOccurrences.Values.AsEnumerable();
604616

@@ -673,11 +685,11 @@ public async IAsyncEnumerable<CronTickerOccurrenceEntity<TCronTicker>> QueueCron
673685
public async IAsyncEnumerable<CronTickerOccurrenceEntity<TCronTicker>> QueueTimedOutCronTickerOccurrences([EnumeratorCancellation] CancellationToken cancellationToken = default)
674686
{
675687
var now = _clock.UtcNow;
676-
var fallbackThreshold = now.AddMilliseconds(-100); // Fallback picks up tasks overdue by > 100ms
688+
var fallbackThreshold = now.AddSeconds(-1); // Fallback picks up tasks older than main 1-second window
677689

678690
var occurrencesToUpdate = CronOccurrences.Values
679691
.Where(x => x.Status == TickerStatus.Idle || x.Status == TickerStatus.Queued)
680-
.Where(x => x.ExecutionTime <= fallbackThreshold) // Only tasks overdue by more than 100ms
692+
.Where(x => x.ExecutionTime <= fallbackThreshold) // Only tasks older than 1 second
681693
.ToArray();
682694

683695
foreach (var occurrence in occurrencesToUpdate)
@@ -1004,4 +1016,4 @@ private void ApplyFunctionContextToCronOccurrence(CronTickerOccurrenceEntity<TCr
10041016

10051017
#endregion
10061018
}
1007-
}
1019+
}

src/TickerQ/Src/TickerQThreadPool/TickerQTaskScheduler.cs

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,11 @@ public TickerQTaskScheduler(
5353
_workerQueues[i] = new ConcurrentQueue<WorkItem>();
5454
}
5555

56-
// Start at least one worker immediately to handle incoming tasks
57-
TryStartWorker();
56+
// Start all workers upfront to honor maxConcurrency
57+
for (int i = 0; i < _maxConcurrency; i++)
58+
{
59+
TryStartWorker();
60+
}
5861
}
5962

6063
/// <summary>
@@ -149,20 +152,14 @@ private void EnsureWorkerAvailable()
149152
TryStartWorker();
150153
return;
151154
}
152-
153-
// Start more workers if we have queued tasks
155+
156+
// If there is queued work and we still have capacity, start another worker
154157
var totalQueued = _totalQueuedTasks;
155158
var activeWorkers = _activeWorkers;
156-
157-
// If we have tasks but not enough workers, start more
159+
158160
if (totalQueued > 0 && activeWorkers < _maxConcurrency)
159161
{
160-
// Start workers proportional to load
161-
var desiredWorkers = Math.Min(totalQueued, _maxConcurrency);
162-
if (desiredWorkers > activeWorkers)
163-
{
164-
TryStartWorker();
165-
}
162+
TryStartWorker();
166163
}
167164
}
168165

@@ -253,24 +250,7 @@ private async Task WorkerLoopCoreAsync(int workerId)
253250
// No work found - check if we should exit
254251
if (DateTime.UtcNow - lastWorkTime > _idleWorkerTimeout)
255252
{
256-
// Check ALL queues for any remaining work before exiting
257-
bool anyWorkRemaining = false;
258-
for (int i = 0; i < _maxConcurrency; i++)
259-
{
260-
if (_workerQueues[i].Count > 0)
261-
{
262-
anyWorkRemaining = true;
263-
break;
264-
}
265-
}
266-
267-
// Only exit if there's really no work and we have minimum workers
268-
if (!anyWorkRemaining && _totalQueuedTasks == 0 && _activeWorkers > 1)
269-
{
270-
break; // Exit this worker
271-
}
272-
273-
// Reset timer if we need to stay
253+
// Keep workers alive to maintain maxConcurrency; just reset timer
274254
lastWorkTime = DateTime.UtcNow;
275255
}
276256

@@ -337,8 +317,43 @@ private async Task ExecuteWorkAsync(WorkItem workItem)
337317
// Check cancellation before executing
338318
if (!workItem.UserToken.IsCancellationRequested && !_shutdownCts.Token.IsCancellationRequested)
339319
{
340-
// Execute the work asynchronously - this won't block the worker
341-
await workItem.Work(workItem.UserToken).ConfigureAwait(false);
320+
// Start the work without awaiting it so this worker
321+
// can continue processing other items while the task awaits.
322+
var task = workItem.Work(workItem.UserToken);
323+
324+
if (task == null)
325+
return;
326+
327+
if (!task.IsCompleted)
328+
{
329+
// Observe completion and exceptions without blocking the worker loop
330+
_ = task.ContinueWith(t =>
331+
{
332+
try
333+
{
334+
if (t.IsFaulted)
335+
{
336+
_ = t.Exception;
337+
}
338+
}
339+
catch
340+
{
341+
// Swallow continuation exceptions
342+
}
343+
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
344+
}
345+
else
346+
{
347+
// Task already completed synchronously – observe any exception
348+
try
349+
{
350+
await task.ConfigureAwait(false);
351+
}
352+
catch
353+
{
354+
// Swallow exceptions to keep worker alive
355+
}
356+
}
342357
}
343358
}
344359
catch (OperationCanceledException)

0 commit comments

Comments
 (0)