Skip to content

Commit 27c156a

Browse files
fretz12bergundydandavison
committed
Added standalone activity chasm dispatch task (#8540)
Added standalone activity Chasm tasks. Added handling of start activity and e2e implementation of standalone activity start execution with existing services. Updated protos related to standalone activities. The Chasm tasks are needed to kick off standalone activity execution via the existing services. Proto changes needed to so that the component ref can be passed and handled via service stack. - [X] built - [X] run locally and tested manually - [X] covered by existing tests - [ ] added new unit test(s) - [ ] added new functional test(s) --------- Co-authored-by: Roey Berman <[email protected]> Co-authored-by: Dan Davison <[email protected]>
1 parent 8ae1c4d commit 27c156a

File tree

25 files changed

+752
-117
lines changed

25 files changed

+752
-117
lines changed

api/historyservice/v1/request_response.pb.go

Lines changed: 16 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/matchingservice/v1/request_response.pb.go

Lines changed: 16 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/persistence/v1/tasks.pb.go

Lines changed: 14 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

chasm/lib/activity/activity.go

Lines changed: 142 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,26 @@ package activity
22

33
import (
44
commonpb "go.temporal.io/api/common/v1"
5+
deploymentpb "go.temporal.io/api/deployment/v1"
6+
enumspb "go.temporal.io/api/enums/v1"
7+
historypb "go.temporal.io/api/history/v1"
8+
"go.temporal.io/api/serviceerror"
59
"go.temporal.io/api/workflowservice/v1"
610
"go.temporal.io/server/api/historyservice/v1"
11+
"go.temporal.io/server/api/matchingservice/v1"
12+
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
713
"go.temporal.io/server/chasm"
814
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
915
"google.golang.org/protobuf/types/known/timestamppb"
1016
)
1117

1218
type ActivityStore interface {
13-
PopulateRecordActivityTaskStartedResponse(ctx chasm.Context, res *historyservice.RecordActivityTaskStartedResponse) error
19+
PopulateRecordActivityTaskStartedResponse(ctx chasm.Context, key chasm.EntityKey, response *historyservice.RecordActivityTaskStartedResponse) error
1420
RecordCompletion(ctx chasm.MutableContext) error
1521
}
1622

23+
// Activity component represents an activity execution persistence object and can be either standalone activity or one
24+
// embedded within a workflow.
1725
type Activity struct {
1826
chasm.UnimplementedComponent
1927

@@ -26,14 +34,21 @@ type Activity struct {
2634
// Standalone only
2735
RequestData chasm.Field[*activitypb.ActivityRequestData]
2836

29-
// Pointer to an implementation of the "store" (for a workflow activity this would be a parent pointer back to
30-
// the workflow).
37+
// Pointer to an implementation of the "store". for a workflow activity this would be a parent pointer back to
38+
// the workflow. For a standalone activity this would be nil.
39+
// TODO: revisit a standalone activity pointing to itself once we handle storing it more efficiently.
3140
// TODO: figure out better naming.
3241
Store chasm.Field[ActivityStore]
3342
}
3443

44+
// RecordActivityTaskStartedParams holds parameters for RecordActivityTaskStarted
45+
type RecordActivityTaskStartedParams struct {
46+
VersionDirective *taskqueuespb.TaskVersionDirective
47+
WorkerIdentity string
48+
}
49+
3550
// LifecycleState TODO: we need to add more lifecycle states to better categorize some activity states, particulary for terminated/canceled.
36-
func (a Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState {
51+
func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState {
3752
switch a.Status {
3853
case activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED:
3954
return chasm.LifecycleStateCompleted
@@ -47,6 +62,7 @@ func (a Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState {
4762
}
4863
}
4964

65+
// NewStandaloneActivity creates a new activity component and adds associated tasks to start execution.
5066
func NewStandaloneActivity(
5167
ctx chasm.MutableContext,
5268
request *workflowservice.StartActivityExecutionRequest,
@@ -56,23 +72,32 @@ func NewStandaloneActivity(
5672
return nil, err
5773
}
5874

59-
return &Activity{
75+
// TODO flatten this when API is updated
76+
options := request.GetOptions()
77+
78+
activity := &Activity{
6079
ActivityState: &activitypb.ActivityState{
61-
ActivityType: request.ActivityType,
62-
ActivityOptions: request.Options,
63-
Priority: request.Priority,
80+
ActivityType: request.ActivityType,
81+
TaskQueue: options.GetTaskQueue(),
82+
ScheduleToCloseTimeout: options.GetScheduleToCloseTimeout(),
83+
ScheduleToStartTimeout: options.GetScheduleToStartTimeout(),
84+
StartToCloseTimeout: options.GetStartToCloseTimeout(),
85+
HeartbeatTimeout: options.GetHeartbeatTimeout(),
86+
RetryPolicy: options.GetRetryPolicy(),
87+
Priority: request.Priority,
6488
},
6589
Attempt: chasm.NewDataField(ctx, &activitypb.ActivityAttemptState{
66-
Count: 1,
67-
LastStartedTime: timestamppb.Now(),
90+
Count: 1,
6891
}),
6992
RequestData: chasm.NewDataField(ctx, &activitypb.ActivityRequestData{
7093
Input: request.Input,
7194
Header: request.Header,
7295
UserMetadata: request.UserMetadata,
7396
}),
7497
Visibility: chasm.NewComponentField(ctx, visibility),
75-
}, nil
98+
}
99+
100+
return activity, nil
76101
}
77102

78103
func NewEmbeddedActivity(
@@ -82,18 +107,121 @@ func NewEmbeddedActivity(
82107
) {
83108
}
84109

85-
func (a *Activity) PopulateRecordActivityTaskStartedResponse(ctx chasm.Context, res *historyservice.RecordActivityTaskStartedResponse) error {
110+
func (a *Activity) createAddActivityTaskRequest(ctx chasm.Context, namespaceID string) (*matchingservice.AddActivityTaskRequest, error) {
111+
// Get latest component ref and unmarshal into proto ref
112+
componentRef, err := ctx.Ref(a)
113+
if err != nil {
114+
return nil, err
115+
}
116+
117+
// Note: No need to set the vector clock here, as the components track version conflicts for read/write
118+
return &matchingservice.AddActivityTaskRequest{
119+
NamespaceId: namespaceID,
120+
TaskQueue: a.GetTaskQueue(),
121+
ScheduleToStartTimeout: a.GetScheduleToStartTimeout(),
122+
Priority: a.GetPriority(),
123+
ComponentRef: componentRef,
124+
}, nil
125+
}
126+
127+
// RecordActivityTaskStarted updates the activity on recording activity task started and populates the response.
128+
func (a *Activity) RecordActivityTaskStarted(ctx chasm.MutableContext, params RecordActivityTaskStartedParams) (*historyservice.RecordActivityTaskStartedResponse, error) {
129+
if err := TransitionStarted.Apply(a, ctx, nil); err != nil {
130+
return nil, err
131+
}
132+
133+
attempt, err := a.Attempt.Get(ctx)
134+
if err != nil {
135+
return nil, err
136+
}
137+
138+
attempt.LastStartedTime = timestamppb.New(ctx.Now(a))
139+
attempt.LastWorkerIdentity = params.WorkerIdentity
140+
141+
if versionDirective := params.VersionDirective.GetDeploymentVersion(); versionDirective != nil {
142+
attempt.LastDeploymentVersion = &deploymentpb.WorkerDeploymentVersion{
143+
BuildId: versionDirective.GetBuildId(),
144+
DeploymentName: versionDirective.GetDeploymentName(),
145+
}
146+
}
147+
86148
store, err := a.Store.Get(ctx)
149+
if err != nil {
150+
return nil, err
151+
}
152+
153+
activityRefBytes, err := ctx.Ref(a)
154+
if err != nil {
155+
return nil, err
156+
}
157+
158+
activityRef, err := chasm.DeserializeComponentRef(activityRefBytes)
159+
if err != nil {
160+
return nil, err
161+
}
162+
163+
response := &historyservice.RecordActivityTaskStartedResponse{}
164+
if store == nil {
165+
// TODO Get entity key from from context once we rebase on main
166+
if err := a.PopulateRecordActivityTaskStartedResponse(ctx, activityRef.EntityKey, response); err != nil {
167+
return nil, err
168+
}
169+
} else {
170+
if err := store.PopulateRecordActivityTaskStartedResponse(ctx, activityRef.EntityKey, response); err != nil {
171+
return nil, err
172+
}
173+
}
174+
175+
return response, nil
176+
}
177+
178+
func (a *Activity) PopulateRecordActivityTaskStartedResponse(ctx chasm.Context, key chasm.EntityKey, response *historyservice.RecordActivityTaskStartedResponse) error {
179+
attempt, err := a.Attempt.Get(ctx)
87180
if err != nil {
88181
return err
89182
}
90-
if err := store.PopulateRecordActivityTaskStartedResponse(ctx, res); err != nil {
183+
184+
lastHeartbeat, err := a.LastHeartbeat.Get(ctx)
185+
if err != nil {
91186
return err
92187
}
93-
// ...
188+
189+
requestData, err := a.RequestData.Get(ctx)
190+
if err != nil {
191+
return err
192+
}
193+
194+
response.StartedTime = attempt.LastStartedTime
195+
response.Attempt = attempt.GetCount()
196+
if lastHeartbeat != nil {
197+
response.HeartbeatDetails = lastHeartbeat.GetDetails()
198+
}
199+
response.Priority = a.GetPriority()
200+
response.RetryPolicy = a.GetRetryPolicy()
201+
response.ScheduledEvent = &historypb.HistoryEvent{
202+
EventType: enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED,
203+
Attributes: &historypb.HistoryEvent_ActivityTaskScheduledEventAttributes{
204+
ActivityTaskScheduledEventAttributes: &historypb.ActivityTaskScheduledEventAttributes{
205+
ActivityId: key.BusinessID,
206+
ActivityType: a.GetActivityType(),
207+
Input: requestData.GetInput(),
208+
Header: requestData.GetHeader(),
209+
TaskQueue: a.GetTaskQueue(),
210+
ScheduleToCloseTimeout: a.GetScheduleToCloseTimeout(),
211+
ScheduleToStartTimeout: a.GetScheduleToStartTimeout(),
212+
StartToCloseTimeout: a.GetStartToCloseTimeout(),
213+
HeartbeatTimeout: a.GetHeartbeatTimeout(),
214+
},
215+
},
216+
}
217+
94218
return nil
95219
}
96220

221+
func (a *Activity) RecordCompletion(_ chasm.MutableContext) error {
222+
return serviceerror.NewUnimplemented("RecordCompletion is not implemented")
223+
}
224+
97225
func (a *Activity) RecordHeartbeat(ctx chasm.MutableContext, details *commonpb.Payloads) (chasm.NoValue, error) {
98226
a.LastHeartbeat = chasm.NewDataField(ctx, &activitypb.ActivityHeartbeatState{
99227
RecordedTime: timestamppb.New(ctx.Now(a)),

0 commit comments

Comments
 (0)