Skip to content

Commit 144dc73

Browse files
committed
create first version of high level client
1 parent 1f28357 commit 144dc73

14 files changed

+387
-14
lines changed
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
using System.Diagnostics;
2+
using System.Net.Http.Headers;
3+
using FlowableExternalWorkerClient.Rest;
4+
5+
namespace FlowableExternalWorkerClient.Client;
6+
7+
public class ExternalWorkerClient : IExternalWorkerClient
8+
{
9+
private FlowableExternalWorkerRestClient _flowableExternalWorkerRestClient;
10+
11+
public ExternalWorkerClient(
12+
string flowableHost = "https://cloud.flowable.com/work",
13+
string? workerId = null,
14+
AuthenticationHeaderValue? authentication = null,
15+
IHttpClientCustomizer? customizer = null
16+
)
17+
{
18+
_flowableExternalWorkerRestClient = new FlowableExternalWorkerRestClient(
19+
flowableHost,
20+
workerId ?? GetDefaultWorkerId(),
21+
authentication,
22+
customizer
23+
);
24+
}
25+
26+
public IExternalWorkerSubscription Subscribe(
27+
string topic,
28+
IExternalWorkerCallbackHandler callbackHandler,
29+
string lockDuration = "PT1M",
30+
int numberOfRetries = 5,
31+
string? scopeType = null,
32+
int waitPeriodSeconds = 30,
33+
int numberOfTasks = 1
34+
)
35+
{
36+
var externalWorkerSubscription = new ExternalWorkerSubscription(
37+
_flowableExternalWorkerRestClient,
38+
topic,
39+
callbackHandler,
40+
lockDuration,
41+
numberOfRetries,
42+
scopeType,
43+
waitPeriodSeconds,
44+
numberOfTasks
45+
);
46+
externalWorkerSubscription.Start();
47+
return externalWorkerSubscription;
48+
}
49+
50+
protected string GetDefaultWorkerId()
51+
{
52+
return System.Net.Dns.GetHostName() + "-" + Process.GetCurrentProcess().Id;
53+
}
54+
}
55+
56+
class ExternalWorkerSubscription : IExternalWorkerSubscription
57+
{
58+
protected bool Subscribed = true;
59+
protected readonly Thread Thread;
60+
protected readonly FlowableExternalWorkerRestClient FlowableExternalWorkerRestClient;
61+
protected readonly string Topic;
62+
protected readonly IExternalWorkerCallbackHandler ExternalWorkerCallbackHandler;
63+
protected readonly string LockDuration;
64+
protected readonly int NumberOfRetries;
65+
protected readonly string? ScopeType;
66+
protected int WaitPeriodSeconds;
67+
protected readonly int NumberOfTasks;
68+
69+
70+
public ExternalWorkerSubscription(
71+
FlowableExternalWorkerRestClient flowableExternalWorkerRestClient,
72+
string topic,
73+
IExternalWorkerCallbackHandler callbackHandler,
74+
string lockDuration,
75+
int numberOfRetries,
76+
string? scopeType,
77+
int waitPeriodSeconds,
78+
int numberOfTasks
79+
)
80+
{
81+
Thread = new Thread(DoConsume);
82+
FlowableExternalWorkerRestClient = flowableExternalWorkerRestClient;
83+
Topic = topic;
84+
ExternalWorkerCallbackHandler = callbackHandler;
85+
LockDuration = lockDuration;
86+
NumberOfRetries = numberOfRetries;
87+
ScopeType = scopeType;
88+
WaitPeriodSeconds = waitPeriodSeconds;
89+
NumberOfTasks = numberOfTasks;
90+
}
91+
92+
public async void DoConsume()
93+
{
94+
do
95+
{
96+
var acquiredJobs = await FlowableExternalWorkerRestClient.AcquireJobs(
97+
Topic, LockDuration, NumberOfTasks, NumberOfRetries, null, ScopeType
98+
);
99+
foreach (var job in acquiredJobs)
100+
{
101+
var workResult = ExternalWorkerCallbackHandler.Handle(job, new WorkResultBuilder(job));
102+
if (workResult == null)
103+
{
104+
await FlowableExternalWorkerRestClient.CompleteJob(job.Id);
105+
}
106+
else
107+
{
108+
await workResult.Execute(FlowableExternalWorkerRestClient);
109+
}
110+
}
111+
112+
// Only wait in case we did not process jobs, otherwise continue directly
113+
if (acquiredJobs.Count == 0)
114+
{
115+
Thread.Sleep(WaitPeriodSeconds * 1000);
116+
}
117+
} while (Subscribed);
118+
}
119+
120+
public void Start()
121+
{
122+
Thread.Start();
123+
}
124+
125+
public void Unsubscribe()
126+
{
127+
Subscribed = false;
128+
Thread.Join();
129+
}
130+
}
131+
132+
class WorkResultBuilder : IWorkResultBuilder
133+
{
134+
protected readonly ExternalWorkerAcquireJobResponse Job;
135+
136+
public WorkResultBuilder(ExternalWorkerAcquireJobResponse job)
137+
{
138+
Job = job;
139+
}
140+
141+
public WorkResultSuccess Success()
142+
{
143+
return new WorkResultSuccess(Job);
144+
}
145+
146+
public WorkResultFailure Failure()
147+
{
148+
return new WorkResultFailure(Job);
149+
}
150+
151+
public WorkResultBpmnError BpmnError()
152+
{
153+
return new WorkResultBpmnError(Job);
154+
}
155+
156+
public WorkResultCmmnTerminate CmmnTerminate()
157+
{
158+
return new WorkResultCmmnTerminate(Job);
159+
}
160+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
using FlowableExternalWorkerClient.Rest;
2+
3+
namespace FlowableExternalWorkerClient.Client;
4+
5+
public interface IExternalWorkerCallbackHandler
6+
{
7+
IWorkResult? Handle(ExternalWorkerAcquireJobResponse job, IWorkResultBuilder work);
8+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
namespace FlowableExternalWorkerClient.Client;
2+
3+
public interface IExternalWorkerClient
4+
{
5+
IExternalWorkerSubscription Subscribe(
6+
string topic,
7+
IExternalWorkerCallbackHandler callbackHandler,
8+
string lockDuration = "PT1M",
9+
int numberOfRetries = 5,
10+
string? scopeType = null,
11+
int waitPeriodSeconds = 30,
12+
int numberOfTasks = 1
13+
);
14+
15+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
namespace FlowableExternalWorkerClient.Client;
2+
3+
public interface IExternalWorkerSubscription
4+
{
5+
void Unsubscribe();
6+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
namespace FlowableExternalWorkerClient.Client;
2+
3+
public interface IWorkResultBuilder
4+
{
5+
WorkResultSuccess Success();
6+
7+
WorkResultFailure Failure();
8+
9+
WorkResultBpmnError BpmnError();
10+
11+
WorkResultCmmnTerminate CmmnTerminate();
12+
}
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
using FlowableExternalWorkerClient.Rest;
2+
3+
namespace FlowableExternalWorkerClient.Client;
4+
5+
public interface IWorkResult
6+
{
7+
Task Execute(IFlowableExternalWorkerRestClient flowableExternalWorkerRestClient);
8+
}
9+
10+
public class WorkResultSuccess : WorkResultWithVariables<WorkResultSuccess>
11+
{
12+
protected readonly string _jobId;
13+
14+
public WorkResultSuccess(ExternalWorkerAcquireJobResponse job)
15+
{
16+
_jobId = job.Id;
17+
}
18+
19+
public override Task Execute(IFlowableExternalWorkerRestClient flowableExternalWorkerRestClient)
20+
{
21+
return flowableExternalWorkerRestClient.CompleteJob(_jobId, _variables);
22+
}
23+
24+
protected override WorkResultSuccess GetThis()
25+
{
26+
return this;
27+
}
28+
}
29+
30+
public class WorkResultFailure : IWorkResult
31+
{
32+
protected readonly string _jobId;
33+
protected string? _errorMessage;
34+
protected string? _errorDetails;
35+
protected int? _retries;
36+
protected string? _retryTimeout;
37+
38+
public WorkResultFailure(ExternalWorkerAcquireJobResponse job)
39+
{
40+
_jobId = job.Id;
41+
}
42+
43+
WorkResultFailure ErrorMessage(string errorMessage)
44+
{
45+
_errorMessage = errorMessage;
46+
return this;
47+
}
48+
49+
WorkResultFailure ErrorDetails(string errorDetails)
50+
{
51+
_errorDetails = errorDetails;
52+
return this;
53+
}
54+
55+
WorkResultFailure Retries(int retries)
56+
{
57+
_retries = retries;
58+
return this;
59+
}
60+
61+
WorkResultFailure RetryTimeout(string retryTimeout)
62+
{
63+
_retryTimeout = retryTimeout;
64+
return this;
65+
}
66+
67+
public Task Execute(IFlowableExternalWorkerRestClient flowableExternalWorkerRestClient)
68+
{
69+
return flowableExternalWorkerRestClient.FailJob(_jobId, _errorMessage, _errorDetails, _retries, _retryTimeout);
70+
}
71+
}
72+
73+
public class WorkResultBpmnError : WorkResultWithVariables<WorkResultBpmnError>
74+
{
75+
protected readonly string _jobId;
76+
protected string? _errorCode;
77+
78+
public WorkResultBpmnError(ExternalWorkerAcquireJobResponse job)
79+
{
80+
_jobId = job.Id;
81+
}
82+
83+
public WorkResultBpmnError ErrorCode(string errorCode)
84+
{
85+
_errorCode = errorCode;
86+
return this;
87+
}
88+
89+
protected override WorkResultBpmnError GetThis()
90+
{
91+
return this;
92+
}
93+
94+
public override Task Execute(IFlowableExternalWorkerRestClient flowableExternalWorkerRestClient)
95+
{
96+
return flowableExternalWorkerRestClient.JobWithBpmnError(_jobId, _variables, _errorCode);
97+
}
98+
}
99+
100+
public class WorkResultCmmnTerminate : WorkResultWithVariables<WorkResultCmmnTerminate>
101+
{
102+
private string _jobId;
103+
104+
public WorkResultCmmnTerminate(ExternalWorkerAcquireJobResponse job)
105+
{
106+
_jobId = job.Id;
107+
}
108+
109+
protected override WorkResultCmmnTerminate GetThis()
110+
{
111+
return this;
112+
}
113+
114+
public override Task Execute(IFlowableExternalWorkerRestClient flowableExternalWorkerRestClient)
115+
{
116+
return flowableExternalWorkerRestClient.JobWithCmmnTerminate(_jobId, _variables);
117+
}
118+
}
119+
120+
public abstract class WorkResultWithVariables<T> : IWorkResult where T : WorkResultWithVariables<T>
121+
{
122+
123+
protected List<EngineRestVariable> _variables = new();
124+
125+
protected abstract T GetThis();
126+
127+
public abstract Task Execute(IFlowableExternalWorkerRestClient flowableExternalWorkerRestClient);
128+
129+
public T Variable(string name, string value)
130+
{
131+
_variables.Add(new EngineRestVariable(name, "string", value));
132+
return GetThis();
133+
}
134+
135+
public T Variable(string name, int value)
136+
{
137+
_variables.Add(new EngineRestVariable(name, "integer", value));
138+
return GetThis();
139+
}
140+
141+
public T Variable(string name, float value)
142+
{
143+
_variables.Add(new EngineRestVariable(name, "double", value));
144+
return GetThis();
145+
}
146+
147+
public T Variable(string name, double value)
148+
{
149+
_variables.Add(new EngineRestVariable(name, "double", value));
150+
return GetThis();
151+
}
152+
153+
public T Variable(string name, bool value)
154+
{
155+
_variables.Add(new EngineRestVariable(name, "boolean", value));
156+
return GetThis();
157+
}
158+
159+
public T Variable(string name, object value)
160+
{
161+
_variables.Add(new EngineRestVariable(name, "json", value));
162+
return GetThis();
163+
}
164+
165+
}

FlowableExternalWorkerClient/EngineRestVariable.cs renamed to FlowableExternalWorkerClient/Rest/EngineRestVariable.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
namespace FlowableExternalWorkerClient;
1+
namespace FlowableExternalWorkerClient.Rest;
22

33
public class EngineRestVariable
44
{
55
public string Name { get; }
66
public string Type { get; }
7-
public string? Value { get; }
7+
public object? Value { get; }
88
public string? ValueUrl { get; }
99

10-
public EngineRestVariable(string name, string type, string? value, string? valueUrl = null)
10+
public EngineRestVariable(string name, string type, object? value, string? valueUrl = null)
1111
{
1212
Name = name;
1313
Type = type;

0 commit comments

Comments
 (0)