Skip to content

Commit 7ce67bd

Browse files
committed
Post-merge changes to standalone-activity (#8770)
All changes needed to make tests compile and pass after merging main into standalone-activity.
1 parent 36ea888 commit 7ce67bd

30 files changed

+308
-452
lines changed

api/historyservice/v1/request_response.pb.go

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chasm/engine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type Engine interface {
4747
) ([]byte, error)
4848

4949
// NotifyExecution notifies any PollComponent callers waiting on the execution.
50-
NotifyExecution(EntityKey)
50+
NotifyExecution(ExecutionKey)
5151
}
5252

5353
type BusinessIDReusePolicy int

chasm/engine_mock.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chasm/lib/activity/activity.go

Lines changed: 46 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ import (
2525
)
2626

2727
type ActivityStore interface {
28-
// PopulateRecordStartedResponse populates the response for HandleStarted
29-
PopulateRecordStartedResponse(ctx chasm.Context, key chasm.EntityKey, response *historyservice.RecordActivityTaskStartedResponse) error
28+
// PopulateRecordStartedResponse populates the response for RecordActivityTaskStarted
29+
PopulateRecordStartedResponse(ctx chasm.Context, key chasm.ExecutionKey, response *historyservice.RecordActivityTaskStartedResponse) error
3030

3131
// RecordCompleted applies the provided function to record activity completion
3232
RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error
@@ -46,8 +46,9 @@ type Activity struct {
4646
// Standalone only
4747
RequestData chasm.Field[*activitypb.ActivityRequestData]
4848
Outcome chasm.Field[*activitypb.ActivityOutcome]
49-
// Pointer to an implementation of the "store". for a workflow activity this would be a parent pointer back to
50-
// the workflow. For a standalone activity this would be nil.
49+
// Pointer to an implementation of the "store". For a workflow activity this would be a parent
50+
// pointer back to the workflow. For a standalone activity this is nil (Activity itself
51+
// implements the ActivityStore interface).
5152
// TODO: revisit a standalone activity pointing to itself once we handle storing it more efficiently.
5253
// TODO: figure out better naming.
5354
Store chasm.Field[ActivityStore]
@@ -72,10 +73,11 @@ func NewStandaloneActivity(
7273
ctx chasm.MutableContext,
7374
request *workflowservice.StartActivityExecutionRequest,
7475
) (*Activity, error) {
75-
visibility, err := chasm.NewVisibilityWithData(ctx, request.GetSearchAttributes().GetIndexedFields(), request.GetMemo().GetFields())
76-
if err != nil {
77-
return nil, err
78-
}
76+
visibility := chasm.NewVisibilityWithData(
77+
ctx,
78+
request.GetSearchAttributes().GetIndexedFields(),
79+
request.GetMemo().GetFields(),
80+
)
7981

8082
// TODO flatten this when API is updated
8183
options := request.GetOptions()
@@ -139,11 +141,7 @@ func (a *Activity) HandleStarted(ctx chasm.MutableContext, request *historyservi
139141
return nil, err
140142
}
141143

142-
attempt, err := a.LastAttempt.Get(ctx)
143-
if err != nil {
144-
return nil, err
145-
}
146-
144+
attempt := a.LastAttempt.Get(ctx)
147145
attempt.StartedTime = timestamppb.New(ctx.Now(a))
148146
attempt.LastWorkerIdentity = request.GetPollRequest().GetIdentity()
149147

@@ -153,47 +151,20 @@ func (a *Activity) HandleStarted(ctx chasm.MutableContext, request *historyservi
153151
DeploymentName: versionDirective.GetDeploymentName(),
154152
}
155153
}
156-
157-
store, err := a.Store.Get(ctx)
158-
if err != nil {
159-
return nil, err
160-
}
161-
162154
response := &historyservice.RecordActivityTaskStartedResponse{}
163-
if store == nil {
164-
if err := a.PopulateRecordStartedResponse(ctx, ctx.ExecutionKey(), response); err != nil {
165-
return nil, err
166-
}
167-
} else {
168-
if err := store.PopulateRecordStartedResponse(ctx, ctx.ExecutionKey(), response); err != nil {
169-
return nil, err
170-
}
171-
}
172-
173-
return response, nil
155+
err := a.StoreOrSelf(ctx).PopulateRecordStartedResponse(ctx, ctx.ExecutionKey(), response)
156+
return response, err
174157
}
175158

176-
func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.EntityKey, response *historyservice.RecordActivityTaskStartedResponse) error {
177-
attempt, err := a.LastAttempt.Get(ctx)
178-
if err != nil {
179-
return err
180-
}
181-
182-
lastHeartbeat, err := a.LastHeartbeat.Get(ctx)
183-
if err != nil {
184-
return err
185-
}
186-
187-
requestData, err := a.RequestData.Get(ctx)
188-
if err != nil {
189-
return err
190-
}
191-
192-
response.StartedTime = attempt.StartedTime
193-
response.Attempt = attempt.GetCount()
159+
func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.ExecutionKey, response *historyservice.RecordActivityTaskStartedResponse) error {
160+
lastHeartbeat, _ := a.LastHeartbeat.TryGet(ctx)
194161
if lastHeartbeat != nil {
195162
response.HeartbeatDetails = lastHeartbeat.GetDetails()
196163
}
164+
requestData := a.RequestData.Get(ctx)
165+
attempt := a.LastAttempt.Get(ctx)
166+
response.StartedTime = attempt.StartedTime
167+
response.Attempt = attempt.GetCount()
197168
response.Priority = a.GetPriority()
198169
response.RetryPolicy = a.GetRetryPolicy()
199170
response.ScheduledEvent = &historypb.HistoryEvent{
@@ -212,7 +183,6 @@ func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.En
212183
},
213184
},
214185
}
215-
216186
return nil
217187
}
218188

