diff --git a/Makefile b/Makefile index 8f3d04067..9dd876954 100644 --- a/Makefile +++ b/Makefile @@ -252,7 +252,7 @@ $Q +$(MAKE) --no-print-directory $(addprefix $(BUILD)/,$(1)) endef .PHONY: build -build: $(BUILD)/fmt ## ensure all packages build +build: $(BUILD)/lint ## ensure all packages build go build ./... $Q # caution: some errors are reported on stdout for some reason go test -exec true ./... >/dev/null diff --git a/internal/compatibility/adapter.go b/internal/compatibility/adapter.go index 62cbe2e71..0fa095dc5 100644 --- a/internal/compatibility/adapter.go +++ b/internal/compatibility/adapter.go @@ -145,8 +145,8 @@ func (a thrift2protoAdapter) RecordActivityTaskHeartbeat(ctx context.Context, re } func (a thrift2protoAdapter) RecordActivityTaskHeartbeatByID(ctx context.Context, request *shared.RecordActivityTaskHeartbeatByIDRequest, opts ...yarpc.CallOption) (*shared.RecordActivityTaskHeartbeatResponse, error) { - response, err := a.worker.RecordActivityTaskHeartbeatByID(ctx, proto.RecordActivityTaskHeartbeatByIdRequest(request), opts...) - return thrift.RecordActivityTaskHeartbeatByIdResponse(response), thrift.Error(err) + response, err := a.worker.RecordActivityTaskHeartbeatByID(ctx, proto.RecordActivityTaskHeartbeatByIDRequest(request), opts...) + return thrift.RecordActivityTaskHeartbeatByIDResponse(response), thrift.Error(err) } func (a thrift2protoAdapter) RegisterDomain(ctx context.Context, request *shared.RegisterDomainRequest, opts ...yarpc.CallOption) error { @@ -175,7 +175,7 @@ func (a thrift2protoAdapter) RespondActivityTaskCanceled(ctx context.Context, re } func (a thrift2protoAdapter) RespondActivityTaskCanceledByID(ctx context.Context, request *shared.RespondActivityTaskCanceledByIDRequest, opts ...yarpc.CallOption) error { - _, err := a.worker.RespondActivityTaskCanceledByID(ctx, proto.RespondActivityTaskCanceledByIdRequest(request), opts...) + _, err := a.worker.RespondActivityTaskCanceledByID(ctx, proto.RespondActivityTaskCanceledByIDRequest(request), opts...) return thrift.Error(err) } @@ -185,7 +185,7 @@ func (a thrift2protoAdapter) RespondActivityTaskCompleted(ctx context.Context, r } func (a thrift2protoAdapter) RespondActivityTaskCompletedByID(ctx context.Context, request *shared.RespondActivityTaskCompletedByIDRequest, opts ...yarpc.CallOption) error { - _, err := a.worker.RespondActivityTaskCompletedByID(ctx, proto.RespondActivityTaskCompletedByIdRequest(request), opts...) + _, err := a.worker.RespondActivityTaskCompletedByID(ctx, proto.RespondActivityTaskCompletedByIDRequest(request), opts...) return thrift.Error(err) } @@ -195,7 +195,7 @@ func (a thrift2protoAdapter) RespondActivityTaskFailed(ctx context.Context, requ } func (a thrift2protoAdapter) RespondActivityTaskFailedByID(ctx context.Context, request *shared.RespondActivityTaskFailedByIDRequest, opts ...yarpc.CallOption) error { - _, err := a.worker.RespondActivityTaskFailedByID(ctx, proto.RespondActivityTaskFailedByIdRequest(request), opts...) + _, err := a.worker.RespondActivityTaskFailedByID(ctx, proto.RespondActivityTaskFailedByIDRequest(request), opts...) return thrift.Error(err) } @@ -272,7 +272,7 @@ type domainAPIthriftAdapter struct { service workflowserviceclient.Interface } -func NewDomainAPITriftAdapter(thrift workflowserviceclient.Interface) domainAPIthriftAdapter { +func NewDomainAPITriftAdapter(thrift workflowserviceclient.Interface) domainAPIthriftAdapter { //revive:disable-line:unexported-return this is not used at all, consider removing return domainAPIthriftAdapter{thrift} } @@ -301,7 +301,7 @@ type workflowAPIthriftAdapter struct { service workflowserviceclient.Interface } -func NewWorkflowAPITriftAdapter(thrift workflowserviceclient.Interface) workflowAPIthriftAdapter { +func NewWorkflowAPITriftAdapter(thrift workflowserviceclient.Interface) workflowAPIthriftAdapter { //revive:disable-line:unexported-return this is not used at all, consider removing return workflowAPIthriftAdapter{thrift} } @@ -362,7 +362,7 @@ type workerAPIthriftAdapter struct { service workflowserviceclient.Interface } -func NewWorkerAPITriftAdapter(thrift workflowserviceclient.Interface) workerAPIthriftAdapter { +func NewWorkerAPITriftAdapter(thrift workflowserviceclient.Interface) workerAPIthriftAdapter { //revive:disable-line:unexported-return this is not used at all, consider removing return workerAPIthriftAdapter{thrift} } @@ -387,7 +387,7 @@ func (a workerAPIthriftAdapter) RespondActivityTaskCompleted(ctx context.Context return &apiv1.RespondActivityTaskCompletedResponse{}, proto.Error(err) } func (a workerAPIthriftAdapter) RespondActivityTaskCompletedByID(ctx context.Context, request *apiv1.RespondActivityTaskCompletedByIDRequest, opts ...yarpc.CallOption) (*apiv1.RespondActivityTaskCompletedByIDResponse, error) { - err := a.service.RespondActivityTaskCompletedByID(ctx, thrift.RespondActivityTaskCompletedByIdRequest(request), opts...) + err := a.service.RespondActivityTaskCompletedByID(ctx, thrift.RespondActivityTaskCompletedByIDRequest(request), opts...) return &apiv1.RespondActivityTaskCompletedByIDResponse{}, proto.Error(err) } func (a workerAPIthriftAdapter) RespondActivityTaskFailed(ctx context.Context, request *apiv1.RespondActivityTaskFailedRequest, opts ...yarpc.CallOption) (*apiv1.RespondActivityTaskFailedResponse, error) { @@ -395,7 +395,7 @@ func (a workerAPIthriftAdapter) RespondActivityTaskFailed(ctx context.Context, r return &apiv1.RespondActivityTaskFailedResponse{}, proto.Error(err) } func (a workerAPIthriftAdapter) RespondActivityTaskFailedByID(ctx context.Context, request *apiv1.RespondActivityTaskFailedByIDRequest, opts ...yarpc.CallOption) (*apiv1.RespondActivityTaskFailedByIDResponse, error) { - err := a.service.RespondActivityTaskFailedByID(ctx, thrift.RespondActivityTaskFailedByIdRequest(request), opts...) + err := a.service.RespondActivityTaskFailedByID(ctx, thrift.RespondActivityTaskFailedByIDRequest(request), opts...) return &apiv1.RespondActivityTaskFailedByIDResponse{}, proto.Error(err) } func (a workerAPIthriftAdapter) RespondActivityTaskCanceled(ctx context.Context, request *apiv1.RespondActivityTaskCanceledRequest, opts ...yarpc.CallOption) (*apiv1.RespondActivityTaskCanceledResponse, error) { @@ -403,7 +403,7 @@ func (a workerAPIthriftAdapter) RespondActivityTaskCanceled(ctx context.Context, return &apiv1.RespondActivityTaskCanceledResponse{}, proto.Error(err) } func (a workerAPIthriftAdapter) RespondActivityTaskCanceledByID(ctx context.Context, request *apiv1.RespondActivityTaskCanceledByIDRequest, opts ...yarpc.CallOption) (*apiv1.RespondActivityTaskCanceledByIDResponse, error) { - err := a.service.RespondActivityTaskCanceledByID(ctx, thrift.RespondActivityTaskCanceledByIdRequest(request), opts...) + err := a.service.RespondActivityTaskCanceledByID(ctx, thrift.RespondActivityTaskCanceledByIDRequest(request), opts...) return &apiv1.RespondActivityTaskCanceledByIDResponse{}, proto.Error(err) } func (a workerAPIthriftAdapter) RecordActivityTaskHeartbeat(ctx context.Context, request *apiv1.RecordActivityTaskHeartbeatRequest, opts ...yarpc.CallOption) (*apiv1.RecordActivityTaskHeartbeatResponse, error) { @@ -411,8 +411,8 @@ func (a workerAPIthriftAdapter) RecordActivityTaskHeartbeat(ctx context.Context, return proto.RecordActivityTaskHeartbeatResponse(response), proto.Error(err) } func (a workerAPIthriftAdapter) RecordActivityTaskHeartbeatByID(ctx context.Context, request *apiv1.RecordActivityTaskHeartbeatByIDRequest, opts ...yarpc.CallOption) (*apiv1.RecordActivityTaskHeartbeatByIDResponse, error) { - response, err := a.service.RecordActivityTaskHeartbeatByID(ctx, thrift.RecordActivityTaskHeartbeatByIdRequest(request), opts...) - return proto.RecordActivityTaskHeartbeatByIdResponse(response), proto.Error(err) + response, err := a.service.RecordActivityTaskHeartbeatByID(ctx, thrift.RecordActivityTaskHeartbeatByIDRequest(request), opts...) + return proto.RecordActivityTaskHeartbeatByIDResponse(response), proto.Error(err) } func (a workerAPIthriftAdapter) RespondQueryTaskCompleted(ctx context.Context, request *apiv1.RespondQueryTaskCompletedRequest, opts ...yarpc.CallOption) (*apiv1.RespondQueryTaskCompletedResponse, error) { err := a.service.RespondQueryTaskCompleted(ctx, thrift.RespondQueryTaskCompletedRequest(request), opts...) @@ -427,7 +427,7 @@ type visibilityAPIthriftAdapter struct { service workflowserviceclient.Interface } -func NewVisibilityAPITriftAdapter(thrift workflowserviceclient.Interface) visibilityAPIthriftAdapter { +func NewVisibilityAPITriftAdapter(thrift workflowserviceclient.Interface) visibilityAPIthriftAdapter { //revive:disable-line:unexported-return this is not used at all, consider removing return visibilityAPIthriftAdapter{thrift} } diff --git a/internal/compatibility/api_test.go b/internal/compatibility/api_test.go index 38a5cb2fe..fbd014708 100644 --- a/internal/compatibility/api_test.go +++ b/internal/compatibility/api_test.go @@ -384,12 +384,12 @@ func TestQueryWorkflowResponse(t *testing.T) { } func TestRecordActivityTaskHeartbeatByIDRequest(t *testing.T) { for _, item := range []*apiv1.RecordActivityTaskHeartbeatByIDRequest{nil, {}, &testdata.RecordActivityTaskHeartbeatByIDRequest} { - assert.Equal(t, item, proto.RecordActivityTaskHeartbeatByIdRequest(thrift.RecordActivityTaskHeartbeatByIdRequest(item))) + assert.Equal(t, item, proto.RecordActivityTaskHeartbeatByIDRequest(thrift.RecordActivityTaskHeartbeatByIDRequest(item))) } } func TestRecordActivityTaskHeartbeatByIDResponse(t *testing.T) { for _, item := range []*apiv1.RecordActivityTaskHeartbeatByIDResponse{nil, {}, &testdata.RecordActivityTaskHeartbeatByIDResponse} { - assert.Equal(t, item, proto.RecordActivityTaskHeartbeatByIdResponse(thrift.RecordActivityTaskHeartbeatByIdResponse(item))) + assert.Equal(t, item, proto.RecordActivityTaskHeartbeatByIDResponse(thrift.RecordActivityTaskHeartbeatByIDResponse(item))) } } func TestRecordActivityTaskHeartbeatRequest(t *testing.T) { @@ -454,7 +454,7 @@ func TestResetWorkflowExecutionResponse(t *testing.T) { } func TestRespondActivityTaskCanceledByIDRequest(t *testing.T) { for _, item := range []*apiv1.RespondActivityTaskCanceledByIDRequest{nil, {}, &testdata.RespondActivityTaskCanceledByIDRequest} { - assert.Equal(t, item, proto.RespondActivityTaskCanceledByIdRequest(thrift.RespondActivityTaskCanceledByIdRequest(item))) + assert.Equal(t, item, proto.RespondActivityTaskCanceledByIDRequest(thrift.RespondActivityTaskCanceledByIDRequest(item))) } } func TestRespondActivityTaskCanceledRequest(t *testing.T) { @@ -464,7 +464,7 @@ func TestRespondActivityTaskCanceledRequest(t *testing.T) { } func TestRespondActivityTaskCompletedByIDRequest(t *testing.T) { for _, item := range []*apiv1.RespondActivityTaskCompletedByIDRequest{nil, {}, &testdata.RespondActivityTaskCompletedByIDRequest} { - assert.Equal(t, item, proto.RespondActivityTaskCompletedByIdRequest(thrift.RespondActivityTaskCompletedByIdRequest(item))) + assert.Equal(t, item, proto.RespondActivityTaskCompletedByIDRequest(thrift.RespondActivityTaskCompletedByIDRequest(item))) } } func TestRespondActivityTaskCompletedRequest(t *testing.T) { @@ -474,7 +474,7 @@ func TestRespondActivityTaskCompletedRequest(t *testing.T) { } func TestRespondActivityTaskFailedByIDRequest(t *testing.T) { for _, item := range []*apiv1.RespondActivityTaskFailedByIDRequest{nil, {}, &testdata.RespondActivityTaskFailedByIDRequest} { - assert.Equal(t, item, proto.RespondActivityTaskFailedByIdRequest(thrift.RespondActivityTaskFailedByIdRequest(item))) + assert.Equal(t, item, proto.RespondActivityTaskFailedByIDRequest(thrift.RespondActivityTaskFailedByIDRequest(item))) } } func TestRespondActivityTaskFailedRequest(t *testing.T) { @@ -589,7 +589,7 @@ func TestSupportedClientVersions(t *testing.T) { } func TestTaskIDBlock(t *testing.T) { for _, item := range []*apiv1.TaskIDBlock{nil, {}, &testdata.TaskIDBlock} { - assert.Equal(t, item, proto.TaskIdBlock(thrift.TaskIdBlock(item))) + assert.Equal(t, item, proto.TaskIDBlock(thrift.TaskIDBlock(item))) } } func TestTaskList(t *testing.T) { @@ -656,13 +656,13 @@ func TestWorkflowExecution(t *testing.T) { for _, item := range []*apiv1.WorkflowExecution{nil, {}, &testdata.WorkflowExecution} { assert.Equal(t, item, proto.WorkflowExecution(thrift.WorkflowExecution(item))) } - assert.Empty(t, thrift.WorkflowId(nil)) - assert.Empty(t, thrift.RunId(nil)) + assert.Empty(t, thrift.WorkflowID(nil)) + assert.Empty(t, thrift.RunID(nil)) } func TestExternalExecutionInfo(t *testing.T) { assert.Nil(t, proto.ExternalExecutionInfo(nil, nil)) assert.Nil(t, thrift.ExternalWorkflowExecution(nil)) - assert.Nil(t, thrift.ExternalInitiatedId(nil)) + assert.Nil(t, thrift.ExternalInitiatedID(nil)) assert.Panics(t, func() { proto.ExternalExecutionInfo(nil, common.Int64Ptr(testdata.EventID1)) }) assert.Panics(t, func() { proto.ExternalExecutionInfo(thrift.WorkflowExecution(&testdata.WorkflowExecution), nil) }) info := proto.ExternalExecutionInfo(thrift.WorkflowExecution(&testdata.WorkflowExecution), common.Int64Ptr(testdata.EventID1)) diff --git a/internal/compatibility/enum_test.go b/internal/compatibility/enum_test.go index 3f383c3ab..632634d4d 100644 --- a/internal/compatibility/enum_test.go +++ b/internal/compatibility/enum_test.go @@ -330,8 +330,8 @@ func TestWorkflowIDReusePolicy(t *testing.T) { apiv1.WorkflowIdReusePolicy_WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, apiv1.WorkflowIdReusePolicy_WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING, } { - assert.Equal(t, item, proto.WorkflowIdReusePolicy(thrift.WorkflowIdReusePolicy(item))) + assert.Equal(t, item, proto.WorkflowIDReusePolicy(thrift.WorkflowIDReusePolicy(item))) } - assert.Panics(t, func() { proto.WorkflowIdReusePolicy(shared.WorkflowIdReusePolicy(UnknownValue).Ptr()) }) - assert.Panics(t, func() { thrift.WorkflowIdReusePolicy(apiv1.WorkflowIdReusePolicy(UnknownValue)) }) + assert.Panics(t, func() { proto.WorkflowIDReusePolicy(shared.WorkflowIdReusePolicy(UnknownValue).Ptr()) }) + assert.Panics(t, func() { thrift.WorkflowIDReusePolicy(apiv1.WorkflowIdReusePolicy(UnknownValue)) }) } diff --git a/internal/compatibility/proto/decision.go b/internal/compatibility/proto/decision.go index e33de03bf..eddd01b61 100644 --- a/internal/compatibility/proto/decision.go +++ b/internal/compatibility/proto/decision.go @@ -157,7 +157,7 @@ func Decision(d *shared.Decision) *apiv1.Decision { TaskStartToCloseTimeout: secondsToDuration(attr.TaskStartToCloseTimeoutSeconds), ParentClosePolicy: ParentClosePolicy(attr.ParentClosePolicy), Control: attr.Control, - WorkflowIdReusePolicy: WorkflowIdReusePolicy(attr.WorkflowIdReusePolicy), + WorkflowIdReusePolicy: WorkflowIDReusePolicy(attr.WorkflowIdReusePolicy), RetryPolicy: RetryPolicy(attr.RetryPolicy), CronSchedule: attr.GetCronSchedule(), Header: Header(attr.Header), diff --git a/internal/compatibility/proto/enum.go b/internal/compatibility/proto/enum.go index 3b42a8992..4c31ce5bb 100644 --- a/internal/compatibility/proto/enum.go +++ b/internal/compatibility/proto/enum.go @@ -106,7 +106,7 @@ func ContinueAsNewInitiator(t *shared.ContinueAsNewInitiator) apiv1.ContinueAsNe panic("unexpected enum value") } -func WorkflowIdReusePolicy(t *shared.WorkflowIdReusePolicy) apiv1.WorkflowIdReusePolicy { +func WorkflowIDReusePolicy(t *shared.WorkflowIdReusePolicy) apiv1.WorkflowIdReusePolicy { if t == nil { return apiv1.WorkflowIdReusePolicy_WORKFLOW_ID_REUSE_POLICY_INVALID } diff --git a/internal/compatibility/proto/history.go b/internal/compatibility/proto/history.go index 4fb3ab5a5..ac5ed2ba3 100644 --- a/internal/compatibility/proto/history.go +++ b/internal/compatibility/proto/history.go @@ -534,7 +534,7 @@ func StartChildWorkflowExecutionInitiatedEventAttributes(t *shared.StartChildWor ParentClosePolicy: ParentClosePolicy(t.ParentClosePolicy), Control: t.Control, DecisionTaskCompletedEventId: t.GetDecisionTaskCompletedEventId(), - WorkflowIdReusePolicy: WorkflowIdReusePolicy(t.WorkflowIdReusePolicy), + WorkflowIdReusePolicy: WorkflowIDReusePolicy(t.WorkflowIdReusePolicy), RetryPolicy: RetryPolicy(t.RetryPolicy), CronSchedule: t.GetCronSchedule(), Header: Header(t.Header), diff --git a/internal/compatibility/proto/request.go b/internal/compatibility/proto/request.go index 495fb8a0e..984244436 100644 --- a/internal/compatibility/proto/request.go +++ b/internal/compatibility/proto/request.go @@ -188,7 +188,7 @@ func QueryWorkflowRequest(t *shared.QueryWorkflowRequest) *apiv1.QueryWorkflowRe } } -func RecordActivityTaskHeartbeatByIdRequest(t *shared.RecordActivityTaskHeartbeatByIDRequest) *apiv1.RecordActivityTaskHeartbeatByIDRequest { +func RecordActivityTaskHeartbeatByIDRequest(t *shared.RecordActivityTaskHeartbeatByIDRequest) *apiv1.RecordActivityTaskHeartbeatByIDRequest { if t == nil { return nil } @@ -269,7 +269,7 @@ func ResetWorkflowExecutionRequest(t *shared.ResetWorkflowExecutionRequest) *api } } -func RespondActivityTaskCanceledByIdRequest(t *shared.RespondActivityTaskCanceledByIDRequest) *apiv1.RespondActivityTaskCanceledByIDRequest { +func RespondActivityTaskCanceledByIDRequest(t *shared.RespondActivityTaskCanceledByIDRequest) *apiv1.RespondActivityTaskCanceledByIDRequest { if t == nil { return nil } @@ -293,7 +293,7 @@ func RespondActivityTaskCanceledRequest(t *shared.RespondActivityTaskCanceledReq } } -func RespondActivityTaskCompletedByIdRequest(t *shared.RespondActivityTaskCompletedByIDRequest) *apiv1.RespondActivityTaskCompletedByIDRequest { +func RespondActivityTaskCompletedByIDRequest(t *shared.RespondActivityTaskCompletedByIDRequest) *apiv1.RespondActivityTaskCompletedByIDRequest { if t == nil { return nil } @@ -317,7 +317,7 @@ func RespondActivityTaskCompletedRequest(t *shared.RespondActivityTaskCompletedR } } -func RespondActivityTaskFailedByIdRequest(t *shared.RespondActivityTaskFailedByIDRequest) *apiv1.RespondActivityTaskFailedByIDRequest { +func RespondActivityTaskFailedByIDRequest(t *shared.RespondActivityTaskFailedByIDRequest) *apiv1.RespondActivityTaskFailedByIDRequest { if t == nil { return nil } @@ -413,7 +413,7 @@ func SignalWithStartWorkflowExecutionRequest(t *shared.SignalWithStartWorkflowEx TaskStartToCloseTimeout: secondsToDuration(t.TaskStartToCloseTimeoutSeconds), Identity: t.GetIdentity(), RequestId: t.GetRequestId(), - WorkflowIdReusePolicy: WorkflowIdReusePolicy(t.WorkflowIdReusePolicy), + WorkflowIdReusePolicy: WorkflowIDReusePolicy(t.WorkflowIdReusePolicy), RetryPolicy: RetryPolicy(t.RetryPolicy), CronSchedule: t.GetCronSchedule(), Memo: Memo(t.Memo), @@ -468,7 +468,7 @@ func StartWorkflowExecutionRequest(t *shared.StartWorkflowExecutionRequest) *api TaskStartToCloseTimeout: secondsToDuration(t.TaskStartToCloseTimeoutSeconds), Identity: t.GetIdentity(), RequestId: t.GetRequestId(), - WorkflowIdReusePolicy: WorkflowIdReusePolicy(t.WorkflowIdReusePolicy), + WorkflowIdReusePolicy: WorkflowIDReusePolicy(t.WorkflowIdReusePolicy), RetryPolicy: RetryPolicy(t.RetryPolicy), CronSchedule: t.GetCronSchedule(), Memo: Memo(t.Memo), diff --git a/internal/compatibility/proto/response.go b/internal/compatibility/proto/response.go index 83dd79e64..776ba843e 100644 --- a/internal/compatibility/proto/response.go +++ b/internal/compatibility/proto/response.go @@ -249,7 +249,7 @@ func QueryWorkflowResponse(t *shared.QueryWorkflowResponse) *apiv1.QueryWorkflow } } -func RecordActivityTaskHeartbeatByIdResponse(t *shared.RecordActivityTaskHeartbeatResponse) *apiv1.RecordActivityTaskHeartbeatByIDResponse { +func RecordActivityTaskHeartbeatByIDResponse(t *shared.RecordActivityTaskHeartbeatResponse) *apiv1.RecordActivityTaskHeartbeatByIDResponse { if t == nil { return nil } diff --git a/internal/compatibility/proto/types.go b/internal/compatibility/proto/types.go index 69c3b47b4..9cdb9b779 100644 --- a/internal/compatibility/proto/types.go +++ b/internal/compatibility/proto/types.go @@ -58,13 +58,13 @@ func WorkflowExecution(t *shared.WorkflowExecution) *apiv1.WorkflowExecution { } } -func WorkflowRunPair(workflowId, runId string) *apiv1.WorkflowExecution { - if workflowId == "" && runId == "" { +func WorkflowRunPair(workflowID, runID string) *apiv1.WorkflowExecution { + if workflowID == "" && runID == "" { return nil } return &apiv1.WorkflowExecution{ - WorkflowId: workflowId, - RunId: runId, + WorkflowId: workflowID, + RunId: runID, } } @@ -374,11 +374,11 @@ func TaskListStatus(t *shared.TaskListStatus) *apiv1.TaskListStatus { ReadLevel: t.GetReadLevel(), AckLevel: t.GetAckLevel(), RatePerSecond: t.GetRatePerSecond(), - TaskIdBlock: TaskIdBlock(t.TaskIDBlock), + TaskIdBlock: TaskIDBlock(t.TaskIDBlock), } } -func TaskIdBlock(t *shared.TaskIDBlock) *apiv1.TaskIDBlock { +func TaskIDBlock(t *shared.TaskIDBlock) *apiv1.TaskIDBlock { if t == nil { return nil } diff --git a/internal/compatibility/thrift/decision.go b/internal/compatibility/thrift/decision.go index 22e763d41..ded236485 100644 --- a/internal/compatibility/thrift/decision.go +++ b/internal/compatibility/thrift/decision.go @@ -103,8 +103,8 @@ func Decision(d *apiv1.Decision) *shared.Decision { a := attr.RequestCancelExternalWorkflowExecutionDecisionAttributes decision.RequestCancelExternalWorkflowExecutionDecisionAttributes = &shared.RequestCancelExternalWorkflowExecutionDecisionAttributes{ Domain: &a.Domain, - WorkflowId: WorkflowId(a.WorkflowExecution), - RunId: RunId(a.WorkflowExecution), + WorkflowId: WorkflowID(a.WorkflowExecution), + RunId: RunID(a.WorkflowExecution), Control: a.Control, ChildWorkflowOnly: &a.ChildWorkflowOnly, } @@ -150,7 +150,7 @@ func Decision(d *apiv1.Decision) *shared.Decision { TaskStartToCloseTimeoutSeconds: durationToSeconds(a.TaskStartToCloseTimeout), ParentClosePolicy: ParentClosePolicy(a.ParentClosePolicy), Control: a.Control, - WorkflowIdReusePolicy: WorkflowIdReusePolicy(a.WorkflowIdReusePolicy), + WorkflowIdReusePolicy: WorkflowIDReusePolicy(a.WorkflowIdReusePolicy), RetryPolicy: RetryPolicy(a.RetryPolicy), CronSchedule: &a.CronSchedule, Header: Header(a.Header), diff --git a/internal/compatibility/thrift/enum.go b/internal/compatibility/thrift/enum.go index 71cb8a447..92d5a0f93 100644 --- a/internal/compatibility/thrift/enum.go +++ b/internal/compatibility/thrift/enum.go @@ -104,7 +104,7 @@ func ContinueAsNewInitiator(t apiv1.ContinueAsNewInitiator) *shared.ContinueAsNe panic("unexpected enum value") } -func WorkflowIdReusePolicy(t apiv1.WorkflowIdReusePolicy) *shared.WorkflowIdReusePolicy { +func WorkflowIDReusePolicy(t apiv1.WorkflowIdReusePolicy) *shared.WorkflowIdReusePolicy { switch t { case apiv1.WorkflowIdReusePolicy_WORKFLOW_ID_REUSE_POLICY_INVALID: return nil diff --git a/internal/compatibility/thrift/history.go b/internal/compatibility/thrift/history.go index c3789db81..1d4fe2f59 100644 --- a/internal/compatibility/thrift/history.go +++ b/internal/compatibility/thrift/history.go @@ -583,7 +583,7 @@ func StartChildWorkflowExecutionInitiatedEventAttributes(t *apiv1.StartChildWork ParentClosePolicy: ParentClosePolicy(t.ParentClosePolicy), Control: t.Control, DecisionTaskCompletedEventId: &t.DecisionTaskCompletedEventId, - WorkflowIdReusePolicy: WorkflowIdReusePolicy(t.WorkflowIdReusePolicy), + WorkflowIdReusePolicy: WorkflowIDReusePolicy(t.WorkflowIdReusePolicy), RetryPolicy: RetryPolicy(t.RetryPolicy), CronSchedule: &t.CronSchedule, Header: Header(t.Header), @@ -644,7 +644,7 @@ func WorkflowExecutionCancelRequestedEventAttributes(t *apiv1.WorkflowExecutionC } return &shared.WorkflowExecutionCancelRequestedEventAttributes{ Cause: &t.Cause, - ExternalInitiatedEventId: ExternalInitiatedId(t.ExternalExecutionInfo), + ExternalInitiatedEventId: ExternalInitiatedID(t.ExternalExecutionInfo), ExternalWorkflowExecution: ExternalWorkflowExecution(t.ExternalExecutionInfo), Identity: &t.Identity, } @@ -723,7 +723,7 @@ func WorkflowExecutionStartedEventAttributes(t *apiv1.WorkflowExecutionStartedEv WorkflowType: WorkflowType(t.WorkflowType), ParentWorkflowDomain: ParentDomainName(t.ParentExecutionInfo), ParentWorkflowExecution: ParentWorkflowExecution(t.ParentExecutionInfo), - ParentInitiatedEventId: ParentInitiatedId(t.ParentExecutionInfo), + ParentInitiatedEventId: ParentInitiatedID(t.ParentExecutionInfo), TaskList: TaskList(t.TaskList), Input: Payload(t.Input), ExecutionStartToCloseTimeoutSeconds: durationToSeconds(t.ExecutionStartToCloseTimeout), diff --git a/internal/compatibility/thrift/request.go b/internal/compatibility/thrift/request.go index a15de3949..1a71d34db 100644 --- a/internal/compatibility/thrift/request.go +++ b/internal/compatibility/thrift/request.go @@ -189,14 +189,14 @@ func QueryWorkflowRequest(t *apiv1.QueryWorkflowRequest) *shared.QueryWorkflowRe } } -func RecordActivityTaskHeartbeatByIdRequest(t *apiv1.RecordActivityTaskHeartbeatByIDRequest) *shared.RecordActivityTaskHeartbeatByIDRequest { +func RecordActivityTaskHeartbeatByIDRequest(t *apiv1.RecordActivityTaskHeartbeatByIDRequest) *shared.RecordActivityTaskHeartbeatByIDRequest { if t == nil { return nil } return &shared.RecordActivityTaskHeartbeatByIDRequest{ Domain: &t.Domain, - WorkflowID: WorkflowId(t.WorkflowExecution), - RunID: RunId(t.WorkflowExecution), + WorkflowID: WorkflowID(t.WorkflowExecution), + RunID: RunID(t.WorkflowExecution), ActivityID: &t.ActivityId, Details: Payload(t.Details), Identity: &t.Identity, @@ -271,14 +271,14 @@ func ResetWorkflowExecutionRequest(t *apiv1.ResetWorkflowExecutionRequest) *shar } } -func RespondActivityTaskCanceledByIdRequest(t *apiv1.RespondActivityTaskCanceledByIDRequest) *shared.RespondActivityTaskCanceledByIDRequest { +func RespondActivityTaskCanceledByIDRequest(t *apiv1.RespondActivityTaskCanceledByIDRequest) *shared.RespondActivityTaskCanceledByIDRequest { if t == nil { return nil } return &shared.RespondActivityTaskCanceledByIDRequest{ Domain: &t.Domain, - WorkflowID: WorkflowId(t.WorkflowExecution), - RunID: RunId(t.WorkflowExecution), + WorkflowID: WorkflowID(t.WorkflowExecution), + RunID: RunID(t.WorkflowExecution), ActivityID: &t.ActivityId, Details: Payload(t.Details), Identity: &t.Identity, @@ -296,14 +296,14 @@ func RespondActivityTaskCanceledRequest(t *apiv1.RespondActivityTaskCanceledRequ } } -func RespondActivityTaskCompletedByIdRequest(t *apiv1.RespondActivityTaskCompletedByIDRequest) *shared.RespondActivityTaskCompletedByIDRequest { +func RespondActivityTaskCompletedByIDRequest(t *apiv1.RespondActivityTaskCompletedByIDRequest) *shared.RespondActivityTaskCompletedByIDRequest { if t == nil { return nil } return &shared.RespondActivityTaskCompletedByIDRequest{ Domain: &t.Domain, - WorkflowID: WorkflowId(t.WorkflowExecution), - RunID: RunId(t.WorkflowExecution), + WorkflowID: WorkflowID(t.WorkflowExecution), + RunID: RunID(t.WorkflowExecution), ActivityID: &t.ActivityId, Result: Payload(t.Result), Identity: &t.Identity, @@ -321,14 +321,14 @@ func RespondActivityTaskCompletedRequest(t *apiv1.RespondActivityTaskCompletedRe } } -func RespondActivityTaskFailedByIdRequest(t *apiv1.RespondActivityTaskFailedByIDRequest) *shared.RespondActivityTaskFailedByIDRequest { +func RespondActivityTaskFailedByIDRequest(t *apiv1.RespondActivityTaskFailedByIDRequest) *shared.RespondActivityTaskFailedByIDRequest { if t == nil { return nil } return &shared.RespondActivityTaskFailedByIDRequest{ Domain: &t.Domain, - WorkflowID: WorkflowId(t.WorkflowExecution), - RunID: RunId(t.WorkflowExecution), + WorkflowID: WorkflowID(t.WorkflowExecution), + RunID: RunID(t.WorkflowExecution), ActivityID: &t.ActivityId, Reason: FailureReason(t.Failure), Details: FailureDetails(t.Failure), @@ -426,7 +426,7 @@ func SignalWithStartWorkflowExecutionRequest(t *apiv1.SignalWithStartWorkflowExe request.TaskStartToCloseTimeoutSeconds = durationToSeconds(t.StartRequest.TaskStartToCloseTimeout) request.Identity = &t.StartRequest.Identity request.RequestId = &t.StartRequest.RequestId - request.WorkflowIdReusePolicy = WorkflowIdReusePolicy(t.StartRequest.WorkflowIdReusePolicy) + request.WorkflowIdReusePolicy = WorkflowIDReusePolicy(t.StartRequest.WorkflowIdReusePolicy) request.RetryPolicy = RetryPolicy(t.StartRequest.RetryPolicy) request.CronSchedule = &t.StartRequest.CronSchedule request.Memo = Memo(t.StartRequest.Memo) @@ -466,7 +466,7 @@ func StartWorkflowExecutionRequest(t *apiv1.StartWorkflowExecutionRequest) *shar TaskStartToCloseTimeoutSeconds: durationToSeconds(t.TaskStartToCloseTimeout), Identity: &t.Identity, RequestId: &t.RequestId, - WorkflowIdReusePolicy: WorkflowIdReusePolicy(t.WorkflowIdReusePolicy), + WorkflowIdReusePolicy: WorkflowIDReusePolicy(t.WorkflowIdReusePolicy), RetryPolicy: RetryPolicy(t.RetryPolicy), CronSchedule: &t.CronSchedule, Memo: Memo(t.Memo), diff --git a/internal/compatibility/thrift/response.go b/internal/compatibility/thrift/response.go index 89c585f5b..558bbb48d 100644 --- a/internal/compatibility/thrift/response.go +++ b/internal/compatibility/thrift/response.go @@ -249,7 +249,7 @@ func QueryWorkflowResponse(t *apiv1.QueryWorkflowResponse) *shared.QueryWorkflow } } -func RecordActivityTaskHeartbeatByIdResponse(t *apiv1.RecordActivityTaskHeartbeatByIDResponse) *shared.RecordActivityTaskHeartbeatResponse { +func RecordActivityTaskHeartbeatByIDResponse(t *apiv1.RecordActivityTaskHeartbeatByIDResponse) *shared.RecordActivityTaskHeartbeatResponse { if t == nil { return nil } diff --git a/internal/compatibility/thrift/types.go b/internal/compatibility/thrift/types.go index 616c76a56..1dd8ec5b3 100644 --- a/internal/compatibility/thrift/types.go +++ b/internal/compatibility/thrift/types.go @@ -72,14 +72,14 @@ func WorkflowExecution(t *apiv1.WorkflowExecution) *shared.WorkflowExecution { } } -func WorkflowId(t *apiv1.WorkflowExecution) *string { +func WorkflowID(t *apiv1.WorkflowExecution) *string { if t == nil { return nil } return &t.WorkflowId } -func RunId(t *apiv1.WorkflowExecution) *string { +func RunID(t *apiv1.WorkflowExecution) *string { if t == nil { return nil } @@ -334,7 +334,7 @@ func DataBlob(t *apiv1.DataBlob) *shared.DataBlob { } } -func ExternalInitiatedId(t *apiv1.ExternalExecutionInfo) *int64 { +func ExternalInitiatedID(t *apiv1.ExternalExecutionInfo) *int64 { if t == nil { return nil } @@ -391,11 +391,11 @@ func TaskListStatus(t *apiv1.TaskListStatus) *shared.TaskListStatus { ReadLevel: &t.ReadLevel, AckLevel: &t.AckLevel, RatePerSecond: &t.RatePerSecond, - TaskIDBlock: TaskIdBlock(t.TaskIdBlock), + TaskIDBlock: TaskIDBlock(t.TaskIdBlock), } } -func TaskIdBlock(t *apiv1.TaskIDBlock) *shared.TaskIDBlock { +func TaskIDBlock(t *apiv1.TaskIDBlock) *shared.TaskIDBlock { if t == nil { return nil } @@ -427,7 +427,7 @@ func WorkflowExecutionInfo(t *apiv1.WorkflowExecutionInfo) *shared.WorkflowExecu CloseTime: timeToUnixNano(t.CloseTime), CloseStatus: WorkflowExecutionCloseStatus(t.CloseStatus), HistoryLength: &t.HistoryLength, - ParentDomainId: ParentDomainId(t.ParentExecutionInfo), + ParentDomainId: ParentDomainID(t.ParentExecutionInfo), ParentExecution: ParentWorkflowExecution(t.ParentExecutionInfo), ExecutionTime: timeToUnixNano(t.ExecutionTime), Memo: Memo(t.Memo), @@ -438,7 +438,7 @@ func WorkflowExecutionInfo(t *apiv1.WorkflowExecutionInfo) *shared.WorkflowExecu } } -func ParentDomainId(pei *apiv1.ParentExecutionInfo) *string { +func ParentDomainID(pei *apiv1.ParentExecutionInfo) *string { if pei == nil { return nil } @@ -452,7 +452,7 @@ func ParentDomainName(pei *apiv1.ParentExecutionInfo) *string { return &pei.DomainName } -func ParentInitiatedId(pei *apiv1.ParentExecutionInfo) *int64 { +func ParentInitiatedID(pei *apiv1.ParentExecutionInfo) *int64 { if pei == nil { return nil } @@ -492,8 +492,8 @@ func PendingChildExecutionInfo(t *apiv1.PendingChildExecutionInfo) *shared.Pendi return nil } return &shared.PendingChildExecutionInfo{ - WorkflowID: WorkflowId(t.WorkflowExecution), - RunID: RunId(t.WorkflowExecution), + WorkflowID: WorkflowID(t.WorkflowExecution), + RunID: RunID(t.WorkflowExecution), WorkflowTypName: &t.WorkflowTypeName, InitiatedID: &t.InitiatedId, ParentClosePolicy: ParentClosePolicy(t.ParentClosePolicy), diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index b419d6931..5c5ee246b 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -143,7 +143,7 @@ type ( readyCh chan bool TaskToken []byte WorkflowExecution *shared.WorkflowExecution - ActivityId *string + ActivityID *string ActivityType *shared.ActivityType Input []byte ScheduledTimestamp *int64 diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index bc736cd5f..8ad408a07 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -215,9 +215,8 @@ func (ldat *locallyDispatchedActivityTunnel) getTask() *locallyDispatchedActivit case ready := <-task.readyCh: if ready { return task - } else { - return nil } + return nil case <-ldat.stopCh: return nil } @@ -512,7 +511,7 @@ func (wtp *workflowTaskPoller) handleDecisionTaskCompletedRequest(ctx context.Co // assume the activity type is in registry otherwise the activity would be failed and retried from server activityTask := &locallyDispatchedActivityTask{ readyCh: make(chan bool, 1), - ActivityId: attr.ActivityId, + ActivityID: attr.ActivityId, ActivityType: attr.ActivityType, Input: attr.Input, Header: attr.Header, @@ -537,7 +536,7 @@ func (wtp *workflowTaskPoller) handleDecisionTaskCompletedRequest(ctx context.Co for _, at := range activityTasks { started := false if response != nil && err == nil { - if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityId]; ok { + if adl, ok := response.ActivitiesToDispatchLocally[*at.ActivityID]; ok { at.ScheduledTimestamp = adl.ScheduledTimestamp at.StartedTimestamp = adl.StartedTimestamp at.ScheduledTimestampOfThisAttempt = adl.ScheduledTimestampOfThisAttempt @@ -1226,7 +1225,7 @@ func (atp *locallyDispatchedActivityTaskPoller) pollLocallyDispatchedActivity(ct atp.metricsScope.Counter(metrics.ActivityPollCounter).Inc(1) atp.metricsScope.Counter(metrics.LocallyDispatchedActivityPollSucceedCounter).Inc(1) response := &s.PollForActivityTaskResponse{} - response.ActivityId = task.ActivityId + response.ActivityId = task.ActivityID response.ActivityType = task.ActivityType response.Header = task.Header response.Input = task.Input diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index c6d5e0a55..2a2848350 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -217,10 +217,13 @@ func (s *internalWorkerTestSuite) TestCreateWorker_WithDataConverter() { func (s *internalWorkerTestSuite) TestCreateShadowWorker() { worker := createShadowWorker(s.T(), s.service, &ShadowOptions{}) - s.Nil(worker.workflowWorker) - s.Nil(worker.activityWorker) - s.Nil(worker.locallyDispatchedActivityWorker) - s.Nil(worker.sessionWorker) + w, ok := worker.(*aggregatedWorker) + s.True(ok) + s.NotNil(w) + s.Nil(w.workflowWorker) + s.Nil(w.activityWorker) + s.Nil(w.locallyDispatchedActivityWorker) + s.Nil(w.sessionWorker) } func (s *internalWorkerTestSuite) TestCreateWorker_WithAutoScaler() { @@ -261,9 +264,8 @@ func (s *internalWorkerTestSuite) TestCreateWorkerRun() { func (s *internalWorkerTestSuite) TestNoActivitiesOrWorkflows() { t := s.T() w := createWorker(s.T(), s.service) - w.registry = newRegistry() - assert.Empty(t, w.registry.getRegisteredActivities()) - assert.Empty(t, w.registry.GetRegisteredWorkflowTypes()) + assert.Empty(t, w.GetRegisteredActivities()) + assert.Empty(t, w.GetRegisteredWorkflows()) assert.NoError(t, w.Start()) w.Stop() } @@ -290,6 +292,8 @@ func (s *internalWorkerTestSuite) TestWorkerStartFailsWithInvalidDomain() { }).Times(2) worker := createWorker(s.T(), service) + worker.RegisterWorkflow(testWorkflowSample) // at least register one workflow otherwise workflow worker will not be started + worker.RegisterActivity(testActivity) // at least register one activity otherwise activity worker will not be started if tc.isErrFatal { err := worker.Start() assert.Error(t, err, "worker.start() MUST fail when domain is invalid") @@ -350,7 +354,7 @@ func (m *mockPollForActivityTaskRequest) String() string { func createWorker( t *testing.T, service *workflowservicetest.MockClient, -) *aggregatedWorker { +) Worker { return createWorkerWithThrottle(t, service, 0, WorkerOptions{}) } @@ -358,7 +362,7 @@ func createShadowWorker( t *testing.T, service *workflowservicetest.MockClient, shadowOptions *ShadowOptions, -) *aggregatedWorker { +) Worker { return createWorkerWithThrottle(t, service, 0, WorkerOptions{ EnableShadowWorker: true, ShadowOptions: *shadowOptions, @@ -370,7 +374,7 @@ func createWorkerWithThrottle( service *workflowservicetest.MockClient, activitiesPerSecond float64, workerOptions WorkerOptions, -) *aggregatedWorker { +) Worker { domain := "testDomain" domainStatus := shared.DomainStatusRegistered domainDesc := &shared.DescribeDomainResponse{ @@ -403,7 +407,7 @@ func createWorkerWithThrottle( workerOptions.WorkerActivitiesPerSecond = 20 workerOptions.TaskListActivitiesPerSecond = activitiesPerSecond workerOptions.Logger = testlogger.NewZap(t) - workerOptions.EnableSessionWorker = true + workerOptions.EnableSessionWorker = false // Start Worker. worker, err := NewWorker( @@ -418,21 +422,21 @@ func createWorkerWithThrottle( func createWorkerWithDataConverter( t *testing.T, service *workflowservicetest.MockClient, -) *aggregatedWorker { +) Worker { return createWorkerWithThrottle(t, service, 0, WorkerOptions{DataConverter: newTestDataConverter()}) } func createWorkerWithAutoscaler( t *testing.T, service *workflowservicetest.MockClient, -) *aggregatedWorker { +) Worker { return createWorkerWithThrottle(t, service, 0, WorkerOptions{FeatureFlags: FeatureFlags{PollerAutoScalerEnabled: true}}) } func createWorkerWithStrictNonDeterminismDisabled( t *testing.T, service *workflowservicetest.MockClient, -) *aggregatedWorker { +) Worker { return createWorkerWithThrottle(t, service, 0, WorkerOptions{WorkerBugPorts: WorkerBugPorts{DisableStrictNonDeterminismCheck: true}}) } diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 024ec8a55..13a8bb9b5 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -1723,13 +1723,13 @@ func (s *workflowClientTestSuite) TestCancelWorkflowBackwardsCompatible() { } type PartialCancelRequestMatcher struct { - wfId *string + wfID *string cause *string } -func newPartialCancelRequestMatcher(wfId *string, cause *string) gomock.Matcher { +func newPartialCancelRequestMatcher(wfID *string, cause *string) gomock.Matcher { return &PartialCancelRequestMatcher{ - wfId: wfId, + wfID: wfID, cause: cause, } } @@ -1740,7 +1740,7 @@ func (m *PartialCancelRequestMatcher) Matches(a interface{}) bool { return false } - return (aEx.Cause == m.cause || *aEx.Cause == *m.cause) && *aEx.WorkflowExecution.WorkflowId == *m.wfId + return (aEx.Cause == m.cause || *aEx.Cause == *m.cause) && *aEx.WorkflowExecution.WorkflowId == *m.wfID } func (m *PartialCancelRequestMatcher) String() string { diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index c5a7b0aaf..5cfc5ef3d 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -1656,10 +1656,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflowContextPropagation() { cwo := ChildWorkflowOptions{ExecutionStartToCloseTimeout: time.Hour /* this is currently ignored by test suite */} ctx = WithChildWorkflowOptions(ctx, cwo) childCtx := WithValue(ctx, contextKey(testHeader), "test-data-for-child") - if err := ExecuteChildWorkflow(childCtx, childWorkflowFn).Get(childCtx, nil); err != nil { - return err - } - return nil + return ExecuteChildWorkflow(childCtx, childWorkflowFn).Get(childCtx, nil) } s.SetContextPropagators([]ContextPropagator{NewStringMapPropagator([]string{testHeader})}) diff --git a/internal/session_test.go b/internal/session_test.go index 44817da6d..eec96bae8 100644 --- a/internal/session_test.go +++ b/internal/session_test.go @@ -578,10 +578,7 @@ func (s *SessionTestSuite) TestUserTimerWithinSession() { defer CompleteSession(sessionCtx) - if err := NewTimer(sessionCtx, time.Hour).Get(sessionCtx, nil); err != nil { - return err - } - return nil + return NewTimer(sessionCtx, time.Hour).Get(sessionCtx, nil) } env := newTestWorkflowEnv(s.T()) @@ -615,10 +612,7 @@ func (s *SessionTestSuite) TestActivityRetryWithinSession() { defer CompleteSession(sessionCtx) - if err := ExecuteActivity(sessionCtx, testSessionActivity, "").Get(sessionCtx, nil); err != nil { - return err - } - return nil + return ExecuteActivity(sessionCtx, testSessionActivity, "").Get(sessionCtx, nil) } env := newTestWorkflowEnv(s.T()) diff --git a/internal/worker.go b/internal/worker.go index 004239057..6c8e5cbf4 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -38,6 +38,118 @@ import ( ) type ( + // Worker hosts workflow and activity implementations. + // Use worker.New(...) to create an instance. + Worker interface { + Registry + + // Start starts the worker in a non-blocking fashion + Start() error + // Run is a blocking start and cleans up resources when killed + // returns error only if it fails to start the worker + Run() error + // Stop cleans up any resources opened by worker + Stop() + } + + // Registry exposes registration functions to consumers. + Registry interface { + WorkflowRegistry + ActivityRegistry + } + + // WorkflowRegistry exposes workflow registration functions to consumers. + WorkflowRegistry interface { + // RegisterWorkflow - registers a workflow function with the worker. + // A workflow takes a workflow.Context and input and returns a (result, error) or just error. + // Examples: + // func sampleWorkflow(ctx workflow.Context, input []byte) (result []byte, err error) + // func sampleWorkflow(ctx workflow.Context, arg1 int, arg2 string) (result []byte, err error) + // func sampleWorkflow(ctx workflow.Context) (result []byte, err error) + // func sampleWorkflow(ctx workflow.Context, arg1 int) (result string, err error) + // Serialization of all primitive types, structures is supported ... except channels, functions, variadic, unsafe pointer. + // For global registration consider workflow.Register + // This method panics if workflowFunc doesn't comply with the expected format or tries to register the same workflow + RegisterWorkflow(w interface{}) + + // RegisterWorkflowWithOptions registers the workflow function with options. + // The user can use options to provide an external name for the workflow or leave it empty if no + // external name is required. This can be used as + // worker.RegisterWorkflowWithOptions(sampleWorkflow, RegisterWorkflowOptions{}) + // worker.RegisterWorkflowWithOptions(sampleWorkflow, RegisterWorkflowOptions{Name: "foo"}) + // This method panics if workflowFunc doesn't comply with the expected format or tries to register the same workflow + // type name twice. Use workflow.RegisterOptions.DisableAlreadyRegisteredCheck to allow multiple registrations. + RegisterWorkflowWithOptions(w interface{}, options RegisterWorkflowOptions) + + // GetRegisteredWorkflows returns information on all workflows registered on the worker. + // the RegistryInfo interface can be used to read workflow names, paths or retrieve the workflow functions. + // The workflow name is by default the method name. However, if the workflow was registered + // with options (see Worker.RegisterWorkflowWithOptions), the workflow may have customized name. + // For chained registries, this returns a combined list of all registered workflows from the + // worker instance that calls this method to the global registry. In this case, the list may contain duplicate names. + GetRegisteredWorkflows() []RegistryWorkflowInfo + } + + // ActivityRegistry exposes activity registration functions to consumers. + ActivityRegistry interface { + // RegisterActivity - register an activity function or a pointer to a structure with the worker. + // An activity function takes a context and input and returns a (result, error) or just error. + // + // And activity struct is a structure with all its exported methods treated as activities. The default + // name of each activity is the method name. + // + // Examples: + // func sampleActivity(ctx context.Context, input []byte) (result []byte, err error) + // func sampleActivity(ctx context.Context, arg1 int, arg2 string) (result *customerStruct, err error) + // func sampleActivity(ctx context.Context) (err error) + // func sampleActivity() (result string, err error) + // func sampleActivity(arg1 bool) (result int, err error) + // func sampleActivity(arg1 bool) (err error) + // + // type Activities struct { + // // fields + // } + // func (a *Activities) SampleActivity1(ctx context.Context, arg1 int, arg2 string) (result *customerStruct, err error) { + // ... + // } + // + // func (a *Activities) SampleActivity2(ctx context.Context, arg1 int, arg2 *customerStruct) (result string, err error) { + // ... + // } + // + // Serialization of all primitive types, structures is supported ... except channels, functions, variadic, unsafe pointer. + // This method panics if activityFunc doesn't comply with the expected format or an activity with the same + // type name is registered more than once. + // For global registration consider activity.Register + RegisterActivity(a interface{}) + + // RegisterActivityWithOptions registers the activity function or struct pointer with options. + // The user can use options to provide an external name for the activity or leave it empty if no + // external name is required. This can be used as + // worker.RegisterActivityWithOptions(barActivity, RegisterActivityOptions{}) + // worker.RegisterActivityWithOptions(barActivity, RegisterActivityOptions{Name: "barExternal"}) + // When registering the structure that implements activities the name is used as a prefix that is + // prepended to the activity method name. + // worker.RegisterActivityWithOptions(&Activities{ ... }, RegisterActivityOptions{Name: "MyActivities_"}) + // To override each name of activities defined through a structure register the methods one by one: + // activities := &Activities{ ... } + // worker.RegisterActivityWithOptions(activities.SampleActivity1, RegisterActivityOptions{Name: "Sample1"}) + // worker.RegisterActivityWithOptions(activities.SampleActivity2, RegisterActivityOptions{Name: "Sample2"}) + // See RegisterActivity function for more info. + // The other use of options is to disable duplicated activity registration check + // which might be useful for integration tests. + // worker.RegisterActivityWithOptions(barActivity, RegisterActivityOptions{DisableAlreadyRegisteredCheck: true}) + RegisterActivityWithOptions(a interface{}, options RegisterActivityOptions) + + // GetRegisteredActivities returns information on all activities registered on the worker. + // the RegistryInfo interface can be used to read activity names, paths or retrieve the activity functions. + // The activity name is by default the method name. However, if the workflow was registered + // with options (see Worker.RegisterWorkflowWithOptions), the workflow may have customized name. + // For chained registries, this returns a combined list of all registered activities from the + // worker instance that calls this method to the global registry. In this case, the list may contain duplicate names. + GetRegisteredActivities() []RegistryActivityInfo + } + // WorkerOptions is used to configure a worker instance. // The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is // subjected to change in the future. @@ -330,7 +442,7 @@ func NewWorker( domain string, taskList string, options WorkerOptions, -) (*aggregatedWorker, error) { +) (Worker, error) { return newAggregatedWorker(service, domain, taskList, options) } diff --git a/internal/workflow.go b/internal/workflow.go index 0545f808b..e8b34aad0 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -1105,8 +1105,10 @@ func getWorkflowHeader(ctx Context, ctxProps []ContextPropagator) *s.Header { // WorkflowInfo information about currently executing workflow type WorkflowInfo struct { - WorkflowExecution WorkflowExecution - OriginalRunId string // The original runID before resetting. Using it instead of current runID can make workflow decision determinstic after reset + WorkflowExecution WorkflowExecution + // The original runID before resetting. Using it instead of current runID can make workflow decision determinstic after reset + // revive:disable-next-line this is an public field that is already exposed + OriginalRunId string WorkflowType WorkflowType TaskListName string ExecutionStartToCloseTimeoutSeconds int32 diff --git a/test/integration_test.go b/test/integration_test.go index d201cd0fa..687191949 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -635,10 +635,10 @@ func (ts *IntegrationTestSuite) registerWorkflowsAndActivities(w worker.Worker) ts.activities.register(w) } -func (ts *IntegrationTestSuite) waitForWorkflowFinish(wid string, runId string) error { +func (ts *IntegrationTestSuite) waitForWorkflowFinish(wid string, runID string) error { ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) defer cancel() - wfRun := ts.libClient.GetWorkflow(ctx, wid, runId) + wfRun := ts.libClient.GetWorkflow(ctx, wid, runID) return wfRun.Get(ctx, nil) } diff --git a/test/replaytests/workflows.go b/test/replaytests/workflows.go index a440751f7..e32124489 100644 --- a/test/replaytests/workflows.go +++ b/test/replaytests/workflows.go @@ -47,20 +47,19 @@ func Workflow(ctx workflow.Context, name string) error { v := workflow.GetVersion(ctx, "test-change", workflow.DefaultVersion, 1) if v == workflow.DefaultVersion { return errors.New("no default-version history") - } else { - err := workflow.ExecuteActivity(ctx, helloworldActivity, name).Get(ctx, &helloworldResult) - if err != nil { - logger.Error("First activity failed.", zap.Error(err)) - return err - } - err = workflow.ExecuteActivity(ctx, helloworldActivity, name).Get(ctx, &helloworldResult) - if err != nil { - logger.Error("Second activity failed.", zap.Error(err)) - return err - } } - err := workflow.ExecuteActivity(ctx, helloworldActivity, name).Get(ctx, &helloworldResult) + if err != nil { + logger.Error("First activity failed.", zap.Error(err)) + return err + } + err = workflow.ExecuteActivity(ctx, helloworldActivity, name).Get(ctx, &helloworldResult) + if err != nil { + logger.Error("Second activity failed.", zap.Error(err)) + return err + } + + err = workflow.ExecuteActivity(ctx, helloworldActivity, name).Get(ctx, &helloworldResult) if err != nil { logger.Error("Third activity failed.", zap.Error(err)) return err