Skip to content

Commit 89430ed

Browse files
authored
Merge pull request #723 from DDzia/fix/issue-721
added DateTimeProvider to usages #issue-721
2 parents ad4b6db + 3e72d29 commit 89430ed

File tree

10 files changed

+44
-30
lines changed

10 files changed

+44
-30
lines changed

src/WorkflowCore/Services/ActivityController.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ public ActivityController(ISubscriptionRepository subscriptionRepository, IWorkf
2828

2929
public async Task<PendingActivity> GetPendingActivity(string activityName, string workerId, TimeSpan? timeout = null)
3030
{
31-
var endTime = DateTime.UtcNow.Add(timeout ?? TimeSpan.Zero);
31+
var endTime = _dateTimeProvider.UtcNow.Add(timeout ?? TimeSpan.Zero);
3232
var firstPass = true;
3333
EventSubscription subscription = null;
34-
while ((subscription == null && DateTime.UtcNow < endTime) || firstPass)
34+
while ((subscription == null && _dateTimeProvider.UtcNow < endTime) || firstPass)
3535
{
3636
if (!firstPass)
3737
await Task.Delay(100);

src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,17 @@ internal class RunnablePoller : IBackgroundTask
1515
private readonly ILogger _logger;
1616
private readonly IGreyList _greylist;
1717
private readonly WorkflowOptions _options;
18+
private readonly IDateTimeProvider _dateTimeProvider;
1819
private Timer _pollTimer;
1920

20-
public RunnablePoller(IPersistenceProvider persistenceStore, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, IGreyList greylist, WorkflowOptions options)
21+
public RunnablePoller(IPersistenceProvider persistenceStore, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, IGreyList greylist, IDateTimeProvider dateTimeProvider, WorkflowOptions options)
2122
{
2223
_persistenceStore = persistenceStore;
2324
_greylist = greylist;
2425
_queueProvider = queueProvider;
2526
_logger = loggerFactory.CreateLogger<RunnablePoller>();
2627
_lockProvider = lockProvider;
28+
_dateTimeProvider = dateTimeProvider;
2729
_options = options;
2830
}
2931

@@ -54,7 +56,7 @@ private async void PollRunnables(object target)
5456
try
5557
{
5658
_logger.LogInformation("Polling for runnable workflows");
57-
var runnables = await _persistenceStore.GetRunnableInstances(DateTime.Now);
59+
var runnables = await _persistenceStore.GetRunnableInstances(_dateTimeProvider.Now);
5860
foreach (var item in runnables)
5961
{
6062
if (_greylist.Contains($"wf:{item}"))
@@ -85,7 +87,7 @@ private async void PollRunnables(object target)
8587
try
8688
{
8789
_logger.LogInformation("Polling for unprocessed events");
88-
var events = await _persistenceStore.GetRunnableEvents(DateTime.Now);
90+
var events = await _persistenceStore.GetRunnableEvents(_dateTimeProvider.Now);
8991
foreach (var item in events.ToList())
9092
{
9193
if (_greylist.Contains($"evt:{item}"))

src/WorkflowCore/Services/CancellationProcessor.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ public class CancellationProcessor : ICancellationProcessor
1212
{
1313
protected readonly ILogger _logger;
1414
private readonly IExecutionResultProcessor _executionResultProcessor;
15+
private readonly IDateTimeProvider _dateTimeProvider;
1516

16-
public CancellationProcessor(IExecutionResultProcessor executionResultProcessor, ILoggerFactory logFactory)
17+
public CancellationProcessor(IExecutionResultProcessor executionResultProcessor, ILoggerFactory logFactory, IDateTimeProvider dateTimeProvider)
1718
{
1819
_executionResultProcessor = executionResultProcessor;
1920
_logger = logFactory.CreateLogger<CancellationProcessor>();
21+
_dateTimeProvider = dateTimeProvider;
2022
}
2123

2224
public void ProcessCancellations(WorkflowInstance workflow, WorkflowDefinition workflowDef, WorkflowExecutorResult executionResult)
@@ -44,13 +46,13 @@ public void ProcessCancellations(WorkflowInstance workflow, WorkflowDefinition w
4446
_executionResultProcessor.ProcessExecutionResult(workflow, workflowDef, ptr, step, ExecutionResult.Next(), executionResult);
4547
}
4648

47-
ptr.EndTime = DateTime.Now.ToUniversalTime();
49+
ptr.EndTime = _dateTimeProvider.UtcNow;
4850
ptr.Active = false;
4951
ptr.Status = PointerStatus.Cancelled;
5052

5153
foreach (var descendent in workflow.ExecutionPointers.FindByScope(ptr.Id).Where(x => x.Status != PointerStatus.Complete && x.Status != PointerStatus.Cancelled))
5254
{
53-
descendent.EndTime = DateTime.Now.ToUniversalTime();
55+
descendent.EndTime = _dateTimeProvider.UtcNow;
5456
descendent.Active = false;
5557
descendent.Status = PointerStatus.Cancelled;
5658
}

src/WorkflowCore/Services/GreyList.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,29 @@ public class GreyList : IGreyList, IDisposable
1111
private readonly Timer _cycleTimer;
1212
private readonly ConcurrentDictionary<string, DateTime> _list;
1313
private readonly ILogger _logger;
14+
private readonly IDateTimeProvider _dateTimeProvider;
1415
private const int CYCLE_TIME = 30;
1516
private const int TTL = 5;
1617

17-
public GreyList(ILoggerFactory loggerFactory)
18+
public GreyList(ILoggerFactory loggerFactory, IDateTimeProvider dateTimeProvider)
1819
{
1920
_logger = loggerFactory.CreateLogger<GreyList>();
21+
_dateTimeProvider = dateTimeProvider;
2022
_list = new ConcurrentDictionary<string, DateTime>();
2123
_cycleTimer = new Timer(new TimerCallback(Cycle), null, TimeSpan.FromMinutes(CYCLE_TIME), TimeSpan.FromMinutes(CYCLE_TIME));
2224
}
2325

2426
public void Add(string id)
2527
{
26-
_list.AddOrUpdate(id, DateTime.Now, (key, val) => DateTime.Now);
28+
_list.AddOrUpdate(id, _dateTimeProvider.Now, (key, val) => _dateTimeProvider.Now);
2729
}
2830

2931
public bool Contains(string id)
3032
{
3133
if (!_list.TryGetValue(id, out var start))
3234
return false;
3335

34-
var result = start > (DateTime.Now.AddMinutes(-1 * TTL));
36+
var result = start > (_dateTimeProvider.Now.AddMinutes(-1 * TTL));
3537

3638
if (!result)
3739
_list.TryRemove(id, out var _);

src/WorkflowCore/Services/SyncWorkflowRunner.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ public class SyncWorkflowRunner : ISyncWorkflowRunner
1818
private readonly IPersistenceProvider _persistenceStore;
1919
private readonly IExecutionPointerFactory _pointerFactory;
2020
private readonly IQueueProvider _queueService;
21+
private readonly IDateTimeProvider _dateTimeProvider;
2122

22-
public SyncWorkflowRunner(IWorkflowHost host, IWorkflowExecutor executor, IDistributedLockProvider lockService, IWorkflowRegistry registry, IPersistenceProvider persistenceStore, IExecutionPointerFactory pointerFactory, IQueueProvider queueService)
23+
public SyncWorkflowRunner(IWorkflowHost host, IWorkflowExecutor executor, IDistributedLockProvider lockService, IWorkflowRegistry registry, IPersistenceProvider persistenceStore, IExecutionPointerFactory pointerFactory, IQueueProvider queueService, IDateTimeProvider dateTimeProvider)
2324
{
2425
_host = host;
2526
_executor = executor;
@@ -28,6 +29,7 @@ public SyncWorkflowRunner(IWorkflowHost host, IWorkflowExecutor executor, IDistr
2829
_persistenceStore = persistenceStore;
2930
_pointerFactory = pointerFactory;
3031
_queueService = queueService;
32+
_dateTimeProvider = dateTimeProvider;
3133
}
3234

3335
public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, int version, TData data, string reference, TimeSpan timeOut, bool persistSate = true)
@@ -46,7 +48,7 @@ public async Task<WorkflowInstance> RunWorkflowSync<TData>(string workflowId, in
4648
Data = data,
4749
Description = def.Description,
4850
NextExecution = 0,
49-
CreateTime = DateTime.Now.ToUniversalTime(),
51+
CreateTime = _dateTimeProvider.UtcNow,
5052
Status = WorkflowStatus.Suspended,
5153
Reference = reference
5254
};

src/WorkflowCore/Services/WorkflowController.cs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ public class WorkflowController : IWorkflowController
2222
private readonly ILifeCycleEventHub _eventHub;
2323
private readonly IServiceProvider _serviceProvider;
2424
private readonly ILogger _logger;
25+
private readonly IDateTimeProvider _dateTimeProvider;
2526

26-
public WorkflowController(IPersistenceProvider persistenceStore, IDistributedLockProvider lockProvider, IWorkflowRegistry registry, IQueueProvider queueProvider, IExecutionPointerFactory pointerFactory, ILifeCycleEventHub eventHub, ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
27+
public WorkflowController(IPersistenceProvider persistenceStore, IDistributedLockProvider lockProvider, IWorkflowRegistry registry, IQueueProvider queueProvider, IExecutionPointerFactory pointerFactory, ILifeCycleEventHub eventHub, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IDateTimeProvider dateTimeProvider)
2728
{
2829
_persistenceStore = persistenceStore;
2930
_lockProvider = lockProvider;
@@ -33,6 +34,7 @@ public WorkflowController(IPersistenceProvider persistenceStore, IDistributedLoc
3334
_eventHub = eventHub;
3435
_serviceProvider = serviceProvider;
3536
_logger = loggerFactory.CreateLogger<WorkflowController>();
37+
_dateTimeProvider = dateTimeProvider;
3638
}
3739

3840
public Task<string> StartWorkflow(string workflowId, object data = null, string reference=null)
@@ -68,7 +70,7 @@ public async Task<string> StartWorkflow<TData>(string workflowId, int? version,
6870
Data = data,
6971
Description = def.Description,
7072
NextExecution = 0,
71-
CreateTime = DateTime.Now.ToUniversalTime(),
73+
CreateTime = _dateTimeProvider.UtcNow,
7274
Status = WorkflowStatus.Runnable,
7375
Reference = reference
7476
};
@@ -94,7 +96,7 @@ public async Task<string> StartWorkflow<TData>(string workflowId, int? version,
9496
await _queueProvider.QueueWork(id, QueueType.Index);
9597
await _eventHub.PublishNotification(new WorkflowStarted()
9698
{
97-
EventTimeUtc = DateTime.UtcNow,
99+
EventTimeUtc = _dateTimeProvider.UtcNow,
98100
Reference = reference,
99101
WorkflowInstanceId = id,
100102
WorkflowDefinitionId = def.Id,
@@ -111,7 +113,7 @@ public async Task PublishEvent(string eventName, string eventKey, object eventDa
111113
if (effectiveDate.HasValue)
112114
evt.EventTime = effectiveDate.Value.ToUniversalTime();
113115
else
114-
evt.EventTime = DateTime.Now.ToUniversalTime();
116+
evt.EventTime = _dateTimeProvider.UtcNow;
115117

116118
evt.EventData = eventData;
117119
evt.EventKey = eventKey;
@@ -137,7 +139,7 @@ public async Task<bool> SuspendWorkflow(string workflowId)
137139
await _queueProvider.QueueWork(workflowId, QueueType.Index);
138140
await _eventHub.PublishNotification(new WorkflowSuspended()
139141
{
140-
EventTimeUtc = DateTime.UtcNow,
142+
EventTimeUtc = _dateTimeProvider.UtcNow,
141143
Reference = wf.Reference,
142144
WorkflowInstanceId = wf.Id,
143145
WorkflowDefinitionId = wf.WorkflowDefinitionId,
@@ -173,7 +175,7 @@ public async Task<bool> ResumeWorkflow(string workflowId)
173175
await _queueProvider.QueueWork(workflowId, QueueType.Index);
174176
await _eventHub.PublishNotification(new WorkflowResumed()
175177
{
176-
EventTimeUtc = DateTime.UtcNow,
178+
EventTimeUtc = _dateTimeProvider.UtcNow,
177179
Reference = wf.Reference,
178180
WorkflowInstanceId = wf.Id,
179181
WorkflowDefinitionId = wf.WorkflowDefinitionId,
@@ -207,7 +209,7 @@ public async Task<bool> TerminateWorkflow(string workflowId)
207209
await _queueProvider.QueueWork(workflowId, QueueType.Index);
208210
await _eventHub.PublishNotification(new WorkflowTerminated()
209211
{
210-
EventTimeUtc = DateTime.UtcNow,
212+
EventTimeUtc = _dateTimeProvider.UtcNow,
211213
Reference = wf.Reference,
212214
WorkflowInstanceId = wf.Id,
213215
WorkflowDefinitionId = wf.WorkflowDefinitionId,

src/providers/WorkflowCore.Providers.AWS/ServiceCollectionExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public static WorkflowOptions UseAwsSimpleQueueService(this WorkflowOptions opti
2121

2222
public static WorkflowOptions UseAwsDynamoLocking(this WorkflowOptions options, AWSCredentials credentials, AmazonDynamoDBConfig config, string tableName)
2323
{
24-
options.UseDistributedLockManager(sp => new DynamoLockProvider(credentials, config, tableName, sp.GetService<ILoggerFactory>()));
24+
options.UseDistributedLockManager(sp => new DynamoLockProvider(credentials, config, tableName, sp.GetService<ILoggerFactory>(), sp.GetService<IDateTimeProvider>()));
2525
return options;
2626
}
2727

@@ -35,7 +35,7 @@ public static WorkflowOptions UseAwsDynamoPersistence(this WorkflowOptions optio
3535
public static WorkflowOptions UseAwsKinesis(this WorkflowOptions options, AWSCredentials credentials, RegionEndpoint region, string appName, string streamName)
3636
{
3737
options.Services.AddTransient<IKinesisTracker>(sp => new KinesisTracker(credentials, region, "workflowcore_kinesis", sp.GetService<ILoggerFactory>()));
38-
options.Services.AddTransient<IKinesisStreamConsumer>(sp => new KinesisStreamConsumer(credentials, region, sp.GetService<IKinesisTracker>(), sp.GetService<IDistributedLockProvider>(), sp.GetService<ILoggerFactory>()));
38+
options.Services.AddTransient<IKinesisStreamConsumer>(sp => new KinesisStreamConsumer(credentials, region, sp.GetService<IKinesisTracker>(), sp.GetService<IDistributedLockProvider>(), sp.GetService<ILoggerFactory>(), sp.GetService<IDateTimeProvider>()));
3939
options.UseEventHub(sp => new KinesisProvider(credentials, region, appName, streamName, sp.GetService<IKinesisStreamConsumer>(), sp.GetService<ILoggerFactory>()));
4040
return options;
4141
}

src/providers/WorkflowCore.Providers.AWS/Services/DynamoLockProvider.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,16 @@ public class DynamoLockProvider : IDistributedLockProvider
2323
private Task _heartbeatTask;
2424
private CancellationTokenSource _cancellationTokenSource;
2525
private readonly AutoResetEvent _mutex = new AutoResetEvent(true);
26+
private readonly IDateTimeProvider _dateTimeProvider;
2627

27-
public DynamoLockProvider(AWSCredentials credentials, AmazonDynamoDBConfig config, string tableName, ILoggerFactory logFactory)
28+
public DynamoLockProvider(AWSCredentials credentials, AmazonDynamoDBConfig config, string tableName, ILoggerFactory logFactory, IDateTimeProvider dateTimeProvider)
2829
{
2930
_logger = logFactory.CreateLogger<DynamoLockProvider>();
3031
_client = new AmazonDynamoDBClient(credentials, config);
3132
_localLocks = new List<string>();
3233
_tableName = tableName;
3334
_nodeId = Guid.NewGuid().ToString();
35+
_dateTimeProvider = dateTimeProvider;
3436
}
3537

3638
public async Task<bool> AcquireLock(string Id, CancellationToken cancellationToken)
@@ -46,7 +48,7 @@ public async Task<bool> AcquireLock(string Id, CancellationToken cancellationTok
4648
{ "lock_owner", new AttributeValue(_nodeId) },
4749
{ "expires", new AttributeValue()
4850
{
49-
N = Convert.ToString(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds() + _ttl)
51+
N = Convert.ToString(new DateTimeOffset(_dateTimeProvider.UtcNow).ToUnixTimeMilliseconds() + _ttl)
5052
}
5153
}
5254
},
@@ -55,7 +57,7 @@ public async Task<bool> AcquireLock(string Id, CancellationToken cancellationTok
5557
{
5658
{ ":expired", new AttributeValue()
5759
{
58-
N = Convert.ToString(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds() + _jitter)
60+
N = Convert.ToString(new DateTimeOffset(_dateTimeProvider.UtcNow).ToUnixTimeMilliseconds() + _jitter)
5961
}
6062
}
6163
}
@@ -154,7 +156,7 @@ private async void SendHeartbeat()
154156
{ "lock_owner", new AttributeValue(_nodeId) },
155157
{ "expires", new AttributeValue()
156158
{
157-
N = Convert.ToString(new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds() + _ttl)
159+
N = Convert.ToString(new DateTimeOffset(_dateTimeProvider.UtcNow).ToUnixTimeMilliseconds() + _ttl)
158160
}
159161
}
160162
},

src/providers/WorkflowCore.Providers.AWS/Services/KinesisStreamConsumer.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,17 @@ public class KinesisStreamConsumer : IKinesisStreamConsumer, IDisposable
2323
private readonly Task _processTask;
2424
private readonly int _batchSize = 100;
2525
private ICollection<ShardSubscription> _subscribers = new HashSet<ShardSubscription>();
26+
private readonly IDateTimeProvider _dateTimeProvider;
2627

27-
public KinesisStreamConsumer(AWSCredentials credentials, RegionEndpoint region, IKinesisTracker tracker, IDistributedLockProvider lockManager, ILoggerFactory logFactory)
28+
public KinesisStreamConsumer(AWSCredentials credentials, RegionEndpoint region, IKinesisTracker tracker, IDistributedLockProvider lockManager, ILoggerFactory logFactory, IDateTimeProvider dateTimeProvider)
2829
{
2930
_logger = logFactory.CreateLogger(GetType());
3031
_tracker = tracker;
3132
_lockManager = lockManager;
3233
_client = new AmazonKinesisClient(credentials, region);
3334
_processTask = new Task(Process);
3435
_processTask.Start();
36+
_dateTimeProvider = dateTimeProvider;
3537
}
3638

3739
public async Task Subscribe(string appName, string stream, Action<Record> action)
@@ -59,7 +61,7 @@ private async void Process()
5961
{
6062
try
6163
{
62-
var todo = _subscribers.Where(x => x.Snooze < DateTime.Now).ToList();
64+
var todo = _subscribers.Where(x => x.Snooze < _dateTimeProvider.Now).ToList();
6365
foreach (var sub in todo)
6466
{
6567
if (!await _lockManager.AcquireLock($"{sub.AppName}.{sub.Stream}.{sub.Shard.ShardId}",
@@ -71,7 +73,7 @@ private async void Process()
7173
var records = await GetBatch(sub);
7274

7375
if (records.Records.Count == 0)
74-
sub.Snooze = DateTime.Now.AddSeconds(5);
76+
sub.Snooze = _dateTimeProvider.Now.AddSeconds(5);
7577

7678
var lastSequence = string.Empty;
7779

src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public async Task PersistWorkflow(WorkflowInstance workflow)
6262
public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt)
6363
{
6464
var result = new List<string>();
65-
var data = await _redis.SortedSetRangeByScoreAsync($"{_prefix}.{WORKFLOW_SET}.{RUNNABLE_INDEX}", -1, DateTime.UtcNow.Ticks);
65+
var data = await _redis.SortedSetRangeByScoreAsync($"{_prefix}.{WORKFLOW_SET}.{RUNNABLE_INDEX}", -1, asAt.ToUniversalTime().Ticks);
6666

6767
foreach (var item in data)
6868
result.Add(item);

0 commit comments

Comments
 (0)