Skip to content

Commit 76d26b4

Browse files
committed
fix(Application): Added a new IBackgroundJobManager and its default implementation, which is used to schedule background jobs, such as the periodic triggering of a schedule
Closes #303
1 parent f5f90c5 commit 76d26b4

35 files changed

+653
-292
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright © 2022-Present The Synapse Authors
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p>
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* <p>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
using ServerlessWorkflow.Sdk;
19+
20+
namespace Synapse.Application.Commands.Schedules;
21+
22+
/// <summary>
23+
/// Represents the <see cref="ICommand"/> used to complete a <see cref="V1Schedule"/>'s occurence
24+
/// </summary>
25+
[DataTransferObjectType(typeof(Integration.Commands.Schedules.V1CompleteScheduleOccurenceCommand))]
26+
public class V1CompleteScheduleOccurenceCommand
27+
: Command<Integration.Models.V1Schedule>
28+
{
29+
30+
/// <summary>
31+
/// Initializes a new <see cref="V1CompleteScheduleOccurenceCommand"/>
32+
/// </summary>
33+
protected V1CompleteScheduleOccurenceCommand() { }
34+
35+
/// <summary>
36+
/// Initializes a new <see cref="V1CompleteScheduleOccurenceCommand"/>
37+
/// </summary>
38+
/// <param name="scheduleId">The id of the <see cref="V1Schedule"/> to complete an occurence of</param>
39+
/// <param name="workflowInstanceId">The id of the <see cref="V1WorkflowInstance"/> that has been executed</param>
40+
public V1CompleteScheduleOccurenceCommand(string scheduleId, string workflowInstanceId)
41+
{
42+
this.ScheduleId = scheduleId;
43+
this.WorkflowInstanceId = workflowInstanceId;
44+
}
45+
46+
/// <summary>
47+
/// Gets the id of the <see cref="V1Schedule"/> to complete an occurence of
48+
/// </summary>
49+
public virtual string ScheduleId { get; protected set; } = null!;
50+
51+
/// <summary>
52+
/// Gets the id of the <see cref="V1WorkflowInstance"/> that has been executed
53+
/// </summary>
54+
public virtual string WorkflowInstanceId { get; protected set; } = null!;
55+
56+
}
57+
58+
/// <summary>
59+
/// Represents the service used to handle <see cref="V1CompleteScheduleOccurenceCommand"/>s
60+
/// </summary>
61+
public class V1CompleteScheduleOccurenceCommandHandler
62+
: CommandHandlerBase,
63+
ICommandHandler<V1CompleteScheduleOccurenceCommand, Integration.Models.V1Schedule>
64+
{
65+
66+
/// <summary>
67+
/// Initializes a new <see cref="V1CompleteScheduleOccurenceCommandHandler"/>
68+
/// </summary>
69+
/// <param name="loggerFactory">The service used to create <see cref="ILogger"/>s</param>
70+
/// <param name="mediator">The service used to mediate calls</param>
71+
/// <param name="mapper">The service used to map objects</param>
72+
/// <param name="schedules">The <see cref="IRepository"/> used to manage <see cref="V1Schedule"/>s</param>
73+
/// <param name="backgroundJobManager">The service used to manage background jobs</param>
74+
public V1CompleteScheduleOccurenceCommandHandler(ILoggerFactory loggerFactory, IMediator mediator, IMapper mapper, IRepository<V1Schedule> schedules, IBackgroundJobManager backgroundJobManager)
75+
: base(loggerFactory, mediator, mapper)
76+
{
77+
this.Schedules = schedules;
78+
this.BackgroundJobManager = backgroundJobManager;
79+
}
80+
81+
/// <summary>
82+
/// Gets the <see cref="IRepository"/> used to manage <see cref="V1Schedule"/>s
83+
/// </summary>
84+
protected IRepository<V1Schedule> Schedules { get; }
85+
86+
/// <summary>
87+
/// Gets the service used to manage background jobs
88+
/// </summary>
89+
protected IBackgroundJobManager BackgroundJobManager { get; }
90+
91+
/// <inheritdoc/>
92+
public virtual async Task<IOperationResult<Integration.Models.V1Schedule>> HandleAsync(V1CompleteScheduleOccurenceCommand command, CancellationToken cancellationToken = default)
93+
{
94+
var schedule = await this.Schedules.FindAsync(command.ScheduleId, cancellationToken);
95+
if (schedule == null) throw DomainException.NullReference(typeof(V1Schedule), command.ScheduleId);
96+
schedule.CompleteOccurence(command.WorkflowInstanceId);
97+
await this.Schedules.UpdateAsync(schedule, cancellationToken);
98+
await this.Schedules.SaveChangesAsync(cancellationToken);
99+
if(schedule.Definition.Type == ScheduleDefinitionType.Interval && schedule.NextOccurenceAt.HasValue) await this.BackgroundJobManager.ScheduleJobAsync(schedule, cancellationToken);
100+
return this.Ok(this.Mapper.Map<Integration.Models.V1Schedule>(schedule));
101+
}
102+
103+
}

