Skip to content

Add support for Ephemeral TaskLists #1449

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
946 changes: 940 additions & 6 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion internal/activity_task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func newActivityTaskHandlerWithCustomProvider(
}
return &activityTaskHandlerImpl{
clock: clock,
taskListName: params.TaskList,
taskListName: params.TaskList.GetName(),
identity: params.Identity,
service: service,
logger: params.Logger,
Expand Down
1 change: 1 addition & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ func getFeatureFlags(options *ClientOptions) FeatureFlags {
return FeatureFlags{
WorkflowExecutionAlreadyCompletedErrorEnabled: options.FeatureFlags.WorkflowExecutionAlreadyCompletedErrorEnabled,
PollerAutoScalerEnabled: options.FeatureFlags.PollerAutoScalerEnabled,
EphemeralTaskListsEnabled: options.FeatureFlags.EphemeralTaskListsEnabled,
}
}
return FeatureFlags{}
Expand Down
1 change: 1 addition & 0 deletions internal/compatibility/enum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ func TestTaskListKind(t *testing.T) {
apiv1.TaskListKind_TASK_LIST_KIND_INVALID,
apiv1.TaskListKind_TASK_LIST_KIND_NORMAL,
apiv1.TaskListKind_TASK_LIST_KIND_STICKY,
apiv1.TaskListKind_TASK_LIST_KIND_EPHEMERAL,
} {
assert.Equal(t, item, proto.TaskListKind(thrift.TaskListKind(item)))
}
Expand Down
2 changes: 2 additions & 0 deletions internal/compatibility/proto/enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ func TaskListKind(t *shared.TaskListKind) apiv1.TaskListKind {
return apiv1.TaskListKind_TASK_LIST_KIND_NORMAL
case shared.TaskListKindSticky:
return apiv1.TaskListKind_TASK_LIST_KIND_STICKY
case shared.TaskListKindEphemeral:
return apiv1.TaskListKind_TASK_LIST_KIND_EPHEMERAL
}
panic("unexpected enum value")
}
Expand Down
2 changes: 2 additions & 0 deletions internal/compatibility/thrift/enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func TaskListKind(t apiv1.TaskListKind) *shared.TaskListKind {
return shared.TaskListKindNormal.Ptr()
case apiv1.TaskListKind_TASK_LIST_KIND_STICKY:
return shared.TaskListKindSticky.Ptr()
case apiv1.TaskListKind_TASK_LIST_KIND_EPHEMERAL:
return shared.TaskListKindEphemeral.Ptr()
}
panic("unexpected enum value")
}
Expand Down
1 change: 1 addition & 0 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type (
activityOptions struct {
ActivityID *string // Users can choose IDs but our framework makes it optional to decrease the crust.
TaskListName string
TaskListKind shared.TaskListKind
ScheduleToCloseTimeoutSeconds int32
ScheduleToStartTimeoutSeconds int32
StartToCloseTimeoutSeconds int32
Expand Down
9 changes: 8 additions & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type (
contextPropagators []ContextPropagator
tracer opentracing.Tracer
workflowInterceptorFactories []WorkflowInterceptorFactory
featureFlags FeatureFlags
}

localActivityTask struct {
Expand Down Expand Up @@ -205,6 +206,7 @@ func newWorkflowExecutionEventHandler(
contextPropagators []ContextPropagator,
tracer opentracing.Tracer,
workflowInterceptorFactories []WorkflowInterceptorFactory,
featureFlags FeatureFlags,
) workflowExecutionEventHandler {
context := &workflowEnvironmentImpl{
workflowInfo: workflowInfo,
Expand All @@ -222,6 +224,7 @@ func newWorkflowExecutionEventHandler(
contextPropagators: contextPropagators,
tracer: tracer,
workflowInterceptorFactories: workflowInterceptorFactories,
featureFlags: featureFlags,
}
context.logger = logger.With(
zapcore.Field{Key: tagWorkflowType, Type: zapcore.StringType, String: workflowInfo.WorkflowType.Name},
Expand Down Expand Up @@ -472,7 +475,7 @@ func (wc *workflowEnvironmentImpl) ExecuteActivity(parameters executeActivityPar
}
activityID := scheduleTaskAttr.GetActivityId()
scheduleTaskAttr.ActivityType = activityTypePtr(parameters.ActivityType)
scheduleTaskAttr.TaskList = common.TaskListPtr(m.TaskList{Name: common.StringPtr(parameters.TaskListName)})
scheduleTaskAttr.TaskList = common.TaskListPtr(m.TaskList{Name: common.StringPtr(parameters.TaskListName), Kind: parameters.TaskListKind.Ptr()})
scheduleTaskAttr.Input = parameters.Input
scheduleTaskAttr.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(parameters.ScheduleToCloseTimeoutSeconds)
scheduleTaskAttr.StartToCloseTimeoutSeconds = common.Int32Ptr(parameters.StartToCloseTimeoutSeconds)
Expand Down Expand Up @@ -804,6 +807,10 @@ func (wc *workflowEnvironmentImpl) GetWorkflowInterceptors() []WorkflowIntercept
return wc.workflowInterceptorFactories
}

func (wc *workflowEnvironmentImpl) GetFeatureFlags() FeatureFlags {
return wc.featureFlags
}

func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
event *m.HistoryEvent,
isReplay bool,
Expand Down
1 change: 1 addition & 0 deletions internal/internal_event_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,7 @@ func testWorkflowExecutionEventHandler(t *testing.T, registry *registry) *workfl
nil,
opentracing.NoopTracer{},
nil,
FeatureFlags{},
).(*workflowExecutionEventHandlerImpl)
}

Expand Down
3 changes: 3 additions & 0 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ type (
tracer opentracing.Tracer
workflowInterceptorFactories []WorkflowInterceptorFactory
disableStrictNonDeterminism bool
featureFlags FeatureFlags
}

activityProvider func(name string) activity
Expand Down Expand Up @@ -419,6 +420,7 @@ func newWorkflowTaskHandler(
tracer: params.Tracer,
workflowInterceptorFactories: params.WorkflowInterceptorChainFactories,
disableStrictNonDeterminism: params.WorkerBugPorts.DisableStrictNonDeterminismCheck,
featureFlags: params.FeatureFlags,
}

traceLog(func() {
Expand Down Expand Up @@ -622,6 +624,7 @@ func (w *workflowExecutionContextImpl) createEventHandler() {
w.wth.contextPropagators,
w.wth.tracer,
w.wth.workflowInterceptorFactories,
w.wth.featureFlags,
)
w.eventHandler.Store(eventHandler)
}
Expand Down
Loading
Loading