Skip to content

Commit f7b961a

Browse files
authored
Fix/schedulerbackground (#376)
* Fixed the scheduler background to Task.Delay since the Peridic Timer were skipping ticks on short interval with seconds. * small refactor * fixed the mixd up delete ticker
1 parent 15acdce commit f7b961a

File tree

6 files changed

+114
-155
lines changed

6 files changed

+114
-155
lines changed

src/TickerQ.EntityFrameworkCore/Infrastructure/BasePersistenceProvider.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ public async Task ReleaseAcquiredCronTickerOccurrences(Guid[] occurrenceIds, Can
339339
? dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
340340
: dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>().Where(x => occurrenceIds.Contains(x.Id));
341341

342-
await baseQuery.Where(x => occurrenceIds.Contains(x.Id))
342+
await baseQuery
343343
.WhereCanAcquire(_lockHolder)
344344
.ExecuteUpdateAsync(setter => setter
345345
.SetProperty(x => x.LockHolder, _ => null)
@@ -415,7 +415,7 @@ public async IAsyncEnumerable<CronTickerOccurrenceEntity<TCronTicker>> QueueCron
415415
{
416416
Id = item.NextCronOccurrence.Id,
417417
CronTickerId = item.Id,
418-
ExecutionTime = now,
418+
ExecutionTime = executionTime,
419419
Status = TickerStatus.Queued,
420420
LockHolder = _lockHolder,
421421
LockedAt = now,
@@ -464,12 +464,12 @@ public async Task<byte[]> GetCronTickerOccurrenceRequest(Guid tickerId, Cancella
464464
.ConfigureAwait(false);
465465
}
466466

467-
public async Task UpdateCronTickerOccurrencesWithUnifiedContext(Guid[] timeTickerIds, InternalFunctionContext functionContext,
467+
public async Task UpdateCronTickerOccurrencesWithUnifiedContext(Guid[] cronOccurrenceIds, InternalFunctionContext functionContext,
468468
CancellationToken cancellationToken = default)
469469
{
470470
await using var dbContext = await DbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);;
471471
await dbContext.Set<CronTickerOccurrenceEntity<TCronTicker>>()
472-
.Where(x => timeTickerIds.Contains(x.CronTickerId))
472+
.Where(x => cronOccurrenceIds.Contains(x.Id))
473473
.ExecuteUpdateAsync(setter => setter.UpdateCronTickerOccurrence<TCronTicker>(functionContext), cancellationToken)
474474
.ConfigureAwait(false);
475475
}

src/TickerQ.Utilities/Managers/InternalTickerManager.cs

Lines changed: 75 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -28,132 +28,99 @@ public InternalTickerManager(
2828
_clock = clock ?? throw new ArgumentNullException(nameof(clock));
2929
_notificationHubSender = notificationHubSender;
3030
}
31-
31+
3232
public async Task<(TimeSpan TimeRemaining, InternalFunctionContext[] Functions)> GetNextTickers(CancellationToken cancellationToken = default)
3333
{
34-
while (true)
35-
{
36-
var minCronGroupTask = GetEarliestCronTickerGroupAsync(cancellationToken);
37-
var minTimeTickersTask = _persistenceProvider.GetEarliestTimeTickers(cancellationToken);
38-
39-
await Task.WhenAll(minCronGroupTask, minTimeTickersTask).ConfigureAwait(false);
40-
41-
var (minCronGroup, minTimeTickers) = (await minCronGroupTask, await minTimeTickersTask);
42-
43-
var minTimeTickerTime = minTimeTickers.Length != 0
44-
? minTimeTickers[0].ExecutionTime ?? default
45-
: default;
34+
var now = _clock.UtcNow;
4635

47-
var minTimeRemaining = CalculateMinTimeRemaining(minCronGroup, minTimeTickerTime, out var typesToQueue);
36+
var minCronGroupTask = GetEarliestCronTickerGroupAsync(cancellationToken);
37+
var minTimeTickersTask = _persistenceProvider.GetEarliestTimeTickers(cancellationToken);
4838

49-
if (minTimeRemaining == Timeout.InfiniteTimeSpan)
50-
return (Timeout.InfiniteTimeSpan, []);
39+
await Task.WhenAll(minCronGroupTask, minTimeTickersTask).ConfigureAwait(false);
5140

52-
var nextTickers = await RetrieveEligibleTickersAsync(minCronGroup, minTimeTickers, typesToQueue, cancellationToken).ConfigureAwait(false);
41+
var minCronGroup = await minCronGroupTask.ConfigureAwait(false);
42+
var minTimeTickers = await minTimeTickersTask.ConfigureAwait(false);
5343

54-
if (nextTickers.Length != 0)
55-
return (minTimeRemaining, nextTickers);
56-
57-
if(typesToQueue.All(x => x == TickerType.CronTickerOccurrence))
58-
return (minTimeRemaining, nextTickers);
59-
60-
await Task.Delay(TimeSpan.FromMilliseconds(10), cancellationToken).ConfigureAwait(false); // Faster retry for time-sensitive tasks
61-
}
62-
}
44+
var cronTime = minCronGroup?.Key;
45+
var timeTickerTime = minTimeTickers.Length > 0
46+
? minTimeTickers[0].ExecutionTime
47+
: null;
6348

64-
private TimeSpan CalculateMinTimeRemaining(
65-
(DateTime Key, InternalManagerContext[] Items)? minCronTicker,
66-
DateTime minTimeTicker,
67-
out TickerType[] sources)
68-
{
69-
var now = _clock.UtcNow;
49+
if (cronTime is null && timeTickerTime is null)
50+
return (Timeout.InfiniteTimeSpan, []);
7051

71-
DateTime? cron = minCronTicker?.Key;
72-
DateTime? time = minTimeTicker == default ? null : minTimeTicker;
52+
TimeSpan timeRemaining;
53+
bool includeCron = false;
54+
bool includeTimeTickers = false;
7355

74-
// no values
75-
if (cron is null && time is null)
56+
if (cronTime is null)
7657
{
77-
sources = [];
78-
return Timeout.InfiniteTimeSpan;
58+
includeTimeTickers = true;
59+
timeRemaining = SafeRemaining(timeTickerTime!.Value, now);
7960
}
80-
81-
// only cron
82-
if (time is null)
61+
else if (timeTickerTime is null)
8362
{
84-
sources = [TickerType.CronTickerOccurrence];
85-
var cronRemaining = cron.Value - now;
86-
// Ensure we don't return negative values - schedule for immediate execution
87-
return cronRemaining < TimeSpan.Zero ? TimeSpan.Zero : cronRemaining;
63+
includeCron = true;
64+
timeRemaining = SafeRemaining(cronTime.Value, now);
8865
}
89-
90-
// only time
91-
if (cron is null)
66+
else
9267
{
93-
sources = [TickerType.TimeTicker];
94-
var timeRemaining = time.Value - now;
95-
// Ensure we don't return negative values - schedule for immediate execution
96-
return timeRemaining < TimeSpan.Zero ? TimeSpan.Zero : timeRemaining;
97-
}
68+
var cronSecond = new DateTime(cronTime.Value.Year, cronTime.Value.Month, cronTime.Value.Day,
69+
cronTime.Value.Hour, cronTime.Value.Minute, cronTime.Value.Second);
70+
var timeSecond = new DateTime(timeTickerTime.Value.Year, timeTickerTime.Value.Month, timeTickerTime.Value.Day,
71+
timeTickerTime.Value.Hour, timeTickerTime.Value.Minute, timeTickerTime.Value.Second);
9872

99-
// both present - check if they're in the exact same second (ignoring milliseconds)
100-
var cronSecond = new DateTime(cron.Value.Year, cron.Value.Month, cron.Value.Day,
101-
cron.Value.Hour, cron.Value.Minute, cron.Value.Second);
102-
var timeSecond = new DateTime(time.Value.Year, time.Value.Month, time.Value.Day,
103-
time.Value.Hour, time.Value.Minute, time.Value.Second);
104-
105-
// Only batch if they're in the exact same second
106-
if (cronSecond == timeSecond)
107-
{
108-
sources = [TickerType.CronTickerOccurrence, TickerType.TimeTicker];
109-
var earliest = cron < time ? cron.Value : time.Value;
110-
var earliestRemaining = earliest - now;
111-
// Ensure we don't return negative values
112-
return earliestRemaining < TimeSpan.Zero ? TimeSpan.Zero : earliestRemaining;
73+
if (cronSecond == timeSecond)
74+
{
75+
includeCron = true;
76+
includeTimeTickers = true;
77+
var earliest = cronTime < timeTickerTime ? cronTime.Value : timeTickerTime.Value;
78+
timeRemaining = SafeRemaining(earliest, now);
79+
}
80+
else if (cronTime < timeTickerTime)
81+
{
82+
includeCron = true;
83+
timeRemaining = SafeRemaining(cronTime.Value, now);
84+
}
85+
else
86+
{
87+
includeTimeTickers = true;
88+
timeRemaining = SafeRemaining(timeTickerTime.Value, now);
89+
}
11390
}
11491

115-
// Different seconds - only process the earliest one
116-
if (cron < time)
117-
{
118-
sources = [TickerType.CronTickerOccurrence];
119-
var cronRemaining = cron.Value - now;
120-
return cronRemaining < TimeSpan.Zero ? TimeSpan.Zero : cronRemaining;
121-
}
92+
if (!includeCron && !includeTimeTickers)
93+
return (Timeout.InfiniteTimeSpan, []);
12294

123-
sources = [TickerType.TimeTicker];
124-
var finalTimeRemaining = time.Value - now;
125-
return finalTimeRemaining < TimeSpan.Zero ? TimeSpan.Zero : finalTimeRemaining;
126-
}
95+
InternalFunctionContext[] cronFunctions = [];
96+
InternalFunctionContext[] timeFunctions = [];
12797

128-
private async Task<InternalFunctionContext[]>RetrieveEligibleTickersAsync(
129-
(DateTime Key, InternalManagerContext[] Items)? minCronTicker,
130-
TimeTickerEntity[] minTimeTicker,
131-
TickerType[] typesToQueue,
132-
CancellationToken cancellationToken = default)
133-
{
134-
135-
if (typesToQueue.Contains(TickerType.CronTickerOccurrence) && typesToQueue.Contains(TickerType.TimeTicker))
136-
{
137-
var nextCronTickersTask = QueueNextCronTickersAsync(minCronTicker!.Value, cancellationToken);
138-
var nextTimeTickersTask = QueueNextTimeTickersAsync(minTimeTicker, cancellationToken);
98+
if (includeCron && minCronGroup is not null)
99+
cronFunctions = await QueueNextCronTickersAsync(minCronGroup.Value, cancellationToken).ConfigureAwait(false);
139100

140-
await Task.WhenAll(nextCronTickersTask, nextTimeTickersTask).ConfigureAwait(false);
141-
142-
var (nextCronTickers, nextTimeTickers) = (await nextCronTickersTask, await nextTimeTickersTask);
143-
144-
// Safety check for extremely large datasets
145-
var totalLength = nextCronTickers.Length + nextTimeTickers.Length;
146-
147-
var merged = new InternalFunctionContext[totalLength];
148-
nextCronTickers.AsSpan().CopyTo(merged.AsSpan(0, nextCronTickers.Length));
149-
nextTimeTickers.AsSpan().CopyTo(merged.AsSpan(nextCronTickers.Length, nextTimeTickers.Length));
150-
return merged;
151-
}
101+
if (includeTimeTickers && minTimeTickers.Length > 0)
102+
timeFunctions = await QueueNextTimeTickersAsync(minTimeTickers, cancellationToken).ConfigureAwait(false);
152103

153-
if (typesToQueue.Contains(TickerType.TimeTicker))
154-
return await QueueNextTimeTickersAsync(minTimeTicker, cancellationToken).ConfigureAwait(false);
155-
else
156-
return await QueueNextCronTickersAsync(minCronTicker!.Value, cancellationToken).ConfigureAwait(false);
104+
if (cronFunctions.Length == 0 && timeFunctions.Length == 0)
105+
return (timeRemaining, []);
106+
107+
if (cronFunctions.Length == 0)
108+
return (timeRemaining, timeFunctions);
109+
110+
if (timeFunctions.Length == 0)
111+
return (timeRemaining, cronFunctions);
112+
113+
var merged = new InternalFunctionContext[cronFunctions.Length + timeFunctions.Length];
114+
cronFunctions.AsSpan().CopyTo(merged.AsSpan(0, cronFunctions.Length));
115+
timeFunctions.AsSpan().CopyTo(merged.AsSpan(cronFunctions.Length, timeFunctions.Length));
116+
117+
return (timeRemaining, merged);
118+
}
119+
120+
private static TimeSpan SafeRemaining(DateTime target, DateTime now)
121+
{
122+
var remaining = target - now;
123+
return remaining < TimeSpan.Zero ? TimeSpan.Zero : remaining;
157124
}
158125

159126
private async Task<InternalFunctionContext[]> QueueNextTimeTickersAsync(TimeTickerEntity[] minTimeTickers, CancellationToken cancellationToken = default)
@@ -489,9 +456,9 @@ public async Task MigrateDefinedCronTickers((string, string)[] cronExpressions,
489456
public async Task DeleteTicker(Guid tickerId, TickerType type, CancellationToken cancellationToken = default)
490457
{
491458
if (type == TickerType.CronTickerOccurrence)
492-
await _persistenceProvider.RemoveTimeTickers([tickerId], cancellationToken).ConfigureAwait(false);
493-
else
494459
await _persistenceProvider.RemoveCronTickers([tickerId], cancellationToken).ConfigureAwait(false);
460+
else
461+
await _persistenceProvider.RemoveTimeTickers([tickerId], cancellationToken).ConfigureAwait(false);
495462
}
496463

497464
public async Task ReleaseDeadNodeResources(string instanceIdentifier, CancellationToken cancellationToken = default)
@@ -503,4 +470,4 @@ public async Task ReleaseDeadNodeResources(string instanceIdentifier, Cancellati
503470
await Task.WhenAll(cronOccurrence, timeTickers).ConfigureAwait(false);
504471
}
505472
}
506-
}
473+
}

src/TickerQ.Utilities/Models/InternalFunctionContext.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Diagnostics.CodeAnalysis;
4-
using System.Linq;
54
using System.Linq.Expressions;
65
using System.Reflection;
76
using TickerQ.Utilities.Enums;

src/TickerQ/Src/BackgroundServices/TickerQFallbackBackgroundService.cs

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ namespace TickerQ.BackgroundServices;
1111
internal class TickerQFallbackBackgroundService : BackgroundService
1212
{
1313
private int _started;
14-
private PeriodicTimer _tickerFallbackJobPeriodicTimer;
1514
private readonly IInternalTickerManager _internalTickerManager;
1615
private readonly TickerExecutionTaskHandler _tickerExecutionTaskHandler;
1716
private readonly TickerQTaskScheduler _tickerQTaskScheduler;
@@ -33,16 +32,8 @@ public override Task StartAsync(CancellationToken ct)
3332

3433
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
3534
{
36-
_tickerFallbackJobPeriodicTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(10));
37-
await RunTickerQFallbackAsync(stoppingToken);
38-
}
39-
40-
private async Task RunTickerQFallbackAsync(CancellationToken stoppingToken)
41-
{
42-
while (await _tickerFallbackJobPeriodicTimer.WaitForNextTickAsync(stoppingToken))
35+
while (!stoppingToken.IsCancellationRequested)
4336
{
44-
var oldPeriod = _tickerFallbackJobPeriodicTimer.Period;
45-
4637
var functions = await _internalTickerManager.RunTimedOutTickers(stoppingToken);
4738

4839
if (functions.Length != 0)
@@ -76,13 +67,12 @@ private async Task RunTickerQFallbackAsync(CancellationToken stoppingToken)
7667
await _tickerQTaskScheduler.QueueAsync(ct => _tickerExecutionTaskHandler.ExecuteTaskAsync(function, true, ct), function.CachedPriority, stoppingToken);
7768
}
7869

79-
_tickerFallbackJobPeriodicTimer.Period = TimeSpan.FromMilliseconds(10);
70+
await Task.Delay(TimeSpan.FromMilliseconds(10), stoppingToken);
8071
}
8172
else
82-
_tickerFallbackJobPeriodicTimer.Period = _fallbackJobPeriod;
83-
84-
if(oldPeriod != _fallbackJobPeriod)
85-
await _tickerFallbackJobPeriodicTimer.WaitForNextTickAsync(stoppingToken);
73+
{
74+
await Task.Delay(_fallbackJobPeriod, stoppingToken);
75+
}
8676
}
8777
}
8878

@@ -92,4 +82,4 @@ public override async Task StopAsync(CancellationToken cancellationToken)
9282
Interlocked.Exchange(ref _started, 0);
9383
await base.StopAsync(cancellationToken);
9484
}
95-
}
85+
}

0 commit comments

Comments
 (0)