Skip to content

Commit ca6e026

Browse files
Decouple Client from worker part of SDK (#1100)
1 parent 16d20b1 commit ca6e026

File tree

4 files changed

+47
-24
lines changed

4 files changed

+47
-24
lines changed

internal/activity.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,8 @@ type ServiceInvoker interface {
294294
// without detail.
295295
BackgroundHeartbeat() error
296296
Close(flushBufferedHeartbeat bool)
297-
GetClient(domain string, options *ClientOptions) Client
297+
298+
SignalWorkflow(ctx context.Context, domain, workflowID, runID, signalName string, signalInput []byte) error
298299
}
299300

300301
// WithActivityTask adds activity specific information into context.

internal/internal_task_handlers.go

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1812,8 +1812,8 @@ func (i *cadenceInvoker) Close(flushBufferedHeartbeat bool) {
18121812
}
18131813
}
18141814

1815-
func (i *cadenceInvoker) GetClient(domain string, options *ClientOptions) Client {
1816-
return NewClient(i.service, domain, options)
1815+
func (i *cadenceInvoker) SignalWorkflow(ctx context.Context, domain, workflowID, runID, signalName string, signalInput []byte) error {
1816+
return signalWorkflow(ctx, i.service, i.identity, domain, workflowID, runID, signalName, signalInput)
18171817
}
18181818

18191819
func newServiceInvoker(
@@ -1968,6 +1968,34 @@ func createNewDecision(decisionType s.DecisionType) *s.Decision {
19681968
DecisionType: common.DecisionTypePtr(decisionType),
19691969
}
19701970
}
1971+
func signalWorkflow(
1972+
ctx context.Context,
1973+
service workflowserviceclient.Interface,
1974+
identity string,
1975+
domain string,
1976+
workflowID string,
1977+
runID string,
1978+
signalName string,
1979+
signalInput []byte,
1980+
) error {
1981+
request := &s.SignalWorkflowExecutionRequest{
1982+
Domain: common.StringPtr(domain),
1983+
WorkflowExecution: &s.WorkflowExecution{
1984+
WorkflowId: common.StringPtr(workflowID),
1985+
RunId: getRunID(runID),
1986+
},
1987+
SignalName: common.StringPtr(signalName),
1988+
Input: signalInput,
1989+
Identity: common.StringPtr(identity),
1990+
}
1991+
1992+
return backoff.Retry(ctx,
1993+
func() error {
1994+
tchCtx, cancel, opt := newChannelContext(ctx)
1995+
defer cancel()
1996+
return service.SignalWorkflowExecution(tchCtx, request, opt...)
1997+
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
1998+
}
19711999

19722000
func recordActivityHeartbeat(
19732001
ctx context.Context,

internal/internal_workflow_client.go

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -326,24 +326,7 @@ func (wc *workflowClient) SignalWorkflow(ctx context.Context, workflowID string,
326326
if err != nil {
327327
return err
328328
}
329-
330-
request := &s.SignalWorkflowExecutionRequest{
331-
Domain: common.StringPtr(wc.domain),
332-
WorkflowExecution: &s.WorkflowExecution{
333-
WorkflowId: common.StringPtr(workflowID),
334-
RunId: getRunID(runID),
335-
},
336-
SignalName: common.StringPtr(signalName),
337-
Input: input,
338-
Identity: common.StringPtr(wc.identity),
339-
}
340-
341-
return backoff.Retry(ctx,
342-
func() error {
343-
tchCtx, cancel, opt := newChannelContext(ctx)
344-
defer cancel()
345-
return wc.workflowService.SignalWorkflowExecution(tchCtx, request, opt...)
346-
}, createDynamicServiceRetryPolicy(ctx), isServiceTransientError)
329+
return signalWorkflow(ctx, wc.workflowService, wc.identity, wc.domain, workflowID, runID, signalName, input)
347330
}
348331

349332
// SignalWithStartWorkflow sends a signal to a running workflow.

internal/session.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -540,9 +540,20 @@ func (env *sessionEnvironmentImpl) AddSessionToken() {
540540

541541
func (env *sessionEnvironmentImpl) SignalCreationResponse(ctx context.Context, sessionID string) error {
542542
activityEnv := getActivityEnv(ctx)
543-
client := activityEnv.serviceInvoker.GetClient(activityEnv.workflowDomain, &ClientOptions{})
544-
return client.SignalWorkflow(ctx, activityEnv.workflowExecution.ID, activityEnv.workflowExecution.RunID,
545-
sessionID, env.getCreationResponse())
543+
544+
signalInput, err := encodeArg(getDefaultDataConverter(), env.getCreationResponse())
545+
if err != nil {
546+
return err
547+
}
548+
549+
return activityEnv.serviceInvoker.SignalWorkflow(
550+
ctx,
551+
activityEnv.workflowDomain,
552+
activityEnv.workflowExecution.ID,
553+
activityEnv.workflowExecution.RunID,
554+
sessionID,
555+
signalInput,
556+
)
546557
}
547558

548559
func (env *sessionEnvironmentImpl) getCreationResponse() *sessionCreationResponse {

0 commit comments

Comments
 (0)