@@ -399,7 +399,7 @@ public void testSync() {
399
399
"executeActivity TestActivities::activity2" );
400
400
}
401
401
402
- public static class TestActivityRetry implements TestWorkflow1 {
402
+ public static class TestActivityRetryWithMaxAttempts implements TestWorkflow1 {
403
403
404
404
@ Override
405
405
@ SuppressWarnings ("Finally" )
@@ -408,12 +408,9 @@ public String execute(String taskList) {
408
408
new ActivityOptions .Builder ()
409
409
.setTaskList (taskList )
410
410
.setHeartbeatTimeout (Duration .ofSeconds (5 ))
411
- .setScheduleToCloseTimeout (Duration .ofSeconds (5 ))
412
- .setScheduleToStartTimeout (Duration .ofSeconds (5 ))
413
- .setStartToCloseTimeout (Duration .ofSeconds (10 ))
411
+ .setScheduleToCloseTimeout (Duration .ofSeconds (3 ))
414
412
.setRetryOptions (
415
413
new RetryOptions .Builder ()
416
- .setExpiration (Duration .ofSeconds (100 ))
417
414
.setMaximumInterval (Duration .ofSeconds (1 ))
418
415
.setInitialInterval (Duration .ofSeconds (1 ))
419
416
.setMaximumAttempts (3 )
@@ -426,16 +423,62 @@ public String execute(String taskList) {
426
423
activities .heartbeatAndThrowIO ();
427
424
} finally {
428
425
if (Workflow .currentTimeMillis () - start < 2000 ) {
429
- throw new RuntimeException ("Activity retried without delay" );
426
+ fail ("Activity retried without delay" );
430
427
}
431
428
}
432
429
return "ignored" ;
433
430
}
434
431
}
435
432
436
433
@ Test
437
- public void testActivityRetry () {
438
- startWorkerFor (TestActivityRetry .class );
434
+ public void testActivityRetryWithMaxAttempts () {
435
+ startWorkerFor (TestActivityRetryWithMaxAttempts .class );
436
+ TestWorkflow1 workflowStub =
437
+ workflowClient .newWorkflowStub (
438
+ TestWorkflow1 .class , newWorkflowOptionsBuilder (taskList ).build ());
439
+ try {
440
+ workflowStub .execute (taskList );
441
+ fail ("unreachable" );
442
+ } catch (WorkflowException e ) {
443
+ assertTrue (e .getCause ().getCause () instanceof IOException );
444
+ }
445
+ assertEquals (activitiesImpl .toString (), 3 , activitiesImpl .invocations .size ());
446
+ }
447
+
448
+ public static class TestActivityRetryWithExpiration implements TestWorkflow1 {
449
+
450
+ @ Override
451
+ @ SuppressWarnings ("Finally" )
452
+ public String execute (String taskList ) {
453
+ ActivityOptions options =
454
+ new ActivityOptions .Builder ()
455
+ .setTaskList (taskList )
456
+ .setHeartbeatTimeout (Duration .ofSeconds (5 ))
457
+ .setScheduleToCloseTimeout (Duration .ofSeconds (3 ))
458
+ .setRetryOptions (
459
+ new RetryOptions .Builder ()
460
+ .setExpiration (Duration .ofSeconds (3 ))
461
+ .setMaximumInterval (Duration .ofSeconds (1 ))
462
+ .setInitialInterval (Duration .ofSeconds (1 ))
463
+ .setDoNotRetry (AssertionError .class )
464
+ .build ())
465
+ .build ();
466
+ TestActivities activities = Workflow .newActivityStub (TestActivities .class , options );
467
+ long start = Workflow .currentTimeMillis ();
468
+ try {
469
+ activities .heartbeatAndThrowIO ();
470
+ } finally {
471
+ if (Workflow .currentTimeMillis () - start < 2000 ) {
472
+ fail ("Activity retried without delay" );
473
+ }
474
+ }
475
+ return "ignored" ;
476
+ }
477
+ }
478
+
479
+ @ Test
480
+ public void testActivityRetryWithExiration () {
481
+ startWorkerFor (TestActivityRetryWithExpiration .class );
439
482
TestWorkflow1 workflowStub =
440
483
workflowClient .newWorkflowStub (
441
484
TestWorkflow1 .class , newWorkflowOptionsBuilder (taskList ).build ());
@@ -1241,17 +1284,17 @@ public void testMemo() {
1241
1284
startWorkerFor (TestMultiargsWorkflowsImpl .class );
1242
1285
WorkflowOptions workflowOptions = newWorkflowOptionsBuilder (taskList ).setMemo (memo ).build ();
1243
1286
TestMultiargsWorkflowsFunc stubF =
1244
- workflowClient .newWorkflowStub (TestMultiargsWorkflowsFunc .class , workflowOptions );
1287
+ workflowClient .newWorkflowStub (TestMultiargsWorkflowsFunc .class , workflowOptions );
1245
1288
WorkflowExecution executionF = WorkflowClient .start (stubF ::func );
1246
1289
1247
1290
GetWorkflowExecutionHistoryResponse historyResp =
1248
- WorkflowExecutionUtils .getHistoryPage (
1249
- new byte [] {}, testEnvironment .getWorkflowService (), DOMAIN , executionF );
1291
+ WorkflowExecutionUtils .getHistoryPage (
1292
+ new byte [] {}, testEnvironment .getWorkflowService (), DOMAIN , executionF );
1250
1293
HistoryEvent startEvent = historyResp .history .getEvents ().get (0 );
1251
1294
Memo memoFromEvent = startEvent .workflowExecutionStartedEventAttributes .getMemo ();
1252
1295
byte [] memoBytes = memoFromEvent .getFields ().get (testMemoKey ).array ();
1253
1296
String memoRetrieved =
1254
- JsonDataConverter .getInstance ().fromData (memoBytes , String .class , String .class );
1297
+ JsonDataConverter .getInstance ().fromData (memoBytes , String .class , String .class );
1255
1298
assertEquals (testMemoValue , memoRetrieved );
1256
1299
}
1257
1300
}
@@ -3761,7 +3804,9 @@ public void testGetVersion() {
3761
3804
}
3762
3805
3763
3806
static CompletableFuture <Boolean > executionStarted = new CompletableFuture <>();
3764
- public static class TestGetVersionWithoutDecisionEventWorkflowImpl implements TestWorkflowSignaled {
3807
+
3808
+ public static class TestGetVersionWithoutDecisionEventWorkflowImpl
3809
+ implements TestWorkflowSignaled {
3765
3810
3766
3811
CompletablePromise <Boolean > signalReceived = Workflow .newPromise ();
3767
3812
@@ -3774,7 +3819,8 @@ public String execute() {
3774
3819
executionStarted .complete (true );
3775
3820
signalReceived .get ();
3776
3821
} else {
3777
- // Execute getVersion in replay mode. In this case we have no decision event, only a signal.
3822
+ // Execute getVersion in replay mode. In this case we have no decision event, only a
3823
+ // signal.
3778
3824
int version = Workflow .getVersion ("test_change" , Workflow .DEFAULT_VERSION , 1 );
3779
3825
if (version == Workflow .DEFAULT_VERSION ) {
3780
3826
signalReceived .get ();
0 commit comments