src/core/Synapse.Application/Commands/Schedules/v1/V1CreateScheduleCommand.cs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,20 +36,20 @@ protected V1CreateScheduleCommand() { }
3636
/// <summary>
3737
/// Initializes a new <see cref="V1CreateScheduleCommand"/>
3838
/// </summary>
39-
/// <param name="type">The type of the <see cref="V1Schedule"/> to create</param>
39+
/// <param name="activationType">The type of the <see cref="V1Schedule"/> to create</param>
4040
/// <param name="definition">The definition of the <see cref="V1Schedule"/> to create</param>
4141
/// <param name="workflowId">The id of the <see cref="V1Workflow"/> to schedule</param>
42-
public V1CreateScheduleCommand(V1ScheduleType type, ScheduleDefinition definition, string workflowId)
42+
public V1CreateScheduleCommand(V1ScheduleActivationType activationType, ScheduleDefinition definition, string workflowId)
4343
{
44-
this.Type = type;
44+
this.ActivationType = activationType;
4545
this.Definition = definition;
4646
this.WorkflowId = workflowId;
4747
}
4848

4949
/// <summary>
5050
/// Gets the type of the <see cref="V1Schedule"/> to create
5151
/// </summary>
52-
public virtual V1ScheduleType Type { get; protected set; }
52+
public virtual V1ScheduleActivationType ActivationType { get; protected set; }
5353

5454
/// <summary>
5555
/// Gets the definition of the <see cref="V1Schedule"/> to create
@@ -79,11 +79,13 @@ public class V1CreateScheduleCommandHandler
7979
/// <param name="mapper">The service used to map objects</param>
8080
/// <param name="workflows">The <see cref="IRepository"/> used to manage <see cref="V1Workflow"/>s</param>
8181
/// <param name="schedules">The <see cref="IRepository"/> used to manage <see cref="V1Schedule"/>s</param>
82-
public V1CreateScheduleCommandHandler(ILoggerFactory loggerFactory, IMediator mediator, IMapper mapper, IRepository<V1Workflow> workflows, IRepository<V1Schedule> schedules)
82+
/// <param name="backgroundJobManager">The service used to manage background jobs</param>
83+
public V1CreateScheduleCommandHandler(ILoggerFactory loggerFactory, IMediator mediator, IMapper mapper, IRepository<V1Workflow> workflows, IRepository<V1Schedule> schedules, IBackgroundJobManager backgroundJobManager)
8384
: base(loggerFactory, mediator, mapper)
8485
{
8586
this.Workflows = workflows;
8687
this.Schedules = schedules;
88+
this.BackgroundJobManager = backgroundJobManager;
8789
}
8890

