Skip to content

Commit b448f78

Browse files
committed
Standalone Activity: DescribeActivityExecution and GetActivityExecutionOutcome (#8771)
- Add new public gRPC methods `DescribeActivityExecution` and `GetActivityExecutionOutcome`. See temporalio/api#673 - These replace the previous `PollActivityExecution` - Respond to additional API changes: inlining of `ActivityOptions` - Configure quota for the new methods in their blocking and non-blocking forms - Update test suite - Implements agreed Standalone Activity design - [x] built - [x] covered by existing tests - [x] added new functional test(s)
1 parent 7ce67bd commit b448f78

30 files changed

+1228
-1023
lines changed

chasm/lib/activity/activity.go

Lines changed: 69 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -76,21 +76,18 @@ func NewStandaloneActivity(
7676
visibility := chasm.NewVisibilityWithData(
7777
ctx,
7878
request.GetSearchAttributes().GetIndexedFields(),
79-
request.GetMemo().GetFields(),
79+
nil,
8080
)
8181

82-
// TODO flatten this when API is updated
83-
options := request.GetOptions()
84-
8582
activity := &Activity{
8683
ActivityState: &activitypb.ActivityState{
8784
ActivityType: request.ActivityType,
88-
TaskQueue: options.GetTaskQueue(),
89-
ScheduleToCloseTimeout: options.GetScheduleToCloseTimeout(),
90-
ScheduleToStartTimeout: options.GetScheduleToStartTimeout(),
91-
StartToCloseTimeout: options.GetStartToCloseTimeout(),
92-
HeartbeatTimeout: options.GetHeartbeatTimeout(),
93-
RetryPolicy: options.GetRetryPolicy(),
85+
TaskQueue: request.GetTaskQueue(),
86+
ScheduleToCloseTimeout: request.GetScheduleToCloseTimeout(),
87+
ScheduleToStartTimeout: request.GetScheduleToStartTimeout(),
88+
StartToCloseTimeout: request.GetStartToCloseTimeout(),
89+
HeartbeatTimeout: request.GetHeartbeatTimeout(),
90+
RetryPolicy: request.GetRetryPolicy(),
9491
Priority: request.Priority,
9592
},
9693
LastAttempt: chasm.NewDataField(ctx, &activitypb.ActivityAttemptState{}),
@@ -103,7 +100,7 @@ func NewStandaloneActivity(
103100
Visibility: chasm.NewComponentField(ctx, visibility),
104101
}
105102

106-
activity.ScheduledTime = timestamppb.New(ctx.Now(activity))
103+
activity.ScheduleTime = timestamppb.New(ctx.Now(activity))
107104

108105
return activity, nil
109106
}
@@ -355,7 +352,6 @@ func (a *Activity) recordFailedAttempt(
355352
failure *failurepb.Failure,
356353
noRetriesLeft bool,
357354
) error {
358-
outcome := a.Outcome.Get(ctx)
359355
attempt := a.LastAttempt.Get(ctx)
360356
currentTime := timestamppb.New(ctx.Now(a))
361357

@@ -368,7 +364,6 @@ func (a *Activity) recordFailedAttempt(
368364
// If the activity has exhausted retries, mark the outcome failure as well but don't store duplicate failure info.
369365
// Also reset the retry interval as there won't be any more retries.
370366
if noRetriesLeft {
371-
outcome.Variant = &activitypb.ActivityOutcome_Failed_{}
372367
attempt.CurrentRetryInterval = nil
373368
} else {
374369
attempt.CurrentRetryInterval = durationpb.New(retryInterval)
@@ -407,7 +402,7 @@ func (a *Activity) hasEnoughTimeForRetry(ctx chasm.Context, overridingRetryInter
407402
return true, retryInterval, nil
408403
}
409404

410-
deadline := a.ScheduledTime.AsTime().Add(scheduleToClose)
405+
deadline := a.ScheduleTime.AsTime().Add(scheduleToClose)
411406
return ctx.Now(a).Add(retryInterval).Before(deadline), retryInterval, nil
412407
}
413408

@@ -486,83 +481,96 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.Acti
486481
Priority: a.GetPriority(),
487482
RunId: key.RunID,
488483
RunState: runState,
489-
ScheduledTime: a.GetScheduledTime(),
484+
ScheduleTime: a.GetScheduleTime(),
490485
Status: status,
491486
// TODO(dan): populate remaining fields
492487
}
493488

494489
return info, nil
495490
}
496491

497-
func (a *Activity) buildPollActivityExecutionResponse(
492+
func (a *Activity) buildDescribeActivityExecutionResponse(
498493
ctx chasm.Context,
499-
req *activitypb.PollActivityExecutionRequest,
500-
) (*activitypb.PollActivityExecutionResponse, error) {
494+
req *activitypb.DescribeActivityExecutionRequest,
495+
) (*activitypb.DescribeActivityExecutionResponse, error) {
501496
request := req.GetFrontendRequest()
502497

503498
token, err := ctx.Ref(a)
504499
if err != nil {
505500
return nil, err
506501
}
507502

508-
var info *activity.ActivityExecutionInfo
509-
if request.GetIncludeInfo() {
510-
info, err = a.buildActivityExecutionInfo(ctx)
511-
if err != nil {
512-
return nil, err
513-
}
503+
info, err := a.buildActivityExecutionInfo(ctx)
504+
if err != nil {
505+
return nil, err
514506
}
515507

516508
var input *commonpb.Payloads
517509
if request.GetIncludeInput() {
518-
activityRequest := a.RequestData.Get(ctx)
519-
input = activityRequest.GetInput()
510+
input = a.RequestData.Get(ctx).GetInput()
520511
}
521512

522-
response := &workflowservice.PollActivityExecutionResponse{
523-
Info: info,
524-
RunId: ctx.ExecutionKey().RunID,
525-
Input: input,
526-
StateChangeLongPollToken: token,
513+
response := &workflowservice.DescribeActivityExecutionResponse{
514+
Info: info,
515+
RunId: ctx.ExecutionKey().RunID,
516+
Input: input,
517+
LongPollToken: token,
527518
}
528519

529520
if request.GetIncludeOutcome() {
530-
activityOutcome := a.Outcome.Get(ctx)
531-
// There are two places where a failure might be stored but only one place where a
532-
// successful outcome is stored.
533-
if successful := activityOutcome.GetSuccessful(); successful != nil {
534-
response.Outcome = &workflowservice.PollActivityExecutionResponse_Result{
535-
Result: successful.GetOutput(),
536-
}
537-
} else if failure := activityOutcome.GetFailed().GetFailure(); failure != nil {
538-
response.Outcome = &workflowservice.PollActivityExecutionResponse_Failure{
539-
Failure: failure,
540-
}
541-
} else {
542-
shouldHaveFailure := (a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_FAILED ||
543-
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT ||
544-
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED ||
545-
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED)
546-
547-
if shouldHaveFailure {
548-
attempt := a.LastAttempt.Get(ctx)
549-
if details := attempt.GetLastFailureDetails(); details != nil {
550-
response.Outcome = &workflowservice.PollActivityExecutionResponse_Failure{
551-
Failure: details.GetFailure(),
552-
}
553-
}
554-
}
555-
}
521+
response.Outcome = a.outcome(ctx)
556522
}
557523

558-
return &activitypb.PollActivityExecutionResponse{
524+
return &activitypb.DescribeActivityExecutionResponse{
559525
FrontendResponse: response,
560526
}, nil
561527
}
562528

563-
// StoreOrSelf returns the store for the activity. If the store is not set as a field (e.g. standalone
564-
// activities), it returns the activity itself.
565-
func (a *Activity) StoreOrSelf(ctx chasm.MutableContext) ActivityStore {
529+
func (a *Activity) buildGetActivityExecutionOutcomeResponse(
530+
ctx chasm.Context,
531+
) (*activitypb.GetActivityExecutionOutcomeResponse, error) {
532+
return &activitypb.GetActivityExecutionOutcomeResponse{
533+
FrontendResponse: &workflowservice.GetActivityExecutionOutcomeResponse{
534+
RunId: ctx.ExecutionKey().RunID,
535+
Outcome: a.outcome(ctx),
536+
},
537+
}, nil
538+
}
539+
540+
// outcome retrieves the activity outcome (result or failure) if the activity has completed.
541+
// Returns nil if the activity has not completed.
542+
func (a *Activity) outcome(ctx chasm.Context) *activity.ActivityExecutionOutcome {
543+
activityOutcome := a.Outcome.Get(ctx)
544+
// Check for successful outcome
545+
if successful := activityOutcome.GetSuccessful(); successful != nil {
546+
return &activity.ActivityExecutionOutcome{
547+
Value: &activity.ActivityExecutionOutcome_Result{Result: successful.GetOutput()},
548+
}
549+
}
550+
// Check for failure in outcome
551+
if failure := activityOutcome.GetFailed().GetFailure(); failure != nil {
552+
return &activity.ActivityExecutionOutcome{
553+
Value: &activity.ActivityExecutionOutcome_Failure{Failure: failure},
554+
}
555+
}
556+
// Check for failure in last attempt details
557+
shouldHaveFailure := (a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_FAILED ||
558+
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT ||
559+
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED ||
560+
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED)
561+
if shouldHaveFailure {
562+
if details := a.LastAttempt.Get(ctx).GetLastFailureDetails(); details != nil {
563+
return &activity.ActivityExecutionOutcome{
564+
Value: &activity.ActivityExecutionOutcome_Failure{Failure: details.GetFailure()},
565+
}
566+
}
567+
}
568+
return nil
569+
}
570+
571+
// StoreOrSelf returns the store for the activity. If the store is not set as a field (e.g.
572+
// standalone activities), it returns the activity itself.
573+
func (a *Activity) StoreOrSelf(ctx chasm.Context) ActivityStore {
566574
store, ok := a.Store.TryGet(ctx)
567575
if ok {
568576
return store

chasm/lib/activity/frontend.go

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55

66
"github.com/google/uuid"
7+
apiactivitypb "go.temporal.io/api/activity/v1" //nolint:importas
78
commonpb "go.temporal.io/api/common/v1"
89
"go.temporal.io/api/serviceerror"
910
"go.temporal.io/api/workflowservice/v1"
@@ -19,9 +20,10 @@ import (
1920

2021
type FrontendHandler interface {
2122
StartActivityExecution(ctx context.Context, req *workflowservice.StartActivityExecutionRequest) (*workflowservice.StartActivityExecutionResponse, error)
23+
DescribeActivityExecution(ctx context.Context, req *workflowservice.DescribeActivityExecutionRequest) (*workflowservice.DescribeActivityExecutionResponse, error)
24+
GetActivityExecutionOutcome(ctx context.Context, req *workflowservice.GetActivityExecutionOutcomeRequest) (*workflowservice.GetActivityExecutionOutcomeResponse, error)
2225
CountActivityExecutions(context.Context, *workflowservice.CountActivityExecutionsRequest) (*workflowservice.CountActivityExecutionsResponse, error)
2326
DeleteActivityExecution(context.Context, *workflowservice.DeleteActivityExecutionRequest) (*workflowservice.DeleteActivityExecutionResponse, error)
24-
PollActivityExecution(context.Context, *workflowservice.PollActivityExecutionRequest) (*workflowservice.PollActivityExecutionResponse, error)
2527
ListActivityExecutions(context.Context, *workflowservice.ListActivityExecutionsRequest) (*workflowservice.ListActivityExecutionsResponse, error)
2628
RequestCancelActivityExecution(context.Context, *workflowservice.RequestCancelActivityExecutionRequest) (*workflowservice.RequestCancelActivityExecutionResponse, error)
2729
TerminateActivityExecution(context.Context, *workflowservice.TerminateActivityExecutionRequest) (*workflowservice.TerminateActivityExecutionResponse, error)
@@ -87,14 +89,13 @@ func (h *frontendHandler) StartActivityExecution(ctx context.Context, req *workf
8789
return resp.GetFrontendResponse(), err
8890
}
8991

90-
// PollActivityExecution handles PollActivityExecutionRequest. This method supports querying current
91-
// activity state, optionally as a long-poll that waits for certain state changes. It is used by
92-
// clients to poll for activity state and/or result.
93-
func (h *frontendHandler) PollActivityExecution(
92+
// DescribeActivityExecution queries current activity state, optionally as a long-poll that waits
93+
// for any state change.
94+
func (h *frontendHandler) DescribeActivityExecution(
9495
ctx context.Context,
95-
req *workflowservice.PollActivityExecutionRequest,
96-
) (*workflowservice.PollActivityExecutionResponse, error) {
97-
err := ValidatePollActivityExecutionRequest(
96+
req *workflowservice.DescribeActivityExecutionRequest,
97+
) (*workflowservice.DescribeActivityExecutionResponse, error) {
98+
err := ValidateDescribeActivityExecutionRequest(
9899
req,
99100
dynamicconfig.MaxIDLengthLimit.Get(h.dc)(),
100101
)
@@ -106,7 +107,31 @@ func (h *frontendHandler) PollActivityExecution(
106107
if err != nil {
107108
return nil, err
108109
}
109-
resp, err := h.client.PollActivityExecution(ctx, &activitypb.PollActivityExecutionRequest{
110+
111+
resp, err := h.client.DescribeActivityExecution(ctx, &activitypb.DescribeActivityExecutionRequest{
112+
NamespaceId: namespaceID.String(),
113+
FrontendRequest: req,
114+
})
115+
return resp.GetFrontendResponse(), err
116+
}
117+
118+
// GetActivityExecutionOutcome long-polls for activity outcome.
119+
func (h *frontendHandler) GetActivityExecutionOutcome(
120+
ctx context.Context,
121+
req *workflowservice.GetActivityExecutionOutcomeRequest,
122+
) (*workflowservice.GetActivityExecutionOutcomeResponse, error) {
123+
err := ValidateGetActivityExecutionOutcomeRequest(
124+
req,
125+
dynamicconfig.MaxIDLengthLimit.Get(h.dc)(),
126+
)
127+
if err != nil {
128+
return nil, err
129+
}
130+
namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
131+
if err != nil {
132+
return nil, err
133+
}
134+
resp, err := h.client.GetActivityExecutionOutcome(ctx, &activitypb.GetActivityExecutionOutcomeRequest{
110135
NamespaceId: namespaceID.String(),
111136
FrontendRequest: req,
112137
})
@@ -193,23 +218,25 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
193218
req = common.CloneProto(req)
194219
activityType := req.ActivityType.GetName()
195220

196-
if req.Options.RetryPolicy == nil {
197-
req.Options.RetryPolicy = &commonpb.RetryPolicy{}
221+
if req.RetryPolicy == nil {
222+
req.RetryPolicy = &commonpb.RetryPolicy{}
198223
}
199224

225+
opts := activityOptionsFromStartRequest(req)
200226
err := ValidateAndNormalizeActivityAttributes(
201227
req.ActivityId,
202228
activityType,
203229
dynamicconfig.DefaultActivityRetryPolicy.Get(h.dc),
204230
dynamicconfig.MaxIDLengthLimit.Get(h.dc)(),
205231
namespaceID,
206-
req.Options,
232+
opts,
207233
req.Priority,
208234
durationpb.New(0),
209235
)
210236
if err != nil {
211237
return nil, err
212238
}
239+
applyActivityOptionsToStartRequest(opts, req)
213240

214241
err = validateAndNormalizeStartActivityExecutionRequest(
215242
req,
@@ -224,3 +251,27 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
224251

225252
return req, nil
226253
}
254+
255+
// activityOptionsFromStartRequest builds an ActivityOptions from the inlined fields
256+
// of a StartActivityExecutionRequest for use with shared validation logic.
257+
func activityOptionsFromStartRequest(req *workflowservice.StartActivityExecutionRequest) *apiactivitypb.ActivityOptions {
258+
return &apiactivitypb.ActivityOptions{
259+
TaskQueue: req.TaskQueue,
260+
ScheduleToCloseTimeout: req.ScheduleToCloseTimeout,
261+
ScheduleToStartTimeout: req.ScheduleToStartTimeout,
262+
StartToCloseTimeout: req.StartToCloseTimeout,
263+
HeartbeatTimeout: req.HeartbeatTimeout,
264+
RetryPolicy: req.RetryPolicy,
265+
}
266+
}
267+
268+
// applyActivityOptionsToStartRequest copies normalized values from ActivityOptions
269+
// back to the StartActivityExecutionRequest.
270+
func applyActivityOptionsToStartRequest(opts *apiactivitypb.ActivityOptions, req *workflowservice.StartActivityExecutionRequest) {
271+
req.TaskQueue = opts.TaskQueue
272+
req.ScheduleToCloseTimeout = opts.ScheduleToCloseTimeout
273+
req.ScheduleToStartTimeout = opts.ScheduleToStartTimeout
274+
req.StartToCloseTimeout = opts.StartToCloseTimeout
275+
req.HeartbeatTimeout = opts.HeartbeatTimeout
276+
req.RetryPolicy = opts.RetryPolicy
277+
}

chasm/lib/activity/gen/activitypb/v1/activity_state.pb.go

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)