Skip to content

Commit 15af16c

Browse files
authored
Merge pull request #27 from famarting/optional-skip-wait-for-instance-start
grpc api: optional skip wait for instance start
2 parents fb03c12 + 37011da commit 15af16c

File tree

1 file changed

+22
-13
lines changed

1 file changed

+22
-13
lines changed

backend/executor.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,16 @@ type Executor interface {
4949
type grpcExecutor struct {
5050
protos.UnimplementedTaskHubSidecarServiceServer
5151

52-
workItemQueue chan *protos.WorkItem
53-
pendingOrchestrators *sync.Map // map[api.InstanceID]*ExecutionResults
54-
pendingActivities *sync.Map // map[string]*activityExecutionResult
55-
backend Backend
56-
logger Logger
57-
onWorkItemConnection func(context.Context) error
58-
onWorkItemDisconnect func(context.Context) error
59-
streamShutdownChan <-chan any
60-
streamSendTimeout *time.Duration
52+
workItemQueue chan *protos.WorkItem
53+
pendingOrchestrators *sync.Map // map[api.InstanceID]*ExecutionResults
54+
pendingActivities *sync.Map // map[string]*activityExecutionResult
55+
backend Backend
56+
logger Logger
57+
onWorkItemConnection func(context.Context) error
58+
onWorkItemDisconnect func(context.Context) error
59+
streamShutdownChan <-chan any
60+
streamSendTimeout *time.Duration
61+
skipWaitForInstanceStart bool
6162
}
6263

6364
type grpcExecutorOptions func(g *grpcExecutor)
@@ -98,6 +99,12 @@ func WithStreamSendTimeout(d time.Duration) grpcExecutorOptions {
9899
}
99100
}
100101

102+
func WithSkipWaitForInstanceStart() grpcExecutorOptions {
103+
return func(g *grpcExecutor) {
104+
g.skipWaitForInstanceStart = true
105+
}
106+
}
107+
101108
// NewGrpcExecutor returns the Executor object and a method to invoke to register the gRPC server in the executor.
102109
func NewGrpcExecutor(be Backend, logger Logger, opts ...grpcExecutorOptions) (executor Executor, registerServerFn func(grpcServer grpc.ServiceRegistrar)) {
103110
grpcExecutor := &grpcExecutor{
@@ -506,12 +513,14 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst
506513
},
507514
}
508515
if err := g.backend.CreateOrchestrationInstance(ctx, e, WithOrchestrationIdReusePolicy(req.OrchestrationIdReusePolicy)); err != nil {
509-
return nil, err
516+
return nil, fmt.Errorf("failed to create orchestration instance: %w", err)
510517
}
511518

512-
_, err := g.WaitForInstanceStart(ctx, &protos.GetInstanceRequest{InstanceId: instanceID})
513-
if err != nil {
514-
return nil, err
519+
if !g.skipWaitForInstanceStart {
520+
_, err := g.WaitForInstanceStart(ctx, &protos.GetInstanceRequest{InstanceId: instanceID})
521+
if err != nil {
522+
return nil, err
523+
}
515524
}
516525

517526
return &protos.CreateInstanceResponse{InstanceId: instanceID}, nil

0 commit comments

Comments
 (0)