Skip to content

Commit 33955d2

Browse files
committed
Add support for Ephemeral TaskLists
Update Sessions to use Ephemeral TaskLists behind a feature flag. This ensures that the per-host TaskList is automatically removed once it is no longer used. This should only be enabled once the server fully supports Ephemeral TaskLists as it will otherwise return errors for the unknown TaskListKind.
1 parent 97eba63 commit 33955d2

27 files changed

+1243
-115
lines changed

.gen/go/shared/shared.go

Lines changed: 940 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/activity_task_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func newActivityTaskHandlerWithCustomProvider(
8080
}
8181
return &activityTaskHandlerImpl{
8282
clock: clock,
83-
taskListName: params.TaskList,
83+
taskListName: params.TaskList.GetName(),
8484
identity: params.Identity,
8585
service: service,
8686
logger: params.Logger,

internal/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,7 @@ func getFeatureFlags(options *ClientOptions) FeatureFlags {
656656
return FeatureFlags{
657657
WorkflowExecutionAlreadyCompletedErrorEnabled: options.FeatureFlags.WorkflowExecutionAlreadyCompletedErrorEnabled,
658658
PollerAutoScalerEnabled: options.FeatureFlags.PollerAutoScalerEnabled,
659+
EphemeralTaskListsEnabled: options.FeatureFlags.EphemeralTaskListsEnabled,
659660
}
660661
}
661662
return FeatureFlags{}

internal/compatibility/enum_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ func TestTaskListKind(t *testing.T) {
266266
apiv1.TaskListKind_TASK_LIST_KIND_INVALID,
267267
apiv1.TaskListKind_TASK_LIST_KIND_NORMAL,
268268
apiv1.TaskListKind_TASK_LIST_KIND_STICKY,
269+
apiv1.TaskListKind_TASK_LIST_KIND_EPHEMERAL,
269270
} {
270271
assert.Equal(t, item, proto.TaskListKind(thrift.TaskListKind(item)))
271272
}

internal/compatibility/proto/enum.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ func TaskListKind(t *shared.TaskListKind) apiv1.TaskListKind {
3535
return apiv1.TaskListKind_TASK_LIST_KIND_NORMAL
3636
case shared.TaskListKindSticky:
3737
return apiv1.TaskListKind_TASK_LIST_KIND_STICKY
38+
case shared.TaskListKindEphemeral:
39+
return apiv1.TaskListKind_TASK_LIST_KIND_EPHEMERAL
3840
}
3941
panic("unexpected enum value")
4042
}

internal/compatibility/thrift/enum.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ func TaskListKind(t apiv1.TaskListKind) *shared.TaskListKind {
3434
return shared.TaskListKindNormal.Ptr()
3535
case apiv1.TaskListKind_TASK_LIST_KIND_STICKY:
3636
return shared.TaskListKindSticky.Ptr()
37+
case apiv1.TaskListKind_TASK_LIST_KIND_EPHEMERAL:
38+
return shared.TaskListKindEphemeral.Ptr()
3739
}
3840
panic("unexpected enum value")
3941
}

internal/internal_activity.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type (
5959
activityOptions struct {
6060
ActivityID *string // Users can choose IDs but our framework makes it optional to decrease the crust.
6161
TaskListName string
62+
TaskListKind shared.TaskListKind
6263
ScheduleToCloseTimeoutSeconds int32
6364
ScheduleToStartTimeoutSeconds int32
6465
StartToCloseTimeoutSeconds int32

internal/internal_event_handlers.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ type (
121121
contextPropagators []ContextPropagator
122122
tracer opentracing.Tracer
123123
workflowInterceptorFactories []WorkflowInterceptorFactory
124+
featureFlags FeatureFlags
124125
}
125126

126127
localActivityTask struct {
@@ -205,6 +206,7 @@ func newWorkflowExecutionEventHandler(
205206
contextPropagators []ContextPropagator,
206207
tracer opentracing.Tracer,
207208
workflowInterceptorFactories []WorkflowInterceptorFactory,
209+
featureFlags FeatureFlags,
208210
) workflowExecutionEventHandler {
209211
context := &workflowEnvironmentImpl{
210212
workflowInfo: workflowInfo,
@@ -222,6 +224,7 @@ func newWorkflowExecutionEventHandler(
222224
contextPropagators: contextPropagators,
223225
tracer: tracer,
224226
workflowInterceptorFactories: workflowInterceptorFactories,
227+
featureFlags: featureFlags,
225228
}
226229
context.logger = logger.With(
227230
zapcore.Field{Key: tagWorkflowType, Type: zapcore.StringType, String: workflowInfo.WorkflowType.Name},
@@ -472,7 +475,7 @@ func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters executeActivityPar
472475
}
473476
activityID := scheduleTaskAttr.GetActivityId()
474477
scheduleTaskAttr.ActivityType = activityTypePtr(parameters.ActivityType)
475-
scheduleTaskAttr.TaskList = common.TaskListPtr(m.TaskList{Name: common.StringPtr(parameters.TaskListName)})
478+
scheduleTaskAttr.TaskList = common.TaskListPtr(m.TaskList{Name: common.StringPtr(parameters.TaskListName), Kind: parameters.TaskListKind.Ptr()})
476479
scheduleTaskAttr.Input = parameters.Input
477480
scheduleTaskAttr.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(parameters.ScheduleToCloseTimeoutSeconds)
478481
scheduleTaskAttr.StartToCloseTimeoutSeconds = common.Int32Ptr(parameters.StartToCloseTimeoutSeconds)
@@ -804,6 +807,10 @@ func (wc *workflowEnvironmentImpl) GetWorkflowInterceptors() []WorkflowIntercept
804807
return wc.workflowInterceptorFactories
805808
}
806809

810+
func (wc *workflowEnvironmentImpl) GetFeatureFlags() FeatureFlags {
811+
return wc.featureFlags
812+
}
813+
807814
func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
808815
event *m.HistoryEvent,
809816
isReplay bool,

internal/internal_event_handlers_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,6 +1065,7 @@ func testWorkflowExecutionEventHandler(t *testing.T, registry *registry) *workfl
10651065
nil,
10661066
opentracing.NoopTracer{},
10671067
nil,
1068+
FeatureFlags{},
10681069
).(*workflowExecutionEventHandlerImpl)
10691070
}
10701071

0 commit comments

Comments
 (0)