Skip to content

Commit 905fc37

Browse files
authored
Merge branch 'master' into newer-tools
2 parents 6d3dcb6 + ebb27dd commit 905fc37

File tree

2 files changed

+30
-27
lines changed

2 files changed

+30
-27
lines changed

internal/error.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,16 @@ func (e *ContinueAsNewError) Args() []interface{} {
365365
return e.args
366366
}
367367

368+
// Input return serialized workflow argument
369+
func (e *ContinueAsNewError) Input() []byte {
370+
return e.params.input
371+
}
372+
373+
// Header return the header to start a workflow
374+
func (e *ContinueAsNewError) Header() *shared.Header {
375+
return e.params.header
376+
}
377+
368378
// newTerminatedError creates NewTerminatedError instance
369379
func newTerminatedError() *TerminatedError {
370380
return &TerminatedError{}

internal/internal_workers_test.go

Lines changed: 20 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -347,14 +347,7 @@ func (s *WorkersTestSuite) TestLongRunningDecisionTask() {
347347
)
348348
worker.RegisterActivity(localActivitySleep)
349349

350-
worker.Start()
351-
// wait for test to complete
352-
select {
353-
case <-doneCh:
354-
break
355-
case <-time.After(time.Second * 4):
356-
}
357-
worker.Stop()
350+
startWorkerAndWait(s, worker, &doneCh)
358351

359352
s.True(isWorkflowCompleted)
360353
s.Equal(2, localActivityCalledCount)
@@ -532,9 +525,7 @@ func (s *WorkersTestSuite) TestQueryTask_WorkflowCacheEvicted() {
532525
RegisterActivityOptions{Name: activityType},
533526
)
534527

535-
worker.Start()
536-
<-doneCh
537-
worker.Stop()
528+
startWorkerAndWait(s, worker, &doneCh)
538529
}
539530

540531
func (s *WorkersTestSuite) TestMultipleLocalActivities() {
@@ -654,14 +645,7 @@ func (s *WorkersTestSuite) TestMultipleLocalActivities() {
654645
)
655646
worker.RegisterActivity(localActivitySleep)
656647

657-
worker.Start()
658-
// wait for test to complete
659-
select {
660-
case <-doneCh:
661-
break
662-
case <-time.After(time.Second * 5):
663-
}
664-
worker.Stop()
648+
startWorkerAndWait(s, worker, &doneCh)
665649

666650
s.True(isWorkflowCompleted)
667651
s.Equal(2, localActivityCalledCount)
@@ -771,14 +755,7 @@ func (s *WorkersTestSuite) TestLocallyDispatchedActivity() {
771755
)
772756
worker.RegisterActivityWithOptions(activitySleep, RegisterActivityOptions{Name: "activitySleep"})
773757

774-
worker.Start()
775-
// wait for test to complete
776-
select {
777-
case <-doneCh:
778-
break
779-
case <-time.After(1 * time.Second):
780-
}
781-
worker.Stop()
758+
startWorkerAndWait(s, worker, &doneCh)
782759

783760
s.True(isActivityResponseCompleted)
784761
s.Equal(1, activityCalledCount)
@@ -889,6 +866,8 @@ func (s *WorkersTestSuite) TestMultipleLocallyDispatchedActivity() {
889866
worker.Start()
890867

891868
// wait for test to complete
869+
// This test currently never completes, however after the timeout the asserts are true
870+
// so the test passes, I believe this is an error.
892871
select {
893872
case <-doneCh:
894873
break
@@ -900,3 +879,17 @@ func (s *WorkersTestSuite) TestMultipleLocallyDispatchedActivity() {
900879
s.True(activityResponseCompletedCount > 0)
901880
s.True(activityCalledCount > 0)
902881
}
882+
883+
// wait for test to complete - timeout and fail after 10 seconds to not block execution of other tests
884+
func startWorkerAndWait(s *WorkersTestSuite, worker *aggregatedWorker, doneCh *chan struct{}) {
885+
s.T().Helper()
886+
worker.Start()
887+
// wait for test to complete
888+
select {
889+
case <-*doneCh:
890+
return
891+
case <-time.After(10 * time.Second):
892+
s.Fail("Test timed out")
893+
}
894+
worker.Stop()
895+
}

0 commit comments

Comments
 (0)