From b4ee4af666f0099d9637e6163ab9417446d0c68a Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Fri, 10 Jan 2025 16:16:24 +0100 Subject: [PATCH 1/8] [Wf-Diagnostics] create messaging client only if not initialised --- service/worker/diagnostics/activities.go | 10 ++--- service/worker/diagnostics/module.go | 48 +++++++++++++----------- service/worker/service.go | 15 ++++---- 3 files changed, 38 insertions(+), 35 deletions(-) diff --git a/service/worker/diagnostics/activities.go b/service/worker/diagnostics/activities.go index c5a548a24ac..b471961f6f0 100644 --- a/service/worker/diagnostics/activities.go +++ b/service/worker/diagnostics/activities.go @@ -97,12 +97,10 @@ func (w *dw) rootCauseIssues(ctx context.Context, info rootCauseIssuesParams) ([ } func (w *dw) emitUsageLogs(ctx context.Context, info analytics.WfDiagnosticsUsageData) error { - client := w.newMessagingClient() - return emit(ctx, info, client) -} - -func (w *dw) newMessagingClient() messaging.Client { - return kafka.NewKafkaClient(&w.kafkaCfg, w.metricsClient, w.logger, w.tallyScope, true) + if w.messagingClient == nil { + w.messagingClient = kafka.NewKafkaClient(&w.kafkaCfg, w.metricsClient, w.logger, w.tallyScope, true) + } + return emit(ctx, info, w.messagingClient) } func emit(ctx context.Context, info analytics.WfDiagnosticsUsageData, client messaging.Client) error { diff --git a/service/worker/diagnostics/module.go b/service/worker/diagnostics/module.go index b57f1904518..33aee028dec 100644 --- a/service/worker/diagnostics/module.go +++ b/service/worker/diagnostics/module.go @@ -24,6 +24,7 @@ package diagnostics import ( "context" + "github.com/uber/cadence/common/messaging" "github.com/opentracing/opentracing-go" "github.com/uber-go/tally" @@ -46,36 +47,39 @@ type DiagnosticsWorkflow interface { } type dw struct { - svcClient workflowserviceclient.Interface - clientBean client.Bean - metricsClient metrics.Client - logger log.Logger - tallyScope tally.Scope - worker worker.Worker - kafkaCfg config.KafkaConfig - invariants []invariant.Invariant + svcClient workflowserviceclient.Interface + clientBean client.Bean + metricsClient metrics.Client + messagingClient messaging.Client + logger log.Logger + tallyScope tally.Scope + worker worker.Worker + kafkaCfg config.KafkaConfig + invariants []invariant.Invariant } type Params struct { - ServiceClient workflowserviceclient.Interface - ClientBean client.Bean - MetricsClient metrics.Client - Logger log.Logger - TallyScope tally.Scope - KafkaCfg config.KafkaConfig - Invariants []invariant.Invariant + ServiceClient workflowserviceclient.Interface + ClientBean client.Bean + MetricsClient metrics.Client + MessagingClient messaging.Client + Logger log.Logger + TallyScope tally.Scope + KafkaCfg config.KafkaConfig + Invariants []invariant.Invariant } // New creates a new diagnostics workflow. func New(params Params) DiagnosticsWorkflow { return &dw{ - svcClient: params.ServiceClient, - metricsClient: params.MetricsClient, - tallyScope: params.TallyScope, - clientBean: params.ClientBean, - logger: params.Logger, - kafkaCfg: params.KafkaCfg, - invariants: params.Invariants, + svcClient: params.ServiceClient, + metricsClient: params.MetricsClient, + messagingClient: params.MessagingClient, + tallyScope: params.TallyScope, + clientBean: params.ClientBean, + logger: params.Logger, + kafkaCfg: params.KafkaCfg, + invariants: params.Invariants, } } diff --git a/service/worker/service.go b/service/worker/service.go index cb48c30e5f1..fbb8d64fb57 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -343,13 +343,14 @@ func (s *Service) startFixerWorkflowWorker() { func (s *Service) startDiagnostics() { params := diagnostics.Params{ - ServiceClient: s.params.PublicClient, - MetricsClient: s.GetMetricsClient(), - TallyScope: s.params.MetricScope, - ClientBean: s.GetClientBean(), - Logger: s.GetLogger(), - KafkaCfg: s.params.KafkaConfig, - Invariants: s.params.DiagnosticsInvariants, + ServiceClient: s.params.PublicClient, + MetricsClient: s.GetMetricsClient(), + MessagingClient: s.GetMessagingClient(), + TallyScope: s.params.MetricScope, + ClientBean: s.GetClientBean(), + Logger: s.GetLogger(), + KafkaCfg: s.params.KafkaConfig, + Invariants: s.params.DiagnosticsInvariants, } if err := diagnostics.New(params).Start(); err != nil { s.Stop() From 53aafeaa60f08826dc0d31b28ebc55ed09802b8e Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Mon, 13 Jan 2025 09:01:29 +0100 Subject: [PATCH 2/8] Update module.go --- service/worker/diagnostics/module.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/worker/diagnostics/module.go b/service/worker/diagnostics/module.go index 33aee028dec..8ee855828ed 100644 --- a/service/worker/diagnostics/module.go +++ b/service/worker/diagnostics/module.go @@ -24,7 +24,6 @@ package diagnostics import ( "context" - "github.com/uber/cadence/common/messaging" "github.com/opentracing/opentracing-go" "github.com/uber-go/tally" @@ -37,6 +36,7 @@ import ( "github.com/uber/cadence/common" "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/service/worker/diagnostics/invariant" ) From 8e1ca02076208ce638acfc16ee674232514cbffc Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Mon, 13 Jan 2025 09:27:21 +0100 Subject: [PATCH 3/8] Update activities.go --- service/worker/diagnostics/activities.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/service/worker/diagnostics/activities.go b/service/worker/diagnostics/activities.go index b471961f6f0..6670d2e3348 100644 --- a/service/worker/diagnostics/activities.go +++ b/service/worker/diagnostics/activities.go @@ -26,7 +26,6 @@ import ( "context" "github.com/uber/cadence/common/messaging" - "github.com/uber/cadence/common/messaging/kafka" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/worker/diagnostics/analytics" "github.com/uber/cadence/service/worker/diagnostics/invariant" @@ -98,7 +97,8 @@ func (w *dw) rootCauseIssues(ctx context.Context, info rootCauseIssuesParams) ([ func (w *dw) emitUsageLogs(ctx context.Context, info analytics.WfDiagnosticsUsageData) error { if w.messagingClient == nil { - w.messagingClient = kafka.NewKafkaClient(&w.kafkaCfg, w.metricsClient, w.logger, w.tallyScope, true) + // skip emitting logs if messaging client is not provided since it is optional + return nil } return emit(ctx, info, w.messagingClient) } From 185e9808d3aa5a6b6b4f929ecf9fa6c0a8d712f9 Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Mon, 13 Jan 2025 09:29:40 +0100 Subject: [PATCH 4/8] remove kafka config requirement --- service/worker/diagnostics/module.go | 4 ---- service/worker/service.go | 1 - 2 files changed, 5 deletions(-) diff --git a/service/worker/diagnostics/module.go b/service/worker/diagnostics/module.go index 8ee855828ed..2d33903a890 100644 --- a/service/worker/diagnostics/module.go +++ b/service/worker/diagnostics/module.go @@ -34,7 +34,6 @@ import ( "github.com/uber/cadence/client" "github.com/uber/cadence/common" - "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/metrics" @@ -54,7 +53,6 @@ type dw struct { logger log.Logger tallyScope tally.Scope worker worker.Worker - kafkaCfg config.KafkaConfig invariants []invariant.Invariant } @@ -65,7 +63,6 @@ type Params struct { MessagingClient messaging.Client Logger log.Logger TallyScope tally.Scope - KafkaCfg config.KafkaConfig Invariants []invariant.Invariant } @@ -78,7 +75,6 @@ func New(params Params) DiagnosticsWorkflow { tallyScope: params.TallyScope, clientBean: params.ClientBean, logger: params.Logger, - kafkaCfg: params.KafkaCfg, invariants: params.Invariants, } } diff --git a/service/worker/service.go b/service/worker/service.go index fbb8d64fb57..6c87f95de2a 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -349,7 +349,6 @@ func (s *Service) startDiagnostics() { TallyScope: s.params.MetricScope, ClientBean: s.GetClientBean(), Logger: s.GetLogger(), - KafkaCfg: s.params.KafkaConfig, Invariants: s.params.DiagnosticsInvariants, } if err := diagnostics.New(params).Start(); err != nil { From 144e1f91be071ee2376053fb86078a19edc4daec Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Mon, 13 Jan 2025 09:35:17 +0100 Subject: [PATCH 5/8] Update activities.go --- service/worker/diagnostics/activities.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/service/worker/diagnostics/activities.go b/service/worker/diagnostics/activities.go index 6670d2e3348..1be7b845127 100644 --- a/service/worker/diagnostics/activities.go +++ b/service/worker/diagnostics/activities.go @@ -24,6 +24,7 @@ package diagnostics import ( "context" + "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/types" @@ -98,6 +99,7 @@ func (w *dw) rootCauseIssues(ctx context.Context, info rootCauseIssuesParams) ([ func (w *dw) emitUsageLogs(ctx context.Context, info analytics.WfDiagnosticsUsageData) error { if w.messagingClient == nil { // skip emitting logs if messaging client is not provided since it is optional + w.logger.Error("messaging client is not provided, skipping emitting wf-diagnostics usage logs", tag.WorkflowDomainName(info.Domain)) return nil } return emit(ctx, info, w.messagingClient) From 1869d5ed747b0b50c9ea5fd53040802974ea616e Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Mon, 13 Jan 2025 09:47:21 +0100 Subject: [PATCH 6/8] Update activities.go --- service/worker/diagnostics/activities.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/worker/diagnostics/activities.go b/service/worker/diagnostics/activities.go index 1be7b845127..56c7a43c757 100644 --- a/service/worker/diagnostics/activities.go +++ b/service/worker/diagnostics/activities.go @@ -24,8 +24,8 @@ package diagnostics import ( "context" - "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/messaging" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/worker/diagnostics/analytics" From 01f8a46be3df0f0c8e6810e44208794d6393fc36 Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Mon, 13 Jan 2025 21:34:28 +0100 Subject: [PATCH 7/8] update emitlogs activity --- service/worker/diagnostics/activities.go | 8 +++++--- service/worker/diagnostics/activities_test.go | 3 ++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/service/worker/diagnostics/activities.go b/service/worker/diagnostics/activities.go index 56c7a43c757..41831b30c47 100644 --- a/service/worker/diagnostics/activities.go +++ b/service/worker/diagnostics/activities.go @@ -102,13 +102,15 @@ func (w *dw) emitUsageLogs(ctx context.Context, info analytics.WfDiagnosticsUsag w.logger.Error("messaging client is not provided, skipping emitting wf-diagnostics usage logs", tag.WorkflowDomainName(info.Domain)) return nil } - return emit(ctx, info, w.messagingClient) + return w.emit(ctx, info, w.messagingClient) } -func emit(ctx context.Context, info analytics.WfDiagnosticsUsageData, client messaging.Client) error { +func (w *dw) emit(ctx context.Context, info analytics.WfDiagnosticsUsageData, client messaging.Client) error { producer, err := client.NewProducer(WfDiagnosticsAppName) if err != nil { - return err + // skip emitting logs if producer cannot be created since it is optional + w.logger.Error("producer creation failed, skipping emitting wf-diagnostics usage logs", tag.WorkflowDomainName(info.Domain)) + return nil } emitter := analytics.NewEmitter(analytics.EmitterParams{ Producer: producer, diff --git a/service/worker/diagnostics/activities_test.go b/service/worker/diagnostics/activities_test.go index 0602359dc2a..923e0428e08 100644 --- a/service/worker/diagnostics/activities_test.go +++ b/service/worker/diagnostics/activities_test.go @@ -140,11 +140,12 @@ func Test__rootCauseIssues(t *testing.T) { func Test__emit(t *testing.T) { ctrl := gomock.NewController(t) + dwtest := testDiagnosticWorkflow(t) mockClient := messaging.NewMockClient(ctrl) mockProducer := messaging.NewMockProducer(ctrl) mockProducer.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil) mockClient.EXPECT().NewProducer(WfDiagnosticsAppName).Return(mockProducer, nil) - err := emit(context.Background(), analytics.WfDiagnosticsUsageData{}, mockClient) + err := dwtest.emit(context.Background(), analytics.WfDiagnosticsUsageData{}, mockClient) require.NoError(t, err) } From abe293afebf4c1371b1934666cacd646ea99eb40 Mon Sep 17 00:00:00 2001 From: sankari gopalakrishnan Date: Mon, 13 Jan 2025 21:35:29 +0100 Subject: [PATCH 8/8] Update parent_workflow.go --- service/worker/diagnostics/parent_workflow.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/worker/diagnostics/parent_workflow.go b/service/worker/diagnostics/parent_workflow.go index b4a0321143a..eec990c955c 100644 --- a/service/worker/diagnostics/parent_workflow.go +++ b/service/worker/diagnostics/parent_workflow.go @@ -84,7 +84,7 @@ func (w *dw) DiagnosticsStarterWorkflow(ctx workflow.Context, params Diagnostics StartToCloseTimeout: time.Second * 5, } activityCtx := workflow.WithActivityOptions(ctx, activityOptions) - err = workflow.ExecuteActivity(activityCtx, w.emitUsageLogs, analytics.WfDiagnosticsUsageData{ + err = workflow.ExecuteActivity(activityCtx, emitUsageLogsActivity, analytics.WfDiagnosticsUsageData{ Domain: params.Domain, WorkflowID: params.WorkflowID, RunID: params.RunID,