Skip to content

Commit 4e1ae76

Browse files
committed
Add support for Ephemeral TaskLists
Update Sessions to use Ephemeral TaskLists behind a feature flag. This ensures that the per-host TaskList is automatically removed once it is no longer used. This should only be enabled once the server fully supports Ephemeral TaskLists as it will otherwise return errors for the unknown TaskListKind.
1 parent 97eba63 commit 4e1ae76

25 files changed

+1237
-115
lines changed

.gen/go/shared/shared.go

Lines changed: 940 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/activity_task_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func newActivityTaskHandlerWithCustomProvider(
8080
}
8181
return &activityTaskHandlerImpl{
8282
clock: clock,
83-
taskListName: params.TaskList,
83+
taskListName: params.TaskList.GetName(),
8484
identity: params.Identity,
8585
service: service,
8686
logger: params.Logger,

internal/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,7 @@ func getFeatureFlags(options *ClientOptions) FeatureFlags {
656656
return FeatureFlags{
657657
WorkflowExecutionAlreadyCompletedErrorEnabled: options.FeatureFlags.WorkflowExecutionAlreadyCompletedErrorEnabled,
658658
PollerAutoScalerEnabled: options.FeatureFlags.PollerAutoScalerEnabled,
659+
EphemeralTaskListsEnabled: options.FeatureFlags.EphemeralTaskListsEnabled,
659660
}
660661
}
661662
return FeatureFlags{}

internal/compatibility/enum_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ func TestTaskListKind(t *testing.T) {
266266
apiv1.TaskListKind_TASK_LIST_KIND_INVALID,
267267
apiv1.TaskListKind_TASK_LIST_KIND_NORMAL,
268268
apiv1.TaskListKind_TASK_LIST_KIND_STICKY,
269+
apiv1.TaskListKind_TASK_LIST_KIND_EPHEMERAL,
269270
} {
270271
assert.Equal(t, item, proto.TaskListKind(thrift.TaskListKind(item)))
271272
}

internal/compatibility/proto/enum.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ func TaskListKind(t *shared.TaskListKind) apiv1.TaskListKind {
3535
return apiv1.TaskListKind_TASK_LIST_KIND_NORMAL
3636
case shared.TaskListKindSticky:
3737
return apiv1.TaskListKind_TASK_LIST_KIND_STICKY
38+
case shared.TaskListKindEphemeral:
39+
return apiv1.TaskListKind_TASK_LIST_KIND_EPHEMERAL
3840
}
3941
panic("unexpected enum value")
4042
}

internal/compatibility/thrift/enum.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ func TaskListKind(t apiv1.TaskListKind) *shared.TaskListKind {
3434
return shared.TaskListKindNormal.Ptr()
3535
case apiv1.TaskListKind_TASK_LIST_KIND_STICKY:
3636
return shared.TaskListKindSticky.Ptr()
37+
case apiv1.TaskListKind_TASK_LIST_KIND_EPHEMERAL:
38+
return shared.TaskListKindEphemeral.Ptr()
3739
}
3840
panic("unexpected enum value")
3941
}

internal/internal_activity.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type (
5959
activityOptions struct {
6060
ActivityID *string // Users can choose IDs but our framework makes it optional to decrease the crust.
6161
TaskListName string
62+
TaskListKind shared.TaskListKind
6263
ScheduleToCloseTimeoutSeconds int32
6364
ScheduleToStartTimeoutSeconds int32
6465
StartToCloseTimeoutSeconds int32

internal/internal_event_handlers.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ type (
121121
contextPropagators []ContextPropagator
122122
tracer opentracing.Tracer
123123
workflowInterceptorFactories []WorkflowInterceptorFactory
124+
featureFlags *FeatureFlags
124125
}
125126

126127
localActivityTask struct {
@@ -472,7 +473,7 @@ func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters executeActivityPar
472473
}
473474
activityID := scheduleTaskAttr.GetActivityId()
474475
scheduleTaskAttr.ActivityType = activityTypePtr(parameters.ActivityType)
475-
scheduleTaskAttr.TaskList = common.TaskListPtr(m.TaskList{Name: common.StringPtr(parameters.TaskListName)})
476+
scheduleTaskAttr.TaskList = common.TaskListPtr(m.TaskList{Name: common.StringPtr(parameters.TaskListName), Kind: parameters.TaskListKind.Ptr()})
476477
scheduleTaskAttr.Input = parameters.Input
477478
scheduleTaskAttr.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(parameters.ScheduleToCloseTimeoutSeconds)
478479
scheduleTaskAttr.StartToCloseTimeoutSeconds = common.Int32Ptr(parameters.StartToCloseTimeoutSeconds)
@@ -804,6 +805,10 @@ func (wc *workflowEnvironmentImpl) GetWorkflowInterceptors() []WorkflowIntercept
804805
return wc.workflowInterceptorFactories
805806
}
806807

808+
func (wc *workflowEnvironmentImpl) GetFeatureFlags() FeatureFlags {
809+
return *wc.featureFlags
810+
}
811+
807812
func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
808813
event *m.HistoryEvent,
809814
isReplay bool,

internal/internal_task_handlers_test.go

Lines changed: 67 additions & 67 deletions
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)