Skip to content

Commit 4eafd4a

Browse files
authored
Fixed fx.In embedding in output types (#8584)
## What changed? `fx.In` shouldn't be embedded in the `{Invocation,Backoff}TaskExecutor` because these are output parameters. ## Why? These will cause tests to fail once mutable state is integrated. ## How did you test it? - [ ] built - [ ] run locally and tested manually - [ ] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s) ## Potential risks No
1 parent 1304595 commit 4eafd4a

File tree

4 files changed

+69
-57
lines changed

4 files changed

+69
-57
lines changed

chasm/lib/callback/chasm_invocation.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,32 +59,32 @@ func (c chasmInvocation) Invoke(
5959
// Get back the base64-encoded ComponentRef from the header.
6060
encodedRef, ok := c.nexus.GetHeader()[commonnexus.CallbackTokenHeader]
6161
if !ok {
62-
return invocationResultFail{logInternalError(e.Logger, "callback missing token", nil)}
62+
return invocationResultFail{logInternalError(e.logger, "callback missing token", nil)}
6363
}
6464

6565
decodedRef, err := base64.RawURLEncoding.DecodeString(encodedRef)
6666
if err != nil {
67-
return invocationResultFail{logInternalError(e.Logger, "failed to decode CHASM ComponentRef", err)}
67+
return invocationResultFail{logInternalError(e.logger, "failed to decode CHASM ComponentRef", err)}
6868
}
6969

7070
// Validate that the bytes are a valid ChasmComponentRef
7171
ref := &persistencespb.ChasmComponentRef{}
7272
err = proto.Unmarshal(decodedRef, ref)
7373
if err != nil {
74-
return invocationResultFail{logInternalError(e.Logger, "failed to unmarshal CHASM ComponentRef", err)}
74+
return invocationResultFail{logInternalError(e.logger, "failed to unmarshal CHASM ComponentRef", err)}
7575
}
7676

7777
request, err := c.getHistoryRequest(decodedRef)
7878
if err != nil {
79-
return invocationResultFail{logInternalError(e.Logger, "failed to build history request: %v", err)}
79+
return invocationResultFail{logInternalError(e.logger, "failed to build history request: %v", err)}
8080
}
8181

8282
// RPC to History for cross-shard completion delivery.
83-
_, err = e.HistoryClient.CompleteNexusOperationChasm(ctx, request)
83+
_, err = e.historyClient.CompleteNexusOperationChasm(ctx, request)
8484
if err != nil {
85-
msg := logInternalError(e.Logger, "failed to complete Nexus operation: %v", err)
85+
msg := logInternalError(e.logger, "failed to complete Nexus operation: %v", err)
8686
if isRetryableRPCResponse(err) {
87-
return invocationResultRetry{err: msg, retryPolicy: e.Config.RetryPolicy()}
87+
return invocationResultRetry{err: msg, retryPolicy: e.config.RetryPolicy()}
8888
}
8989
return invocationResultFail{msg}
9090
}

chasm/lib/callback/executors.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,14 @@ type HTTPCallerProvider func(queues.NamespaceIDAndDestination) HTTPCaller
2323

2424
func NewInvocationTaskExecutor(opts InvocationTaskExecutorOptions) *InvocationTaskExecutor {
2525
return &InvocationTaskExecutor{
26-
InvocationTaskExecutorOptions: opts,
26+
config: opts.Config,
27+
namespaceRegistry: opts.NamespaceRegistry,
28+
metricsHandler: opts.MetricsHandler,
29+
logger: opts.Logger,
30+
httpCallerProvider: opts.HTTPCallerProvider,
31+
httpTraceProvider: opts.HTTPTraceProvider,
32+
historyClient: opts.HistoryClient,
33+
chasmEngine: opts.ChasmEngine,
2734
}
2835
}
2936

@@ -41,7 +48,14 @@ type InvocationTaskExecutorOptions struct {
4148
}
4249

4350
type InvocationTaskExecutor struct {
44-
InvocationTaskExecutorOptions
51+
config *Config
52+
namespaceRegistry namespace.Registry
53+
metricsHandler metrics.Handler
54+
logger log.Logger
55+
httpCallerProvider HTTPCallerProvider
56+
httpTraceProvider commonnexus.HTTPClientTraceProvider
57+
historyClient resource.HistoryClient
58+
chasmEngine chasm.Engine
4559
}
4660

4761
func (e InvocationTaskExecutor) Execute(ctx context.Context, ref chasm.ComponentRef, attrs chasm.TaskAttributes, task *callbackspb.InvocationTask) error {
@@ -106,7 +120,7 @@ func (e InvocationTaskExecutor) Invoke(
106120
taskAttr chasm.TaskAttributes,
107121
task *callbackspb.InvocationTask,
108122
) error {
109-
ns, err := e.NamespaceRegistry.GetNamespaceByID(namespace.ID(ref.NamespaceID))
123+
ns, err := e.namespaceRegistry.GetNamespaceByID(namespace.ID(ref.NamespaceID))
110124
if err != nil {
111125
return fmt.Errorf("failed to get namespace by ID: %w", err)
112126
}
@@ -123,7 +137,7 @@ func (e InvocationTaskExecutor) Invoke(
123137

124138
callCtx, cancel := context.WithTimeout(
125139
ctx,
126-
e.Config.RequestTimeout(ns.Name().String(), taskAttr.Destination),
140+
e.config.RequestTimeout(ns.Name().String(), taskAttr.Destination),
127141
)
128142
defer cancel()
129143

@@ -138,7 +152,9 @@ func (e InvocationTaskExecutor) Invoke(
138152
}
139153

140154
type BackoffTaskExecutor struct {
141-
BackoffTaskExecutorOptions
155+
config *Config
156+
metricsHandler metrics.Handler
157+
logger log.Logger
142158
}
143159

144160
type BackoffTaskExecutorOptions struct {
@@ -151,7 +167,9 @@ type BackoffTaskExecutorOptions struct {
151167

152168
func NewBackoffTaskExecutor(opts BackoffTaskExecutorOptions) *BackoffTaskExecutor {
153169
return &BackoffTaskExecutor{
154-
BackoffTaskExecutorOptions: opts,
170+
config: opts.Config,
171+
metricsHandler: opts.MetricsHandler,
172+
logger: opts.Logger,
155173
}
156174
}
157175

chasm/lib/callback/executors_test.go

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -229,21 +229,19 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
229229
})
230230

231231
executor := InvocationTaskExecutor{
232-
InvocationTaskExecutorOptions: InvocationTaskExecutorOptions{
233-
Config: &Config{
234-
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
235-
RetryPolicy: func() backoff.RetryPolicy {
236-
return backoff.NewExponentialRetryPolicy(time.Second)
237-
},
238-
},
239-
NamespaceRegistry: nsRegistry,
240-
MetricsHandler: metricsHandler,
241-
Logger: logger,
242-
HTTPCallerProvider: func(nid queues.NamespaceIDAndDestination) HTTPCaller {
243-
return tc.caller
232+
config: &Config{
233+
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
234+
RetryPolicy: func() backoff.RetryPolicy {
235+
return backoff.NewExponentialRetryPolicy(time.Second)
244236
},
245-
ChasmEngine: mockEngine,
246237
},
238+
namespaceRegistry: nsRegistry,
239+
metricsHandler: metricsHandler,
240+
logger: logger,
241+
httpCallerProvider: func(nid queues.NamespaceIDAndDestination) HTTPCaller {
242+
return tc.caller
243+
},
244+
chasmEngine: mockEngine,
247245
}
248246

249247
// Create ComponentRef
@@ -318,15 +316,13 @@ func TestProcessBackoffTask(t *testing.T) {
318316
}
319317

320318
executor := BackoffTaskExecutor{
321-
BackoffTaskExecutorOptions: BackoffTaskExecutorOptions{
322-
Config: &Config{
323-
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
324-
RetryPolicy: func() backoff.RetryPolicy {
325-
return backoff.NewExponentialRetryPolicy(time.Second)
326-
},
319+
config: &Config{
320+
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
321+
RetryPolicy: func() backoff.RetryPolicy {
322+
return backoff.NewExponentialRetryPolicy(time.Second)
327323
},
328-
Logger: logger,
329324
},
325+
logger: logger,
330326
}
331327

332328
// Execute the backoff task
@@ -644,19 +640,17 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
644640
})
645641

646642
executor := InvocationTaskExecutor{
647-
InvocationTaskExecutorOptions: InvocationTaskExecutorOptions{
648-
Config: &Config{
649-
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
650-
RetryPolicy: func() backoff.RetryPolicy {
651-
return backoff.NewExponentialRetryPolicy(time.Second)
652-
},
643+
config: &Config{
644+
RequestTimeout: dynamicconfig.GetDurationPropertyFnFilteredByDestination(time.Second),
645+
RetryPolicy: func() backoff.RetryPolicy {
646+
return backoff.NewExponentialRetryPolicy(time.Second)
653647
},
654-
NamespaceRegistry: nsRegistry,
655-
MetricsHandler: metrics.NoopMetricsHandler,
656-
Logger: logger,
657-
HistoryClient: historyClient,
658-
ChasmEngine: mockEngine,
659648
},
649+
namespaceRegistry: nsRegistry,
650+
metricsHandler: metrics.NoopMetricsHandler,
651+
logger: logger,
652+
historyClient: historyClient,
653+
chasmEngine: mockEngine,
660654
}
661655

662656
// Create ComponentRef

chasm/lib/callback/nexus_invocation.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ func (n nexusInvocation) Invoke(
6868
task *callbackspb.InvocationTask,
6969
taskAttr chasm.TaskAttributes,
7070
) invocationResult {
71-
if e.HTTPTraceProvider != nil {
72-
traceLogger := log.With(e.Logger,
71+
if e.httpTraceProvider != nil {
72+
traceLogger := log.With(e.logger,
7373
tag.WorkflowNamespace(ns.Name().String()),
7474
tag.Operation("CompleteNexusOperation"),
7575
tag.NewStringTag("destination", taskAttr.Destination),
@@ -78,7 +78,7 @@ func (n nexusInvocation) Invoke(
7878
tag.AttemptStart(time.Now().UTC()),
7979
tag.Attempt(n.attempt),
8080
)
81-
if trace := e.HTTPTraceProvider.NewTrace(n.attempt, traceLogger); trace != nil {
81+
if trace := e.httpTraceProvider.NewTrace(n.attempt, traceLogger); trace != nil {
8282
ctx = httptrace.WithClientTrace(ctx, trace)
8383
}
8484
}
@@ -96,7 +96,7 @@ func (n nexusInvocation) Invoke(
9696
request.Header.Set(k, v)
9797
}
9898

99-
caller := e.HTTPCallerProvider(queues.NamespaceIDAndDestination{
99+
caller := e.httpCallerProvider(queues.NamespaceIDAndDestination{
100100
NamespaceID: ns.ID().String(),
101101
Destination: taskAttr.Destination,
102102
})
@@ -107,12 +107,12 @@ func (n nexusInvocation) Invoke(
107107
namespaceTag := metrics.NamespaceTag(ns.Name().String())
108108
destTag := metrics.DestinationTag(taskAttr.Destination)
109109
statusCodeTag := metrics.OutcomeTag(outcomeTag(ctx, response, err))
110-
e.MetricsHandler.Counter(RequestCounter.Name()).Record(1, namespaceTag, destTag, statusCodeTag)
111-
e.MetricsHandler.Timer(RequestLatencyHistogram.Name()).Record(time.Since(startTime), namespaceTag, destTag, statusCodeTag)
110+
e.metricsHandler.Counter(RequestCounter.Name()).Record(1, namespaceTag, destTag, statusCodeTag)
111+
e.metricsHandler.Timer(RequestLatencyHistogram.Name()).Record(time.Since(startTime), namespaceTag, destTag, statusCodeTag)
112112

113113
if err != nil {
114-
e.Logger.Error("Callback request failed with error", tag.Error(err))
115-
return invocationResultRetry{err: err, retryPolicy: e.Config.RetryPolicy()}
114+
e.logger.Error("Callback request failed with error", tag.Error(err))
115+
return invocationResultRetry{err: err, retryPolicy: e.config.RetryPolicy()}
116116
}
117117

118118
if response.StatusCode >= 200 && response.StatusCode < 300 {
@@ -121,18 +121,18 @@ func (n nexusInvocation) Invoke(
121121
// propagate errors to the machine.
122122
if _, err = io.Copy(io.Discard, response.Body); err == nil {
123123
if err = response.Body.Close(); err != nil {
124-
e.Logger.Error("Callback request failed with error", tag.Error(err))
125-
return invocationResultRetry{err: err, retryPolicy: e.Config.RetryPolicy()}
124+
e.logger.Error("Callback request failed with error", tag.Error(err))
125+
return invocationResultRetry{err: err, retryPolicy: e.config.RetryPolicy()}
126126
}
127127
}
128128
return invocationResultOK{}
129129
}
130130

131131
retryable := isRetryableHTTPResponse(response)
132-
err = readHandlerErrFromResponse(response, e.Logger)
133-
e.Logger.Error("Callback request failed", tag.Error(err), tag.NewStringTag("status", response.Status), tag.NewBoolTag("retryable", retryable))
132+
err = readHandlerErrFromResponse(response, e.logger)
133+
e.logger.Error("Callback request failed", tag.Error(err), tag.NewStringTag("status", response.Status), tag.NewBoolTag("retryable", retryable))
134134
if retryable {
135-
return invocationResultRetry{err: err, retryPolicy: e.Config.RetryPolicy()}
135+
return invocationResultRetry{err: err, retryPolicy: e.config.RetryPolicy()}
136136
}
137137
return invocationResultFail{err}
138138
}

0 commit comments

Comments
 (0)