Skip to content

Commit f067000

Browse files
authored
Add server side cron schedule support (#248)
* Add cron parser library and plumb through cronSchedule in workflow options * Add cronSchedule in start workflow params and last result in sync decision context * Add test framework change and unit test for cron schedule * Fix execution timer delay calculation in test framework * Implement getLasCompletionResult in test framework * Add java docs * Address review comments * Fix unit test
1 parent 7cd5617 commit f067000

20 files changed

+464
-145
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ dependencies {
5151
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
5252
compile group: 'com.uber.m3', name: 'tally-core', version: '0.2.3'
5353
compile group: 'com.google.guava', name: 'guava', version: '27.0.1-jre'
54+
compile group: 'com.cronutils', name: 'cron-utils', version: '8.0.0'
5455
testCompile group: 'junit', name: 'junit', version: '4.12'
5556
testCompile group: 'com.googlecode.junit-toolbox', name: 'junit-toolbox', version: '2.4'
5657
testCompile group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'

src/main/java/com/uber/cadence/client/WorkflowOptions.java

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,15 @@
1919

2020
import static com.uber.cadence.internal.common.OptionsUtils.roundUpToSeconds;
2121

22+
import com.cronutils.model.Cron;
23+
import com.cronutils.model.CronType;
24+
import com.cronutils.model.definition.CronDefinition;
25+
import com.cronutils.model.definition.CronDefinitionBuilder;
26+
import com.cronutils.parser.CronParser;
27+
import com.google.common.base.Strings;
2228
import com.uber.cadence.ChildPolicy;
2329
import com.uber.cadence.WorkflowIdReusePolicy;
30+
import com.uber.cadence.common.CronSchedule;
2431
import com.uber.cadence.common.MethodRetry;
2532
import com.uber.cadence.common.RetryOptions;
2633
import com.uber.cadence.internal.common.OptionsUtils;
@@ -31,13 +38,14 @@
3138
public final class WorkflowOptions {
3239

3340
public static WorkflowOptions merge(
34-
WorkflowMethod a, MethodRetry methodRetry, WorkflowOptions o) {
41+
WorkflowMethod a, MethodRetry methodRetry, CronSchedule cronSchedule, WorkflowOptions o) {
3542
if (a == null) {
3643
return new WorkflowOptions.Builder(o).validateBuildWithDefaults();
3744
}
3845
if (o == null) {
3946
o = new WorkflowOptions.Builder().build();
4047
}
48+
String cronAnnotation = cronSchedule == null ? "" : cronSchedule.value();
4149
return new WorkflowOptions.Builder()
4250
.setWorkflowIdReusePolicy(
4351
OptionsUtils.merge(
@@ -53,6 +61,7 @@ public static WorkflowOptions merge(
5361
.setTaskList(OptionsUtils.merge(a.taskList(), o.getTaskList(), String.class))
5462
.setChildPolicy(o.getChildPolicy())
5563
.setRetryOptions(RetryOptions.merge(methodRetry, o.getRetryOptions()))
64+
.setCronSchedule(OptionsUtils.merge(cronAnnotation, o.getCronSchedule(), String.class))
5665
.validateBuildWithDefaults();
5766
}
5867

@@ -72,6 +81,8 @@ public static final class Builder {
7281

7382
private RetryOptions retryOptions;
7483

84+
private String cronSchedule;
85+
7586
public Builder() {}
7687

7788
public Builder(WorkflowOptions o) {
@@ -85,6 +96,7 @@ public Builder(WorkflowOptions o) {
8596
this.taskList = o.taskList;
8697
this.childPolicy = o.childPolicy;
8798
this.retryOptions = o.retryOptions;
99+
this.cronSchedule = o.cronSchedule;
88100
}
89101

90102
/**
@@ -164,6 +176,11 @@ public Builder setRetryOptions(RetryOptions retryOptions) {
164176
return this;
165177
}
166178

179+
public Builder setCronSchedule(String cronSchedule) {
180+
this.cronSchedule = cronSchedule;
181+
return this;
182+
}
183+
167184
public WorkflowOptions build() {
168185
return new WorkflowOptions(
169186
workflowId,
@@ -172,7 +189,8 @@ public WorkflowOptions build() {
172189
taskStartToCloseTimeout,
173190
taskList,
174191
childPolicy,
175-
retryOptions);
192+
retryOptions,
193+
cronSchedule);
176194
}
177195

178196
/**
@@ -200,6 +218,14 @@ public WorkflowOptions validateBuildWithDefaults() {
200218
"RetryOptions must specify either expiration or maximum attempts");
201219
}
202220
}
221+
222+
if (!Strings.isNullOrEmpty(cronSchedule)) {
223+
CronDefinition cronDefinition = CronDefinitionBuilder.instanceDefinitionFor(CronType.UNIX);
224+
CronParser parser = new CronParser(cronDefinition);
225+
Cron cron = parser.parse(cronSchedule);
226+
cron.validate();
227+
}
228+
203229
return new WorkflowOptions(
204230
workflowId,
205231
policy,
@@ -208,7 +234,8 @@ public WorkflowOptions validateBuildWithDefaults() {
208234
taskStartToCloseTimeout, OptionsUtils.DEFAULT_TASK_START_TO_CLOSE_TIMEOUT),
209235
taskList,
210236
childPolicy,
211-
retryOptions);
237+
retryOptions,
238+
cronSchedule);
212239
}
213240
}
214241

@@ -226,21 +253,25 @@ public WorkflowOptions validateBuildWithDefaults() {
226253

227254
private RetryOptions retryOptions;
228255

256+
private String cronSchedule;
257+
229258
private WorkflowOptions(
230259
String workflowId,
231260
WorkflowIdReusePolicy workflowIdReusePolicy,
232261
Duration executionStartToCloseTimeout,
233262
Duration taskStartToCloseTimeout,
234263
String taskList,
235264
ChildPolicy childPolicy,
236-
RetryOptions retryOptions) {
265+
RetryOptions retryOptions,
266+
String cronSchedule) {
237267
this.workflowId = workflowId;
238268
this.workflowIdReusePolicy = workflowIdReusePolicy;
239269
this.executionStartToCloseTimeout = executionStartToCloseTimeout;
240270
this.taskStartToCloseTimeout = taskStartToCloseTimeout;
241271
this.taskList = taskList;
242272
this.childPolicy = childPolicy;
243273
this.retryOptions = retryOptions;
274+
this.cronSchedule = cronSchedule;
244275
}
245276

246277
public String getWorkflowId() {
@@ -271,6 +302,10 @@ public RetryOptions getRetryOptions() {
271302
return retryOptions;
272303
}
273304

305+
public String getCronSchedule() {
306+
return cronSchedule;
307+
}
308+
274309
@Override
275310
public boolean equals(Object o) {
276311
if (this == o) return true;
@@ -282,7 +317,8 @@ public boolean equals(Object o) {
282317
&& Objects.equals(taskStartToCloseTimeout, that.taskStartToCloseTimeout)
283318
&& Objects.equals(taskList, that.taskList)
284319
&& childPolicy == that.childPolicy
285-
&& Objects.equals(retryOptions, that.retryOptions);
320+
&& Objects.equals(retryOptions, that.retryOptions)
321+
&& Objects.equals(cronSchedule, that.cronSchedule);
286322
}
287323

288324
@Override
@@ -294,7 +330,8 @@ public int hashCode() {
294330
taskStartToCloseTimeout,
295331
taskList,
296332
childPolicy,
297-
retryOptions);
333+
retryOptions,
334+
cronSchedule);
298335
}
299336

300337
@Override
@@ -316,6 +353,9 @@ public String toString() {
316353
+ childPolicy
317354
+ ", retryOptions="
318355
+ retryOptions
356+
+ ", cronSchedule='"
357+
+ cronSchedule
358+
+ '\''
319359
+ '}';
320360
}
321361
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.common;
19+
20+
import java.lang.annotation.ElementType;
21+
import java.lang.annotation.Retention;
22+
import java.lang.annotation.RetentionPolicy;
23+
import java.lang.annotation.Target;
24+
25+
/**
26+
* CronSchedule - Optional cron schedule for workflow. If a cron schedule is specified, the workflow
27+
* will run as a cron based on the schedule. The scheduling will be based on UTC time. Schedule for
28+
* next run only happen after the current run is completed/failed/timeout. If a RetryPolicy is also
29+
* supplied, and the workflow failed or timeout, the workflow will be retried based on the retry
30+
* policy. While the workflow is retrying, it won't schedule its next run. If next schedule is due
31+
* while workflow is running (or retrying), then it will skip that schedule. Cron workflow will not
32+
* stop until it is terminated or cancelled.
33+
*/
34+
@Retention(RetentionPolicy.RUNTIME)
35+
@Target(ElementType.METHOD)
36+
public @interface CronSchedule {
37+
String value();
38+
}

src/main/java/com/uber/cadence/internal/common/StartWorkflowExecutionParameters.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public final class StartWorkflowExecutionParameters {
4848

4949
private RetryParameters retryParameters;
5050

51+
private String cronSchedule;
52+
5153
/**
5254
* Returns the value of the WorkflowId property for this object.
5355
*
@@ -282,13 +284,20 @@ public void setRetryParameters(RetryParameters retryParameters) {
282284
this.retryParameters = retryParameters;
283285
}
284286

287+
public String getCronSchedule() {
288+
return cronSchedule;
289+
}
290+
291+
public void setCronSchedule(String cronSchedule) {
292+
this.cronSchedule = cronSchedule;
293+
}
294+
285295
public StartWorkflowExecutionParameters withRetryParameters(RetryParameters retryParameters) {
286296
this.retryParameters = retryParameters;
287297
return this;
288298
}
289299

290-
public static StartWorkflowExecutionParameters createStartWorkflowExecutionParametersFromOptions(
291-
WorkflowOptions options) {
300+
public static StartWorkflowExecutionParameters fromWorkflowOptions(WorkflowOptions options) {
292301
StartWorkflowExecutionParameters parameters = new StartWorkflowExecutionParameters();
293302
parameters.setExecutionStartToCloseTimeoutSeconds(
294303
getSeconds(options.getExecutionStartToCloseTimeout()));
@@ -314,6 +323,10 @@ public static StartWorkflowExecutionParameters createStartWorkflowExecutionParam
314323
}
315324
parameters.setRetryParameters(rp);
316325
}
326+
327+
if (options.getCronSchedule() != null) {
328+
parameters.setCronSchedule(options.getCronSchedule());
329+
}
317330
return parameters;
318331
}
319332

@@ -347,6 +360,9 @@ public String toString() {
347360
+ workflowIdReusePolicy
348361
+ ", retryParameters="
349362
+ retryParameters
363+
+ ", cronSchedule='"
364+
+ cronSchedule
365+
+ '\''
350366
+ '}';
351367
}
352368

@@ -363,7 +379,8 @@ public boolean equals(Object o) {
363379
&& Arrays.equals(input, that.input)
364380
&& childPolicy == that.childPolicy
365381
&& workflowIdReusePolicy == that.workflowIdReusePolicy
366-
&& Objects.equals(retryParameters, that.retryParameters);
382+
&& Objects.equals(retryParameters, that.retryParameters)
383+
&& Objects.equals(cronSchedule, that.cronSchedule);
367384
}
368385

369386
@Override
@@ -377,21 +394,9 @@ public int hashCode() {
377394
taskStartToCloseTimeoutSeconds,
378395
childPolicy,
379396
workflowIdReusePolicy,
380-
retryParameters);
397+
retryParameters,
398+
cronSchedule);
381399
result = 31 * result + Arrays.hashCode(input);
382400
return result;
383401
}
384-
385-
public StartWorkflowExecutionParameters copy() {
386-
StartWorkflowExecutionParameters result = new StartWorkflowExecutionParameters();
387-
result.setInput(input);
388-
result.setExecutionStartToCloseTimeoutSeconds(executionStartToCloseTimeoutSeconds);
389-
result.setTaskStartToCloseTimeoutSeconds(taskStartToCloseTimeoutSeconds);
390-
result.setTaskList(taskList);
391-
result.setWorkflowId(workflowId);
392-
result.setWorkflowType(workflowType);
393-
result.setChildPolicy(childPolicy);
394-
result.setRetryParameters(retryParameters.copy());
395-
return result;
396-
}
397402
}

src/main/java/com/uber/cadence/internal/external/GenericWorkflowClientExternalImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.internal.external;
1919

20+
import com.google.common.base.Strings;
2021
import com.uber.cadence.*;
2122
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
2223
import com.uber.cadence.internal.common.RetryParameters;
@@ -109,6 +110,9 @@ private WorkflowExecution startWorkflowInternal(StartWorkflowExecutionParameters
109110
.setNonRetriableErrorReasons(retryParameters.getNonRetriableErrorReasons());
110111
request.setRetryPolicy(retryPolicy);
111112
}
113+
if (!Strings.isNullOrEmpty(startParameters.getCronSchedule())) {
114+
request.setCronSchedule(startParameters.getCronSchedule());
115+
}
112116

113117
// if(startParameters.getChildPolicy() != null) {
114118
// request.setChildPolicy(startParameters.getChildPolicy());

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ public Thread newThread(Runnable r) {
175175

176176
private static SyncDecisionContext newDummySyncDecisionContext() {
177177
return new SyncDecisionContext(
178-
new DummyDecisionContext(), JsonDataConverter.getInstance(), (next) -> next);
178+
new DummyDecisionContext(), JsonDataConverter.getInstance(), (next) -> next, null);
179179
}
180180

181181
SyncDecisionContext getDecisionContext() {

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,13 @@ final class SyncDecisionContext implements WorkflowInterceptor {
7979
private final WorkflowInterceptor headInterceptor;
8080
private final WorkflowTimers timers = new WorkflowTimers();
8181
private final Map<String, Functions.Func1<byte[], byte[]>> queryCallbacks = new HashMap<>();
82+
private final byte[] lastCompletionResult;
8283

8384
public SyncDecisionContext(
8485
DecisionContext context,
8586
DataConverter converter,
86-
Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory) {
87+
Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
88+
byte[] lastCompletionResult) {
8789
this.context = context;
8890
this.converter = converter;
8991
WorkflowInterceptor interceptor = interceptorFactory.apply(this);
@@ -92,6 +94,7 @@ public SyncDecisionContext(
9294
interceptor = this;
9395
}
9496
this.headInterceptor = interceptor;
97+
this.lastCompletionResult = lastCompletionResult;
9598
}
9699

97100
/**
@@ -593,4 +596,13 @@ public Scope getMetricsScope() {
593596
public boolean isLoggingEnabledInReplay() {
594597
return context.getEnableLoggingInReplay();
595598
}
599+
600+
public <R> R getLastCompletionResult(Class<R> resultClass, Type resultType) {
601+
if (lastCompletionResult == null || lastCompletionResult.length == 0) {
602+
return null;
603+
}
604+
605+
DataConverter dataConverter = getDataConverter();
606+
return dataConverter.fromData(lastCompletionResult, resultClass, resultType);
607+
}
596608
}

src/main/java/com/uber/cadence/internal/sync/SyncWorkflow.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,13 +78,18 @@ public void start(HistoryEvent event, DecisionContext context) {
7878
if (workflow == null) {
7979
throw new IllegalArgumentException("Unknown workflow type: " + workflowType);
8080
}
81-
SyncDecisionContext syncContext =
82-
new SyncDecisionContext(context, dataConverter, interceptorFactory);
8381
if (event.getEventType() != EventType.WorkflowExecutionStarted) {
8482
throw new IllegalArgumentException(
8583
"first event is not WorkflowExecutionStarted, but " + event.getEventType());
8684
}
8785

86+
SyncDecisionContext syncContext =
87+
new SyncDecisionContext(
88+
context,
89+
dataConverter,
90+
interceptorFactory,
91+
event.getWorkflowExecutionStartedEventAttributes().getLastCompletionResult());
92+
8893
workflowProc =
8994
new WorkflowRunnable(
9095
syncContext, workflow, event.getWorkflowExecutionStartedEventAttributes());

src/main/java/com/uber/cadence/internal/sync/WorkflowInternal.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,4 +359,8 @@ public static Logger getLogger(String name) {
359359
return new ReplayAwareLogger(
360360
logger, WorkflowInternal::isReplaying, WorkflowInternal::isLoggingEnabledInReplay);
361361
}
362+
363+
public static <R> R getLastCompletionResult(Class<R> resultClass, Type resultType) {
364+
return getRootDecisionContext().getLastCompletionResult(resultClass, resultType);
365+
}
362366
}

0 commit comments

Comments
 (0)