8991
/// <summary>
@@ -96,15 +98,21 @@ public V1CreateScheduleCommandHandler(ILoggerFactory loggerFactory, IMediator me
9698
/// </summary>
9799
protected IRepository<V1Schedule> Schedules { get; }
98100

101+
/// <summary>
102+
/// Gets the service used to manage background jobs
103+
/// </summary>
104+
protected IBackgroundJobManager BackgroundJobManager { get; }
105+
99106
/// <inheritdoc/>
100107
public virtual async Task<IOperationResult<Integration.Models.V1Schedule>> HandleAsync(V1CreateScheduleCommand command, CancellationToken cancellationToken = default)
101108
{
102109
var workflowId = (await this.Mediator.ExecuteAndUnwrapAsync(Queries.Workflows.V1GetWorkflowByIdQuery.Parse(command.WorkflowId), cancellationToken))?.Id;
103110
if(string.IsNullOrWhiteSpace(workflowId)) throw DomainException.NullReference(typeof(V1Workflow), command.WorkflowId);
104111
var workflow = await this.Workflows.FindAsync(workflowId, cancellationToken);
105112
if (workflow == null) throw DomainException.NullReference(typeof(V1Workflow), workflowId);
106-
var schedule = await this.Schedules.AddAsync(new(command.Type, command.Definition, workflow), cancellationToken);
113+
var schedule = await this.Schedules.AddAsync(new(command.ActivationType, command.Definition, workflow), cancellationToken);
107114
await this.Schedules.SaveChangesAsync(cancellationToken);
115+
if (schedule.NextOccurenceAt.HasValue) await this.BackgroundJobManager.ScheduleJobAsync(schedule, cancellationToken);
108116
return this.Ok(this.Mapper.Map<Integration.Models.V1Schedule>(schedule));
109117
}
110118

src/core/Synapse.Application/Commands/Schedules/v1/V1MakeScheduleObsoleteCommand.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,24 @@ public class V1MakeScheduleObsoleteCommandHandler
6161
/// <param name="mediator">The service used to mediate calls</param>
6262
/// <param name="mapper">The service used to map objects</param>
6363
/// <param name="schedules">The <see cref="IRepository"/> used to manage <see cref="V1Schedule"/>s</param>
64-
public V1MakeScheduleObsoleteCommandHandler(ILoggerFactory loggerFactory, IMediator mediator, IMapper mapper, IRepository<V1Schedule> schedules)
64+
/// <param name="backgroundJobManager">The service used to manage background jobs</param>
65+
public V1MakeScheduleObsoleteCommandHandler(ILoggerFactory loggerFactory, IMediator mediator, IMapper mapper, IRepository<V1Schedule> schedules, IBackgroundJobManager backgroundJobManager)
6566
: base(loggerFactory, mediator, mapper)
6667
{
6768
this.Schedules = schedules;
69+
this.BackgroundJobManager = backgroundJobManager;
6870
}
6971

7072
/// <summary>
7173
/// Gets the <see cref="IRepository"/> used to manage <see cref="V1Schedule"/>s
7274
/// </summary>
7375
protected IRepository<V1Schedule> Schedules { get; }
7476

77+
/// <summary>
78+
/// Gets the service used to manage background jobs
79+
/// </summary>
80+
protected IBackgroundJobManager BackgroundJobManager { get; }
81+
7582
/// <inheritdoc/>
7683
public virtual async Task<IOperationResult> HandleAsync(V1MakeScheduleObsoleteCommand command, CancellationToken cancellationToken = default)
7784
{
@@ -80,6 +87,7 @@ public virtual async Task<IOperationResult> HandleAsync(V1MakeScheduleObsoleteCo
8087
schedule.MakeObsolete();
8188
await this.Schedules.UpdateAsync(schedule, cancellationToken);
8289
await this.Schedules.SaveChangesAsync(cancellationToken);
90+
await this.BackgroundJobManager.CancelJobAsync(schedule.Id, cancellationToken);
8391
return this.Ok();
8492
}
8593

src/core/Synapse.Application/Commands/Schedules/v1/V1ResumeScheduleCommand.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,24 @@ public class V1ResumeScheduleCommandHandler
6363
/// <param name="mediator">The service used to mediate calls</param>
6464
/// <param name="mapper">The service used to map objects</param>
6565
/// <param name="schedules">The <see cref="IRepository"/> used to manage <see cref="V1Schedule"/>s</param>
66-
public V1ResumeScheduleCommandHandler(ILoggerFactory loggerFactory, IMediator mediator, IMapper mapper, IRepository<V1Schedule> schedules)
66+
/// <param name="backgroundJobManager">The service used to manage background jobs</param>
67+
public V1ResumeScheduleCommandHandler(ILoggerFactory loggerFactory, IMediator mediator, IMapper mapper, IRepository<V1Schedule> schedules, IBackgroundJobManager backgroundJobManager)
6768
: base(loggerFactory, mediator, mapper)
6869
{
6970
this.Schedules = schedules;
71+
this.BackgroundJobManager = backgroundJobManager;
7072
}
7173

7274
/// <summary>
7375
/// Gets the <see cref="IRepository"/> used to manage <see cref="V1Schedule"/>s
7476
/// </summary>
7577
protected IRepository<V1Schedule> Schedules { get; }
7678

79+
/// <summary>
80+
/// Gets the service used to manage background jobs
81+
/// </summary>
82+
protected IBackgroundJobManager BackgroundJobManager { get; }
83+
7784
/// <inheritdoc/>
7885
public virtual async Task<IOperationResult<Integration.Models.V1Schedule>> HandleAsync(V1ResumeScheduleCommand command, CancellationToken cancellationToken = default)
7986
{
@@ -82,6 +89,7 @@ public V1ResumeScheduleCommandHandler(ILoggerFactory loggerFactory, IMediator me
8289
schedule.Resume();
8390
schedule = await this.Schedules.UpdateAsync(schedule, cancellationToken);
8491
await this.Schedules.SaveChangesAsync(cancellationToken);
92+
if (schedule.NextOccurenceAt.HasValue) await this.BackgroundJobManager.ScheduleJobAsync(schedule, cancellationToken);
8593
return this.Ok(this.Mapper.Map<Integration.Models.V1Schedule>(schedule));
8694
}
8795

src/core/Synapse.Application/Commands/Schedules/v1/V1RetireScheduleCommand.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
namespace Synapse.Application.Commands.Schedules
1919
{
20+
2021
/// <summary>
2122
/// Represents the <see cref="ICommand"/> used to retire a <see cref="V1Schedule"/>
2223
/// </summary>
@@ -61,17 +62,24 @@ public class V1RetireScheduleCommandHandler
6162
/// <param name="mediator">The service used to mediate calls</param>
6263
/// <param name="mapper">The service used to map objects</param>
6364
/// <param name="schedules">The <see cref="IRepository"/> used to manage <see cref="V1Schedule"/>s</param>
64-
public V1RetireScheduleCommandHandler(ILoggerFactory loggerFactory, IMediator mediator, IMapper mapper, IRepository<V1Schedule> schedules)
65+
/// <param name="backgroundJobManager">The service used to manage background jobs</param>
66+
public V1RetireScheduleCommandHandler(ILoggerFactory loggerFactory, IMediator mediator, IMapper mapper, IRepository<V1Schedule> schedules, IBackgroundJobManager backgroundJobManager)
6567
: base(loggerFactory, mediator, mapper)
6668
{
6769
this.Schedules = schedules;
70+
this.BackgroundJobManager = backgroundJobManager;
6871
}
6972

7073
/// <summary>
7174
/// Gets the <see cref="IRepository"/> used to manage <see cref="V1Schedule"/>s
7275
/// </summary>
7376
protected IRepository<V1Schedule> Schedules { get; }
7477

78+
/// <summary>
79+
/// Gets the service used to manage background jobs
80+
/// </summary>
81+
protected IBackgroundJobManager BackgroundJobManager { get; }
82+
7583
/// <inheritdoc/>
7684
public virtual async Task<IOperationResult> HandleAsync(V1RetireScheduleCommand command, CancellationToken cancellationToken = default)
7785
{
@@ -80,6 +88,7 @@ public virtual async Task<IOperationResult> HandleAsync(V1RetireScheduleCommand
8088
schedule.Retire();
8189
await this.Schedules.UpdateAsync(schedule, cancellationToken);
8290
await this.Schedules.SaveChangesAsync(cancellationToken);
91+
await this.BackgroundJobManager.CancelJobAsync(schedule.Id, cancellationToken);
8392
return this.Ok();
8493
}
8594

src/core/Synapse.Application/Commands/Schedules/v1/V1SuspendScheduleCommand.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,24 @@ public class V1SuspendScheduleCommandHandler
6161
/// <param name="mediator">The service used to mediate calls</param>
6262
/// <param name="mapper">The service used to map objects</param>
6363
/// <param name="schedules">The <see cref="IRepository"/> used to manage <see cref="V1Schedule"/>s</param>
64-
public V1SuspendScheduleCommandHandler(ILoggerFactory loggerFactory, IMediator mediator, IMapper mapper, IRepository<V1Schedule> schedules)
64+
/// <param name="backgroundJobManager">The service used to manage background jobs</param>
65+
public V1SuspendScheduleCommandHandler(ILoggerFactory loggerFactory, IMediator mediator, IMapper mapper, IRepository<V1Schedule> schedules, IBackgroundJobManager backgroundJobManager)
6566
: base(loggerFactory, mediator, mapper)
6667
{
6768
this.Schedules = schedules;
69+
this.BackgroundJobManager = backgroundJobManager;
6870
}
6971

7072
/// <summary>
7173
/// Gets the <see cref="IRepository"/> used to manage <see cref="V1Schedule"/>s
7274
/// </summary>
7375
protected IRepository<V1Schedule> Schedules { get; }
7476

77+
/// <summary>
78+
/// Gets the service used to manage background jobs
79+
/// </summary>
80+
protected IBackgroundJobManager BackgroundJobManager { get; }
81+
7582
/// <inheritdoc/>
7683
public virtual async Task<IOperationResult> HandleAsync(V1SuspendScheduleCommand command, CancellationToken cancellationToken = default)
7784
{
@@ -80,6 +87,7 @@ public virtual async Task<IOperationResult> HandleAsync(V1SuspendScheduleCommand
8087
schedule.Suspend();
8188
await this.Schedules.UpdateAsync(schedule, cancellationToken);
8289
await this.Schedules.SaveChangesAsync(cancellationToken);
90+
await this.BackgroundJobManager.CancelJobAsync(schedule.Id, cancellationToken);
8391
return this.Ok();
8492
}
8593

src/core/Synapse.Application/Commands/Schedules/v1/V1TriggerScheduleCommand.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
*
1616
*/
1717

18+
using ServerlessWorkflow.Sdk;
19+
using Synapse.Application.Commands.WorkflowInstances;
20+
1821
namespace Synapse.Application.Commands.Schedules
1922
{
2023

@@ -62,25 +65,34 @@ public class V1TriggerScheduleCommandHandler
6265
/// <param name="mediator">The service used to mediate calls</param>
6366
/// <param name="mapper">The service used to map objects</param>
6467
/// <param name="schedules">The <see cref="IRepository"/> used to manage <see cref="V1Schedule"/>s</param>
65-
public V1TriggerScheduleCommandHandler(ILoggerFactory loggerFactory, IMediator mediator, IMapper mapper, IRepository<V1Schedule> schedules)
68+
/// <param name="backgroundJobManager">The service used to manage background jobs</param>
69+
public V1TriggerScheduleCommandHandler(ILoggerFactory loggerFactory, IMediator mediator, IMapper mapper, IRepository<V1Schedule> schedules, IBackgroundJobManager backgroundJobManager)
6670
: base(loggerFactory, mediator, mapper)
6771
{
6872
this.Schedules = schedules;
73+
this.BackgroundJobManager = backgroundJobManager;
6974
}
7075

7176
/// <summary>
7277
/// Gets the <see cref="IRepository"/> used to manage <see cref="V1Schedule"/>s
7378
/// </summary>
7479
protected IRepository<V1Schedule> Schedules { get; }
7580

81+
/// <summary>
82+
/// Gets the service used to manage background jobs
83+
/// </summary>
84+
protected IBackgroundJobManager BackgroundJobManager { get; }
85+
7686
/// <inheritdoc/>
7787
public virtual async Task<IOperationResult<Integration.Models.V1Schedule>> HandleAsync(V1TriggerScheduleCommand command, CancellationToken cancellationToken = default)
7888
{
7989
var schedule = await this.Schedules.FindAsync(command.ScheduleId, cancellationToken);
8090
if (schedule == null) throw DomainException.NullReference(typeof(V1Schedule), command.ScheduleId);
81-
schedule.Trigger();
91+
var workflowInstance = await this.Mediator.ExecuteAndUnwrapAsync(new V1CreateWorkflowInstanceCommand(schedule.WorkflowId, V1WorkflowInstanceActivationType.Schedule, null, null, true, null));
92+
schedule.Occur(workflowInstance.Id);
8293
schedule = await this.Schedules.UpdateAsync(schedule, cancellationToken);
8394
await this.Schedules.SaveChangesAsync(cancellationToken);
95+
if (schedule.Definition.Type == ScheduleDefinitionType.Cron && schedule.NextOccurenceAt.HasValue) await this.BackgroundJobManager.ScheduleJobAsync(schedule, cancellationToken);
8496
return this.Ok(this.Mapper.Map<Integration.Models.V1Schedule>(schedule));
8597
}
8698

0 commit comments

Comments
 (0)