@@ -6,9 +6,11 @@ import (
6
6
"fmt"
7
7
"sort"
8
8
"strconv"
9
+ "sync"
9
10
"testing"
10
11
"time"
11
12
13
+ "github.com/google/uuid"
12
14
"github.com/stretchr/testify/assert"
13
15
"github.com/stretchr/testify/require"
14
16
@@ -259,7 +261,7 @@ func Test_ActivityRetries(t *testing.T) {
259
261
r := task .NewTaskRegistry ()
260
262
r .AddOrchestratorN ("ActivityRetries" , func (ctx * task.OrchestrationContext ) (any , error ) {
261
263
if err := ctx .CallActivity ("FailActivity" , task .WithActivityRetryPolicy (& task.RetryPolicy {
262
- MaxAttempts : 8 ,
264
+ MaxAttempts : 3 ,
263
265
InitialRetryInterval : 10 * time .Millisecond ,
264
266
})).Await (nil ); err != nil {
265
267
return nil , err
@@ -1340,6 +1342,153 @@ func Test_SingleActivity_ReuseInstanceIDError(t *testing.T) {
1340
1342
}
1341
1343
}
1342
1344
1345
+ func Test_TaskExecutionId (t * testing.T ) {
1346
+ t .Run ("SingleActivityWithRetry" , func (t * testing.T ) {
1347
+ // Registration
1348
+ r := task .NewTaskRegistry ()
1349
+ require .NoError (t , r .AddOrchestratorN ("TaskExecutionId" , func (ctx * task.OrchestrationContext ) (any , error ) {
1350
+ if err := ctx .CallActivity ("FailActivity" , task .WithActivityRetryPolicy (& task.RetryPolicy {
1351
+ MaxAttempts : 3 ,
1352
+ InitialRetryInterval : 10 * time .Millisecond ,
1353
+ })).Await (nil ); err != nil {
1354
+ return nil , err
1355
+ }
1356
+ return nil , nil
1357
+ }))
1358
+
1359
+ executionMap := make (map [string ]int )
1360
+ var executionId string
1361
+ require .NoError (t , r .AddActivityN ("FailActivity" , func (ctx task.ActivityContext ) (any , error ) {
1362
+ executionMap [ctx .GetTaskExecutionId ()] = executionMap [ctx .GetTaskExecutionId ()] + 1
1363
+ executionId = ctx .GetTaskExecutionId ()
1364
+ if executionMap [ctx .GetTaskExecutionId ()] == 3 {
1365
+ return nil , nil
1366
+ }
1367
+ return nil , errors .New ("activity failure" )
1368
+ }))
1369
+
1370
+ // Initialization
1371
+ ctx := context .Background ()
1372
+
1373
+ client , worker := initTaskHubWorker (ctx , r )
1374
+ defer worker .Shutdown (ctx )
1375
+
1376
+ // Run the orchestration
1377
+ id , err := client .ScheduleNewOrchestration (ctx , "TaskExecutionId" )
1378
+ require .NoError (t , err )
1379
+
1380
+ metadata , err := client .WaitForOrchestrationCompletion (ctx , id )
1381
+ require .NoError (t , err )
1382
+
1383
+ assert .Equal (t , protos .OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED , metadata .RuntimeStatus )
1384
+ // With 3 max attempts there will be two retries with 10 millis delay before each
1385
+ require .GreaterOrEqual (t , metadata .LastUpdatedAt .AsTime (), metadata .CreatedAt .AsTime ().Add (2 * 10 * time .Millisecond ))
1386
+ assert .NotEmpty (t , executionId )
1387
+ assert .Equal (t , 1 , len (executionMap ))
1388
+ assert .Equal (t , 3 , executionMap [executionId ])
1389
+ })
1390
+
1391
+ t .Run ("ParallelActivityWithRetry" , func (t * testing.T ) {
1392
+ // Registration
1393
+ r := task .NewTaskRegistry ()
1394
+ require .NoError (t , r .AddOrchestratorN ("TaskExecutionId" , func (ctx * task.OrchestrationContext ) (any , error ) {
1395
+ t1 := ctx .CallActivity ("FailActivity" , task .WithActivityRetryPolicy (& task.RetryPolicy {
1396
+ MaxAttempts : 3 ,
1397
+ InitialRetryInterval : 10 * time .Millisecond ,
1398
+ }))
1399
+
1400
+ t2 := ctx .CallActivity ("FailActivity" , task .WithActivityRetryPolicy (& task.RetryPolicy {
1401
+ MaxAttempts : 3 ,
1402
+ InitialRetryInterval : 10 * time .Millisecond ,
1403
+ }))
1404
+
1405
+ err := t1 .Await (nil )
1406
+ if err != nil {
1407
+ return nil , err
1408
+ }
1409
+
1410
+ err = t2 .Await (nil )
1411
+ if err != nil {
1412
+ return nil , err
1413
+ }
1414
+
1415
+ return nil , nil
1416
+ }))
1417
+
1418
+ executionMap := make (map [string ]int )
1419
+
1420
+ lock := sync.Mutex {}
1421
+ require .NoError (t , r .AddActivityN ("FailActivity" , func (ctx task.ActivityContext ) (any , error ) {
1422
+ lock .Lock ()
1423
+ defer lock .Unlock ()
1424
+ executionMap [ctx .GetTaskExecutionId ()] = executionMap [ctx .GetTaskExecutionId ()] + 1
1425
+ if executionMap [ctx .GetTaskExecutionId ()] == 3 {
1426
+ return nil , nil
1427
+ }
1428
+ return nil , errors .New ("activity failure" )
1429
+ }))
1430
+
1431
+ // Initialization
1432
+ ctx := context .Background ()
1433
+
1434
+ client , worker := initTaskHubWorker (ctx , r )
1435
+ defer worker .Shutdown (ctx )
1436
+
1437
+ // Run the orchestration
1438
+ id , err := client .ScheduleNewOrchestration (ctx , "TaskExecutionId" )
1439
+ require .NoError (t , err )
1440
+
1441
+ metadata , err := client .WaitForOrchestrationCompletion (ctx , id )
1442
+ require .NoError (t , err )
1443
+
1444
+ assert .Equal (t , protos .OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED , metadata .RuntimeStatus )
1445
+ // With 3 max attempts there will be two retries with 10 millis delay before each
1446
+ require .GreaterOrEqual (t , metadata .LastUpdatedAt .AsTime (), metadata .CreatedAt .AsTime ().Add (2 * 10 * time .Millisecond ))
1447
+
1448
+ assert .Equal (t , 2 , len (executionMap ))
1449
+ for k , v := range executionMap {
1450
+ assert .NotEmpty (t , k )
1451
+ assert .Equal (t , 3 , v )
1452
+ }
1453
+
1454
+ })
1455
+
1456
+ t .Run ("SingleActivityWithNoRetry" , func (t * testing.T ) {
1457
+ // Registration
1458
+ r := task .NewTaskRegistry ()
1459
+ require .NoError (t , r .AddOrchestratorN ("TaskExecutionId" , func (ctx * task.OrchestrationContext ) (any , error ) {
1460
+ if err := ctx .CallActivity ("Activity" ).Await (nil ); err != nil {
1461
+ return nil , err
1462
+ }
1463
+ return nil , nil
1464
+ }))
1465
+
1466
+ var executionId string
1467
+ require .NoError (t , r .AddActivityN ("Activity" , func (ctx task.ActivityContext ) (any , error ) {
1468
+ executionId = ctx .GetTaskExecutionId ()
1469
+ return nil , nil
1470
+ }))
1471
+
1472
+ // Initialization
1473
+ ctx := t .Context ()
1474
+
1475
+ client , worker := initTaskHubWorker (ctx , r )
1476
+ defer worker .Shutdown (ctx )
1477
+
1478
+ // Run the orchestration
1479
+ id , err := client .ScheduleNewOrchestration (ctx , "TaskExecutionId" )
1480
+ require .NoError (t , err )
1481
+
1482
+ metadata , err := client .WaitForOrchestrationCompletion (ctx , id )
1483
+ require .NoError (t , err )
1484
+
1485
+ assert .Equal (t , protos .OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED , metadata .RuntimeStatus )
1486
+ assert .NotEmpty (t , executionId )
1487
+ uuid .MustParse (executionId )
1488
+ })
1489
+
1490
+ }
1491
+
1343
1492
func initTaskHubWorker (ctx context.Context , r * task.TaskRegistry , opts ... backend.NewTaskWorkerOptions ) (backend.TaskHubClient , backend.TaskHubWorker ) {
1344
1493
// TODO: Switch to options pattern
1345
1494
logger := backend .DefaultLogger ()
0 commit comments