Skip to content

Commit 1f28357

Browse files
committed
add basic REST API wrapper for External Worker API
0 parents  commit 1f28357

12 files changed

+694
-0
lines changed

.gitignore

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
bin/
2+
obj/
3+
/packages/
4+
riderModule.iml
5+
/_ReSharper.Caches/
6+
.idea
7+
*.DotSettings.user

FlowableExternalWorkerClient.sln

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "FlowableExternalWorkerClient", "FlowableExternalWorkerClient\FlowableExternalWorkerClient.csproj", "{8BF2F4EF-0DE1-4F68-AB1A-7DAB7DB4F2ED}"
4+
EndProject
5+
Global
6+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
7+
Debug|Any CPU = Debug|Any CPU
8+
Release|Any CPU = Release|Any CPU
9+
EndGlobalSection
10+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
11+
{8BF2F4EF-0DE1-4F68-AB1A-7DAB7DB4F2ED}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
12+
{8BF2F4EF-0DE1-4F68-AB1A-7DAB7DB4F2ED}.Debug|Any CPU.Build.0 = Debug|Any CPU
13+
{8BF2F4EF-0DE1-4F68-AB1A-7DAB7DB4F2ED}.Release|Any CPU.ActiveCfg = Release|Any CPU
14+
{8BF2F4EF-0DE1-4F68-AB1A-7DAB7DB4F2ED}.Release|Any CPU.Build.0 = Release|Any CPU
15+
EndGlobalSection
16+
EndGlobal
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
namespace FlowableExternalWorkerClient;
2+
3+
public class EngineRestVariable
4+
{
5+
public string Name { get; }
6+
public string Type { get; }
7+
public string? Value { get; }
8+
public string? ValueUrl { get; }
9+
10+
public EngineRestVariable(string name, string type, string? value, string? valueUrl = null)
11+
{
12+
Name = name;
13+
Type = type;
14+
Value = value;
15+
ValueUrl = valueUrl;
16+
}
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
namespace FlowableExternalWorkerClient;
2+
3+
public class ExternalWorkerAcquireJobResponse : ExternalWorkerJobResponse
4+
{
5+
public List<EngineRestVariable> Variables { get; }
6+
7+
public ExternalWorkerAcquireJobResponse(string id, string url, string correlationId, string? processInstanceId,
8+
string? executionId, string? scopeId, string? subScopeId, string? scopeDefinitionId, string? scopeType,
9+
string elementId, string elementName, int retries, string? exceptionMessage, DateTime? dueDate,
10+
DateTime createTime, string tenantId, string? lockOwner, DateTime? lockExpirationTime,
11+
List<EngineRestVariable> variables) : base(id, url, correlationId, processInstanceId, executionId, scopeId,
12+
subScopeId, scopeDefinitionId, scopeType, elementId, elementName, retries, exceptionMessage, dueDate,
13+
createTime, tenantId, lockOwner, lockExpirationTime)
14+
{
15+
Variables = variables;
16+
}
17+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
namespace FlowableExternalWorkerClient;
2+
3+
public class ExternalWorkerJobResponse
4+
{
5+
public string Id { get; }
6+
7+
public string Url { get; }
8+
9+
public string CorrelationId { get; }
10+
11+
public string? ProcessInstanceId { get; }
12+
13+
public string? ExecutionId { get; }
14+
15+
public string? ScopeId { get; }
16+
17+
public string? SubScopeId { get; }
18+
19+
public string? ScopeDefinitionId { get; }
20+
21+
public string? ScopeType { get; }
22+
23+
public string ElementId { get; }
24+
25+
public string ElementName { get; }
26+
27+
public int Retries { get; }
28+
29+
public string? ExceptionMessage { get; }
30+
31+
public DateTime? DueDate { get; }
32+
33+
public DateTime CreateTime { get; }
34+
35+
public string TenantId { get; }
36+
37+
public string? LockOwner { get; }
38+
39+
public DateTime? LockExpirationTime { get; }
40+
41+
public ExternalWorkerJobResponse(string id, string url, string correlationId, string? processInstanceId,
42+
string? executionId, string? scopeId, string? subScopeId, string? scopeDefinitionId, string? scopeType,
43+
string elementId, string elementName, int retries, string? exceptionMessage, DateTime? dueDate,
44+
DateTime createTime, string tenantId, string? lockOwner, DateTime? lockExpirationTime)
45+
{
46+
Id = id;
47+
Url = url;
48+
CorrelationId = correlationId;
49+
ProcessInstanceId = processInstanceId;
50+
ExecutionId = executionId;
51+
ScopeId = scopeId;
52+
SubScopeId = subScopeId;
53+
ScopeDefinitionId = scopeDefinitionId;
54+
ScopeType = scopeType;
55+
ElementId = elementId;
56+
ElementName = elementName;
57+
Retries = retries;
58+
ExceptionMessage = exceptionMessage;
59+
DueDate = dueDate;
60+
CreateTime = createTime;
61+
TenantId = tenantId;
62+
LockOwner = lockOwner;
63+
LockExpirationTime = lockExpirationTime;
64+
}
65+
66+
public override string ToString()
67+
{
68+
return $"{nameof(Id)}: {Id}";
69+
}
70+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net7.0</TargetFramework>
5+
<ImplicitUsings>enable</ImplicitUsings>
6+
<Nullable>enable</Nullable>
7+
</PropertyGroup>
8+
9+
</Project>
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
using System.Net;
2+
using System.Net.Http.Headers;
3+
using System.Text;
4+
using System.Text.Json;
5+
6+
namespace FlowableExternalWorkerClient;
7+
8+
public class FlowableExternalWorkerRestClient : IFlowableExternalWorkerRestClient
9+
{
10+
private const string _jobApi = "/external-job-api";
11+
12+
private readonly HttpClient _httpClient;
13+
private readonly string _flowableHost;
14+
private readonly string _workerId;
15+
16+
private readonly JsonSerializerOptions _jsonSerializerOptions = new JsonSerializerOptions
17+
{
18+
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
19+
IncludeFields = true
20+
};
21+
22+
public FlowableExternalWorkerRestClient(
23+
string flowableHost,
24+
string workerId,
25+
AuthenticationHeaderValue? authentication = null,
26+
IHttpClientCustomizer? customizer = null
27+
)
28+
{
29+
_httpClient = new HttpClient();
30+
if (authentication != null)
31+
{
32+
_httpClient.DefaultRequestHeaders.Authorization = authentication;
33+
}
34+
35+
if (customizer != null)
36+
{
37+
customizer.Customize(_httpClient);
38+
}
39+
40+
_flowableHost = flowableHost;
41+
_workerId = workerId;
42+
}
43+
44+
public async Task<Page<ExternalWorkerJobResponse>> ListJobs()
45+
{
46+
var response = await _httpClient.GetStreamAsync(_flowableHost + _jobApi + "/jobs");
47+
return await JsonSerializer.DeserializeAsync<Page<ExternalWorkerJobResponse>>(response, _jsonSerializerOptions)
48+
?? throw new Exception("Failed to convert response to JSON structure");
49+
}
50+
51+
public async Task<ExternalWorkerJobResponse> GetJob(string jobId)
52+
{
53+
var response = await _httpClient.GetStreamAsync(_flowableHost + _jobApi + "/jobs/" + jobId);
54+
return await JsonSerializer.DeserializeAsync<ExternalWorkerJobResponse>(response, _jsonSerializerOptions)
55+
?? throw new Exception("Failed to convert response to JSON structure");
56+
}
57+
58+
public async Task<List<ExternalWorkerAcquireJobResponse>> AcquireJobs(string topic, string lockDuration,
59+
int numberOfTasks = 1, int numberOfRetries = 5,
60+
string? workerId = null, string? scopeType = null)
61+
{
62+
var acquireJobRequest = new AcquireJobRequest(
63+
topic,
64+
lockDuration,
65+
numberOfTasks,
66+
numberOfRetries,
67+
workerId ?? _workerId,
68+
scopeType
69+
);
70+
var requestBody = PrepareRequestBody(acquireJobRequest);
71+
72+
var response = await _httpClient.PostAsync(_flowableHost + _jobApi + "/acquire/jobs", requestBody);
73+
await VerifyStatusCode(response, HttpStatusCode.OK);
74+
75+
var responseStream = await response.Content.ReadAsStreamAsync();
76+
77+
return await JsonSerializer.DeserializeAsync<List<ExternalWorkerAcquireJobResponse>>(
78+
responseStream, _jsonSerializerOptions
79+
) ?? throw new Exception("Failed to convert response to JSON structure");
80+
}
81+
82+
public async Task CompleteJob(string jobId, List<EngineRestVariable>? variables = null, string? workerId = null)
83+
{
84+
var completeJobRequest = new CompleteJobRequest(
85+
workerId ?? _workerId,
86+
variables
87+
);
88+
89+
var requestBody = PrepareRequestBody(completeJobRequest);
90+
var response = await _httpClient.PostAsync(
91+
_flowableHost + _jobApi + "/acquire/jobs/" + jobId + "/complete",
92+
requestBody
93+
);
94+
95+
await VerifyStatusCode(response, HttpStatusCode.NoContent);
96+
}
97+
98+
public async Task JobWithBpmnError(string jobId, List<EngineRestVariable>? variables = null,
99+
string? errorCode = null,
100+
string? workerId = null)
101+
{
102+
var jobWithBpmnErrorRequest = new JobWithBpmnErrorRequest(
103+
workerId ?? _workerId,
104+
errorCode,
105+
variables
106+
);
107+
108+
var requestBody = PrepareRequestBody(jobWithBpmnErrorRequest);
109+
var response = await _httpClient.PostAsync(
110+
_flowableHost + _jobApi + "/acquire/jobs/" + jobId + "/bpmnError",
111+
requestBody
112+
);
113+
114+
await VerifyStatusCode(response, HttpStatusCode.NoContent);
115+
}
116+
117+
public async Task JobWithCmmnTerminate(string jobId, List<EngineRestVariable>? variables = null, string? workerId = null)
118+
{
119+
var cmmnWithTerminateRequest = new CmmnWithTerminateRequest(
120+
workerId ?? _workerId,
121+
variables
122+
);
123+
124+
var requestBody = PrepareRequestBody(cmmnWithTerminateRequest);
125+
var response = await _httpClient.PostAsync(
126+
_flowableHost + _jobApi + "/acquire/jobs/" + jobId + "/cmmnTerminate",
127+
requestBody
128+
);
129+
130+
await VerifyStatusCode(response, HttpStatusCode.NoContent);
131+
}
132+
133+
public async Task FailJob(string jobId, string? errorMessage = null, string? errorDetails = null,
134+
int? retries = null,
135+
string? retryTimeout = null, string? workerId = null)
136+
{
137+
var failJobRequest = new FailJobRequest(
138+
workerId ?? _workerId,
139+
errorMessage,
140+
errorDetails,
141+
retries,
142+
retryTimeout
143+
);
144+
145+
var requestBody = PrepareRequestBody(failJobRequest);
146+
var response = await _httpClient.PostAsync(
147+
_flowableHost + _jobApi + "/acquire/jobs/" + jobId + "/fail",
148+
requestBody
149+
);
150+
151+
await VerifyStatusCode(response, HttpStatusCode.NoContent);
152+
}
153+
154+
protected async Task VerifyStatusCode(HttpResponseMessage response, HttpStatusCode expectedStatusCode)
155+
{
156+
if (response.StatusCode != expectedStatusCode)
157+
{
158+
var body = Encoding.Default.GetString(await response.Content.ReadAsByteArrayAsync());
159+
throw new Exception(
160+
"Failed to execute request with status code '" + response.StatusCode + "' and body: '" + body + "'"
161+
);
162+
}
163+
}
164+
165+
protected StringContent PrepareRequestBody<T>(T requestObject)
166+
{
167+
var serializedAcquireJob = JsonSerializer.Serialize(requestObject, _jsonSerializerOptions);
168+
var stringContent = new StringContent(serializedAcquireJob, Encoding.UTF8, "application/json");
169+
return stringContent;
170+
}
171+
}
172+
173+
class AcquireJobRequest
174+
{
175+
public string Topic;
176+
public string LockDuration;
177+
public int NumberOfTasks;
178+
public int NumberOfRetries;
179+
public string WorkerId;
180+
public string? ScopeType;
181+
182+
public AcquireJobRequest(string topic, string lockDuration, int numberOfTasks, int numberOfRetries, string workerId,
183+
string? scopeType)
184+
{
185+
Topic = topic;
186+
LockDuration = lockDuration;
187+
NumberOfTasks = numberOfTasks;
188+
NumberOfRetries = numberOfRetries;
189+
WorkerId = workerId;
190+
ScopeType = scopeType;
191+
}
192+
}
193+
194+
class CompleteJobRequest
195+
{
196+
public string WorkerId;
197+
public List<EngineRestVariable>? Variables;
198+
199+
public CompleteJobRequest(string workerId, List<EngineRestVariable>? variables)
200+
{
201+
WorkerId = workerId;
202+
Variables = variables;
203+
}
204+
}
205+
206+
class JobWithBpmnErrorRequest
207+
{
208+
public string WorkerId;
209+
public string? ErrorCode;
210+
public List<EngineRestVariable>? Variables;
211+
212+
public JobWithBpmnErrorRequest(string workerId, string? errorCode, List<EngineRestVariable>? variables)
213+
{
214+
WorkerId = workerId;
215+
ErrorCode = errorCode;
216+
Variables = variables;
217+
}
218+
}
219+
220+
class CmmnWithTerminateRequest
221+
{
222+
public string WorkerId;
223+
public List<EngineRestVariable>? Variables;
224+
225+
public CmmnWithTerminateRequest(string workerId, List<EngineRestVariable>? variables)
226+
{
227+
WorkerId = workerId;
228+
Variables = variables;
229+
}
230+
}
231+
232+
class FailJobRequest
233+
{
234+
public string WorkerId;
235+
public string? ErrorMessage;
236+
public string? ErrorDetails;
237+
public int? Retries;
238+
public string? RetryTimeout;
239+
240+
public FailJobRequest(string workerId, string? errorMessage, string? errorDetails, int? retries,
241+
string? retryTimeout)
242+
{
243+
WorkerId = workerId;
244+
ErrorMessage = errorMessage;
245+
ErrorDetails = errorDetails;
246+
Retries = retries;
247+
RetryTimeout = retryTimeout;
248+
}
249+
}

0 commit comments

Comments
 (0)