@@ -285,18 +255,13 @@ func (a *Activity) handleTerminated(ctx chasm.MutableContext, req *activitypb.Te
285255

286256
// getLastHeartbeat retrieves the last heartbeat state, initializing it if not present. The heartbeat is lazily created
287257
// to avoid unnecessary writes when heartbeats are not used.
288-
func (a *Activity) getLastHeartbeat(ctx chasm.MutableContext) (*activitypb.ActivityHeartbeatState, error) {
289-
heartbeat, err := a.LastHeartbeat.Get(ctx)
290-
if err != nil {
291-
return nil, err
292-
}
293-
294-
if heartbeat == nil {
258+
func (a *Activity) getLastHeartbeat(ctx chasm.MutableContext) *activitypb.ActivityHeartbeatState {
259+
heartbeat, ok := a.LastHeartbeat.TryGet(ctx)
260+
if !ok {
295261
heartbeat = &activitypb.ActivityHeartbeatState{}
296262
a.LastHeartbeat = chasm.NewDataField(ctx, heartbeat)
297263
}
298-
299-
return heartbeat, nil
264+
return heartbeat
300265
}
301266

302267
func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, req *activitypb.RequestCancelActivityExecutionRequest) (
@@ -361,10 +326,7 @@ func (a *Activity) shouldRetryOnFailure(ctx chasm.Context, failure *failurepb.Fa
361326
// recordScheduleToStartOrCloseTimeoutFailure records schedule-to-start or schedule-to-close timeouts. Such timeouts are not retried so we
362327
// set the outcome failure directly and leave the attempt failure as is.
363328
func (a *Activity) recordScheduleToStartOrCloseTimeoutFailure(ctx chasm.MutableContext, timeoutType enumspb.TimeoutType) error {
364-
outcome, err := a.Outcome.Get(ctx)
365-
if err != nil {
366-
return err
367-
}
329+
outcome := a.Outcome.Get(ctx)
368330

369331
failure := &failurepb.Failure{
370332
Message: fmt.Sprintf(common.FailureReasonActivityTimeout, timeoutType.String()),
@@ -393,16 +355,8 @@ func (a *Activity) recordFailedAttempt(
393355
failure *failurepb.Failure,
394356
noRetriesLeft bool,
395357
) error {
396-
outcome, err := a.Outcome.Get(ctx)
397-
if err != nil {
398-
return err
399-
}
400-
401-
attempt, err := a.LastAttempt.Get(ctx)
402-
if err != nil {
403-
return err
404-
}
405-
358+
outcome := a.Outcome.Get(ctx)
359+
attempt := a.LastAttempt.Get(ctx)
406360
currentTime := timestamppb.New(ctx.Now(a))
407361

408362
attempt.LastFailureDetails = &activitypb.ActivityAttemptState_LastFailureDetails{
@@ -419,37 +373,28 @@ func (a *Activity) recordFailedAttempt(
419373
} else {
420374
attempt.CurrentRetryInterval = durationpb.New(retryInterval)
421375
}
422-
423376
return nil
424377
}
425378

426379
func (a *Activity) shouldRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration, error) {
427380
if !TransitionRescheduled.Possible(a) {
428381
return false, 0, nil
429382
}
430-
431-
attempt, err := a.LastAttempt.Get(ctx)
432-
if err != nil {
433-
return false, 0, err
434-
}
383+
attempt := a.LastAttempt.Get(ctx)
435384
retryPolicy := a.RetryPolicy
436385

437386
enoughAttempts := retryPolicy.GetMaximumAttempts() == 0 || attempt.GetCount() < retryPolicy.GetMaximumAttempts()
438387
enoughTime, retryInterval, err := a.hasEnoughTimeForRetry(ctx, overridingRetryInterval)
439388
if err != nil {
440389
return false, 0, err
441390
}
442-
443391
return enoughAttempts && enoughTime, retryInterval, nil
444392
}
445393

446394
// hasEnoughTimeForRetry checks if there is enough time left in the schedule-to-close timeout. If sufficient time
447395
// remains, it will also return a valid retry interval
448396
func (a *Activity) hasEnoughTimeForRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration, error) {
449-
attempt, err := a.LastAttempt.Get(ctx)
450-
if err != nil {
451-
return false, 0, err
452-
}
397+
attempt := a.LastAttempt.Get(ctx)
453398

454399
// Use overriding retry interval if provided, else calculate based on retry policy
455400
retryInterval := overridingRetryInterval
@@ -521,21 +466,9 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.Acti
521466
return nil, serviceerror.NewInternalf("unknown activity execution status: %s", a.GetStatus())
522467
}
523468

524-
requestData, err := a.RequestData.Get(ctx)
525-
if err != nil {
526-
return nil, err
527-
}
528-
529-
attempt, err := a.LastAttempt.Get(ctx)
530-
if err != nil {
531-
return nil, err
532-
}
533-
534-
heartbeat, err := a.LastHeartbeat.Get(ctx)
535-
if err != nil {
536-
return nil, err
537-
}
538-
469+
requestData := a.RequestData.Get(ctx)
470+
attempt := a.LastAttempt.Get(ctx)
471+
heartbeat, _ := a.LastHeartbeat.TryGet(ctx)
539472
key := ctx.ExecutionKey()
540473

541474
info := &activity.ActivityExecutionInfo{
@@ -551,7 +484,7 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.Acti
551484
LastStartedTime: attempt.GetStartedTime(),
552485
LastWorkerIdentity: attempt.GetLastWorkerIdentity(),
553486
Priority: a.GetPriority(),
554-
RunId: key.EntityID,
487+
RunId: key.RunID,
555488
RunState: runState,
556489
ScheduledTime: a.GetScheduledTime(),
557490
Status: status,
@@ -582,25 +515,19 @@ func (a *Activity) buildPollActivityExecutionResponse(
582515

583516
var input *commonpb.Payloads
584517
if request.GetIncludeInput() {
585-
activityRequest, err := a.RequestData.Get(ctx)
586-
if err != nil {
587-
return nil, err
588-
}
518+
activityRequest := a.RequestData.Get(ctx)
589519
input = activityRequest.GetInput()
590520
}
591521

592522
response := &workflowservice.PollActivityExecutionResponse{
593523
Info: info,
594-
RunId: ctx.ExecutionKey().EntityID,
524+
RunId: ctx.ExecutionKey().RunID,
595525
Input: input,
596526
StateChangeLongPollToken: token,
597527
}
598528

599529
if request.GetIncludeOutcome() {
600-
activityOutcome, err := a.Outcome.Get(ctx)
601-
if err != nil {
602-
return nil, err
603-
}
530+
activityOutcome := a.Outcome.Get(ctx)
604531
// There are two places where a failure might be stored but only one place where a
605532
// successful outcome is stored.
606533
if successful := activityOutcome.GetSuccessful(); successful != nil {
@@ -618,10 +545,7 @@ func (a *Activity) buildPollActivityExecutionResponse(
618545
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED)
619546

620547
if shouldHaveFailure {
621-
attempt, err := a.LastAttempt.Get(ctx)
622-
if err != nil {
623-
return nil, err
624-
}
548+
attempt := a.LastAttempt.Get(ctx)
625549
if details := attempt.GetLastFailureDetails(); details != nil {
626550
response.Outcome = &workflowservice.PollActivityExecutionResponse_Failure{
627551
Failure: details.GetFailure(),
@@ -635,3 +559,13 @@ func (a *Activity) buildPollActivityExecutionResponse(
635559
FrontendResponse: response,
636560
}, nil
637561
}
562+
563+
// StoreOrSelf returns the store for the activity. If the store is not set as a field (e.g. standalone
564+
// activities), it returns the activity itself.
565+
func (a *Activity) StoreOrSelf(ctx chasm.MutableContext) ActivityStore {
566+
store, ok := a.Store.TryGet(ctx)
567+
if ok {
568+
return store
569+
}
570+
return a
571+
}

chasm/lib/activity/activity_tasks.go

Lines changed: 6 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,9 @@ func (e *activityDispatchTaskExecutor) Validate(
3232
_ chasm.TaskAttributes,
3333
task *activitypb.ActivityDispatchTask,
3434
) (bool, error) {
35-
attempt, err := activity.LastAttempt.Get(ctx)
36-
if err != nil {
37-
return false, err
38-
}
39-
4035
// TODO make sure we handle resets when we support them, as they will reset the attempt count
41-
if !TransitionStarted.Possible(activity) || task.Attempt != attempt.Count {
42-
return false, nil
43-
}
44-
45-
return true, nil
36+
return (TransitionStarted.Possible(activity) &&
37+
task.Attempt == activity.LastAttempt.Get(ctx).GetCount()), nil
4638
}
4739

4840
func (e *activityDispatchTaskExecutor) Execute(
@@ -78,13 +70,8 @@ func (e *scheduleToStartTimeoutTaskExecutor) Validate(
7870
_ chasm.TaskAttributes,
7971
task *activitypb.ScheduleToStartTimeoutTask,
8072
) (bool, error) {
81-
attempt, err := activity.LastAttempt.Get(ctx)
82-
if err != nil {
83-
return false, err
84-
}
85-
86-
valid := activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED && task.Attempt == attempt.Count
87-
return valid, nil
73+
return (activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED &&
74+
task.Attempt == activity.LastAttempt.Get(ctx).GetCount()), nil
8875
}
8976

9077
func (e *scheduleToStartTimeoutTaskExecutor) Execute(
@@ -132,12 +119,8 @@ func (e *startToCloseTimeoutTaskExecutor) Validate(
132119
_ chasm.TaskAttributes,
133120
task *activitypb.StartToCloseTimeoutTask,
134121
) (bool, error) {
135-
attempt, err := activity.LastAttempt.Get(ctx)
136-
if err != nil {
137-
return false, err
138-
}
139-
140-
valid := activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && task.Attempt == attempt.Count
122+
valid := (activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED &&
123+
task.Attempt == activity.LastAttempt.Get(ctx).GetCount())
141124
return valid, nil
142125
}
143126

chasm/lib/activity/fx.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,9 @@ var FrontendModule = fx.Module(
2828
fx.Provide(activitypb.NewActivityServiceLayeredClient),
2929
fx.Provide(NewFrontendHandler),
3030
fx.Provide(resource.SearchAttributeValidatorProvider),
31+
fx.Invoke(func(registry *chasm.Registry) error {
32+
// Frontend needs to register the component in order to serialize ComponentRefs, but doesn't
33+
// need task executors.
34+
return registry.Register(newComponentOnlyLibrary())
35+
}),
3136
)

0 commit comments

Comments
 (0)