diff --git a/service/worker/diagnostics/activities.go b/service/worker/diagnostics/activities.go index c5a548a24ac..41831b30c47 100644 --- a/service/worker/diagnostics/activities.go +++ b/service/worker/diagnostics/activities.go @@ -25,8 +25,8 @@ package diagnostics import ( "context" + "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/messaging" - "github.com/uber/cadence/common/messaging/kafka" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/worker/diagnostics/analytics" "github.com/uber/cadence/service/worker/diagnostics/invariant" @@ -97,18 +97,20 @@ func (w *dw) rootCauseIssues(ctx context.Context, info rootCauseIssuesParams) ([ } func (w *dw) emitUsageLogs(ctx context.Context, info analytics.WfDiagnosticsUsageData) error { - client := w.newMessagingClient() - return emit(ctx, info, client) -} - -func (w *dw) newMessagingClient() messaging.Client { - return kafka.NewKafkaClient(&w.kafkaCfg, w.metricsClient, w.logger, w.tallyScope, true) + if w.messagingClient == nil { + // skip emitting logs if messaging client is not provided since it is optional + w.logger.Error("messaging client is not provided, skipping emitting wf-diagnostics usage logs", tag.WorkflowDomainName(info.Domain)) + return nil + } + return w.emit(ctx, info, w.messagingClient) } -func emit(ctx context.Context, info analytics.WfDiagnosticsUsageData, client messaging.Client) error { +func (w *dw) emit(ctx context.Context, info analytics.WfDiagnosticsUsageData, client messaging.Client) error { producer, err := client.NewProducer(WfDiagnosticsAppName) if err != nil { - return err + // skip emitting logs if producer cannot be created since it is optional + w.logger.Error("producer creation failed, skipping emitting wf-diagnostics usage logs", tag.WorkflowDomainName(info.Domain)) + return nil } emitter := analytics.NewEmitter(analytics.EmitterParams{ Producer: producer, diff --git a/service/worker/diagnostics/activities_test.go b/service/worker/diagnostics/activities_test.go index 0602359dc2a..923e0428e08 100644 --- a/service/worker/diagnostics/activities_test.go +++ b/service/worker/diagnostics/activities_test.go @@ -140,11 +140,12 @@ func Test__rootCauseIssues(t *testing.T) { func Test__emit(t *testing.T) { ctrl := gomock.NewController(t) + dwtest := testDiagnosticWorkflow(t) mockClient := messaging.NewMockClient(ctrl) mockProducer := messaging.NewMockProducer(ctrl) mockProducer.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil) mockClient.EXPECT().NewProducer(WfDiagnosticsAppName).Return(mockProducer, nil) - err := emit(context.Background(), analytics.WfDiagnosticsUsageData{}, mockClient) + err := dwtest.emit(context.Background(), analytics.WfDiagnosticsUsageData{}, mockClient) require.NoError(t, err) } diff --git a/service/worker/diagnostics/module.go b/service/worker/diagnostics/module.go index b57f1904518..2d33903a890 100644 --- a/service/worker/diagnostics/module.go +++ b/service/worker/diagnostics/module.go @@ -34,8 +34,8 @@ import ( "github.com/uber/cadence/client" "github.com/uber/cadence/common" - "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/service/worker/diagnostics/invariant" ) @@ -46,36 +46,36 @@ type DiagnosticsWorkflow interface { } type dw struct { - svcClient workflowserviceclient.Interface - clientBean client.Bean - metricsClient metrics.Client - logger log.Logger - tallyScope tally.Scope - worker worker.Worker - kafkaCfg config.KafkaConfig - invariants []invariant.Invariant + svcClient workflowserviceclient.Interface + clientBean client.Bean + metricsClient metrics.Client + messagingClient messaging.Client + logger log.Logger + tallyScope tally.Scope + worker worker.Worker + invariants []invariant.Invariant } type Params struct { - ServiceClient workflowserviceclient.Interface - ClientBean client.Bean - MetricsClient metrics.Client - Logger log.Logger - TallyScope tally.Scope - KafkaCfg config.KafkaConfig - Invariants []invariant.Invariant + ServiceClient workflowserviceclient.Interface + ClientBean client.Bean + MetricsClient metrics.Client + MessagingClient messaging.Client + Logger log.Logger + TallyScope tally.Scope + Invariants []invariant.Invariant } // New creates a new diagnostics workflow. func New(params Params) DiagnosticsWorkflow { return &dw{ - svcClient: params.ServiceClient, - metricsClient: params.MetricsClient, - tallyScope: params.TallyScope, - clientBean: params.ClientBean, - logger: params.Logger, - kafkaCfg: params.KafkaCfg, - invariants: params.Invariants, + svcClient: params.ServiceClient, + metricsClient: params.MetricsClient, + messagingClient: params.MessagingClient, + tallyScope: params.TallyScope, + clientBean: params.ClientBean, + logger: params.Logger, + invariants: params.Invariants, } } diff --git a/service/worker/diagnostics/parent_workflow.go b/service/worker/diagnostics/parent_workflow.go index b4a0321143a..eec990c955c 100644 --- a/service/worker/diagnostics/parent_workflow.go +++ b/service/worker/diagnostics/parent_workflow.go @@ -84,7 +84,7 @@ func (w *dw) DiagnosticsStarterWorkflow(ctx workflow.Context, params Diagnostics StartToCloseTimeout: time.Second * 5, } activityCtx := workflow.WithActivityOptions(ctx, activityOptions) - err = workflow.ExecuteActivity(activityCtx, w.emitUsageLogs, analytics.WfDiagnosticsUsageData{ + err = workflow.ExecuteActivity(activityCtx, emitUsageLogsActivity, analytics.WfDiagnosticsUsageData{ Domain: params.Domain, WorkflowID: params.WorkflowID, RunID: params.RunID, diff --git a/service/worker/service.go b/service/worker/service.go index cb48c30e5f1..6c87f95de2a 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -343,13 +343,13 @@ func (s *Service) startFixerWorkflowWorker() { func (s *Service) startDiagnostics() { params := diagnostics.Params{ - ServiceClient: s.params.PublicClient, - MetricsClient: s.GetMetricsClient(), - TallyScope: s.params.MetricScope, - ClientBean: s.GetClientBean(), - Logger: s.GetLogger(), - KafkaCfg: s.params.KafkaConfig, - Invariants: s.params.DiagnosticsInvariants, + ServiceClient: s.params.PublicClient, + MetricsClient: s.GetMetricsClient(), + MessagingClient: s.GetMessagingClient(), + TallyScope: s.params.MetricScope, + ClientBean: s.GetClientBean(), + Logger: s.GetLogger(), + Invariants: s.params.DiagnosticsInvariants, } if err := diagnostics.New(params).Start(); err != nil { s.Stop()