Skip to content

Commit 301b9fd

Browse files
committed
Add draft API
1 parent db42f91 commit 301b9fd

File tree

5 files changed

+38
-5
lines changed

5 files changed

+38
-5
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
namespace WorkflowCore.Interface
2+
{
3+
public interface ITransaction
4+
{
5+
/// <summary>
6+
/// The transaction session specific for each storage provider.
7+
/// </summary>
8+
object Session { get; }
9+
}
10+
}

src/WorkflowCore/Interface/Persistence/IWorkflowRepository.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace WorkflowCore.Interface
77
{
88
public interface IWorkflowRepository
99
{
10-
Task<string> CreateNewWorkflow(WorkflowInstance workflow);
10+
Task<string> CreateNewWorkflow(WorkflowInstance workflow, ITransaction transaction = null);
1111

1212
Task PersistWorkflow(WorkflowInstance workflow);
1313

src/WorkflowCore/Services/WorkflowController.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public Task<string> StartWorkflow<TData>(string workflowId, TData data = null, s
5353
return StartWorkflow(workflowId, null, data, reference);
5454
}
5555

56-
public async Task<string> StartWorkflow<TData>(string workflowId, int? version, TData data = null, string reference=null)
56+
public async Task<string> StartWorkflow<TData>(string workflowId, int? version, TData data = null, string reference=null, ITransaction transaction = null)
5757
where TData : class, new()
5858
{
5959

@@ -91,7 +91,7 @@ public async Task<string> StartWorkflow<TData>(string workflowId, int? version,
9191
await middlewareRunner.RunPreMiddleware(wf, def);
9292
}
9393

94-
string id = await _persistenceStore.CreateNewWorkflow(wf);
94+
string id = await _persistenceStore.CreateNewWorkflow(wf, transaction);
9595
await _queueProvider.QueueWork(id, QueueType.Workflow);
9696
await _queueProvider.QueueWork(id, QueueType.Index);
9797
await _eventHub.PublishNotification(new WorkflowStarted

src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,17 @@ static void CreateIndexes(MongoPersistenceProvider instance)
122122

123123
private IMongoCollection<ExecutionError> ExecutionErrors => _database.GetCollection<ExecutionError>("wfc.execution_errors");
124124

125-
public async Task<string> CreateNewWorkflow(WorkflowInstance workflow)
125+
public async Task<string> CreateNewWorkflow(WorkflowInstance workflow, ITransaction transaction = null)
126126
{
127-
await WorkflowInstances.InsertOneAsync(workflow);
127+
if (transaction?.Session is IClientSessionHandle clientSessionHandle)
128+
{
129+
await WorkflowInstances.InsertOneAsync(clientSessionHandle, workflow);
130+
}
131+
else
132+
{
133+
await WorkflowInstances.InsertOneAsync(workflow);
134+
}
135+
128136
return workflow.Id;
129137
}
130138

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using MongoDB.Driver;
2+
using WorkflowCore.Interface;
3+
4+
namespace WorkflowCore.Persistence.MongoDB.Services
5+
{
6+
public class MongoTransaction : ITransaction
7+
{
8+
public MongoTransaction(IClientSessionHandle session)
9+
{
10+
Session = session;
11+
}
12+
13+
public object Session { get; }
14+
}
15+
}

0 commit comments

Comments
 (0)