Skip to content

Commit 9daccb2

Browse files
authored
Propagate shadow worker start error (#1068)
1 parent fff7f18 commit 9daccb2

File tree

2 files changed

+54
-6
lines changed

2 files changed

+54
-6
lines changed

internal/internal_worker.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,7 @@ func (aw *aggregatedWorker) Start() error {
864864
}
865865
return err
866866
}
867+
aw.logger.Info("Started Session Worker")
867868
}
868869

869870
if !isInterfaceNil(aw.shadowWorker) {
@@ -880,7 +881,9 @@ func (aw *aggregatedWorker) Start() error {
880881
if !isInterfaceNil(aw.sessionWorker) {
881882
aw.sessionWorker.Stop()
882883
}
884+
return err
883885
}
886+
aw.logger.Info("Started Shadow Worker")
884887
}
885888

886889
return nil

internal/internal_worker_test.go

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ func testActivityMultipleArgsWithStruct(_ context.Context, i int, s testActivity
194194
}
195195

196196
func (s *internalWorkerTestSuite) TestCreateWorker() {
197-
worker := createWorkerWithThrottle(s.service, float64(500.0), nil)
197+
worker := createWorkerWithThrottle(s.service, float64(500.0), nil, nil)
198198
err := worker.Start()
199199
require.NoError(s.T(), err)
200200
time.Sleep(time.Millisecond * 200)
@@ -209,6 +209,14 @@ func (s *internalWorkerTestSuite) TestCreateWorker_WithDataConverter() {
209209
worker.Stop()
210210
}
211211

212+
func (s *internalWorkerTestSuite) TestCreateShadowWorker() {
213+
worker := createShadowWorker(s.service, &ShadowOptions{})
214+
s.Nil(worker.workflowWorker)
215+
s.Nil(worker.activityWorker)
216+
s.Nil(worker.locallyDispatchedActivityWorker)
217+
s.Nil(worker.sessionWorker)
218+
}
219+
212220
func (s *internalWorkerTestSuite) TestCreateWorkerRun() {
213221
// Create service endpoint
214222
mockCtrl := gomock.NewController(s.T())
@@ -278,6 +286,24 @@ func (s *internalWorkerTestSuite) TestWorkerStartFailsWithInvalidDomain() {
278286
}
279287
}
280288

289+
func (s *internalWorkerTestSuite) TestStartShadowWorkerFailWithInvalidOptions() {
290+
invalidOptions := []*ShadowOptions{
291+
{
292+
Mode: ShadowModeContinuous,
293+
},
294+
{
295+
WorkflowQuery: "workflow query",
296+
WorkflowTypes: []string{"workflowTypeName"},
297+
},
298+
}
299+
300+
for _, opt := range invalidOptions {
301+
worker := createShadowWorker(s.service, opt)
302+
err := worker.Start()
303+
assert.Error(s.T(), err, "worker.Start() should fail given invalid shadow options")
304+
}
305+
}
306+
281307
func ofPollForActivityTaskRequest(tps float64) gomock.Matcher {
282308
return &mockPollForActivityTaskRequest{tps: tps}
283309
}
@@ -298,12 +324,24 @@ func (m *mockPollForActivityTaskRequest) String() string {
298324
return "PollForActivityTaskRequest"
299325
}
300326

301-
func createWorker(service *workflowservicetest.MockClient) *aggregatedWorker {
302-
return createWorkerWithThrottle(service, float64(0.0), nil)
327+
func createWorker(
328+
service *workflowservicetest.MockClient,
329+
) *aggregatedWorker {
330+
return createWorkerWithThrottle(service, float64(0.0), nil, nil)
331+
}
332+
333+
func createShadowWorker(
334+
service *workflowservicetest.MockClient,
335+
shadowOptions *ShadowOptions,
336+
) *aggregatedWorker {
337+
return createWorkerWithThrottle(service, float64(0.0), nil, shadowOptions)
303338
}
304339

305340
func createWorkerWithThrottle(
306-
service *workflowservicetest.MockClient, activitiesPerSecond float64, dc DataConverter,
341+
service *workflowservicetest.MockClient,
342+
activitiesPerSecond float64,
343+
dc DataConverter,
344+
shadowOptions *ShadowOptions,
307345
) *aggregatedWorker {
308346
domain := "testDomain"
309347
domainStatus := shared.DomainStatusRegistered
@@ -342,6 +380,11 @@ func createWorkerWithThrottle(
342380
}
343381
workerOptions.EnableSessionWorker = true
344382

383+
if shadowOptions != nil {
384+
workerOptions.EnableShadowWorker = true
385+
workerOptions.ShadowOptions = shadowOptions
386+
}
387+
345388
// Start Worker.
346389
worker := NewWorker(
347390
service,
@@ -351,8 +394,10 @@ func createWorkerWithThrottle(
351394
return worker
352395
}
353396

354-
func createWorkerWithDataConverter(service *workflowservicetest.MockClient) *aggregatedWorker {
355-
return createWorkerWithThrottle(service, float64(0.0), newTestDataConverter())
397+
func createWorkerWithDataConverter(
398+
service *workflowservicetest.MockClient,
399+
) *aggregatedWorker {
400+
return createWorkerWithThrottle(service, float64(0.0), newTestDataConverter(), nil)
356401
}
357402

358403
func (s *internalWorkerTestSuite) testCompleteActivityHelper(opt *ClientOptions) {

0 commit comments

Comments
 (0)