Skip to content

Commit a7103a2

Browse files
authored
Merge pull request #50 from famarting/fix-disconnect-cb-not-called
fix on disconnect callback not being invoked
2 parents 854d3b4 + c8ef857 commit a7103a2

File tree

1 file changed

+28
-10
lines changed

1 file changed

+28
-10
lines changed

backend/executor.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -281,13 +281,15 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
281281

282282
// There are some cases where the app may need to be notified when a client connects to fetch work items, like
283283
// for auto-starting the worker. The app also has an opportunity to set itself as unavailable by returning an error.
284-
callback := g.onWorkItemConnection
285-
if callback != nil {
286-
if err := callback(stream.Context()); err != nil {
287-
message := "unable to establish work item stream at this time: " + err.Error()
288-
g.logger.Warn(message)
289-
return status.Errorf(codes.Unavailable, message)
284+
if err := g.executeOnWorkItemConnection(stream.Context()); err != nil {
285+
message := "unable to establish work item stream at this time: " + err.Error()
286+
g.logger.Warn(message)
287+
288+
if derr := g.executeOnWorkItemDisconnect(stream.Context()); derr != nil {
289+
g.logger.Warnf("error while disconnecting work item stream: %v", derr)
290290
}
291+
292+
return status.Errorf(codes.Unavailable, message)
291293
}
292294

293295
defer func() {
@@ -313,10 +315,8 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
313315
}
314316
return true
315317
})
316-
if callback := g.onWorkItemDisconnect; callback != nil {
317-
if err := callback(stream.Context()); err != nil {
318-
g.logger.Warnf("error while disconnecting work item stream: %v", err)
319-
}
318+
if err := g.executeOnWorkItemDisconnect(stream.Context()); err != nil {
319+
g.logger.Warnf("error while disconnecting work item stream: %v", err)
320320
}
321321
}()
322322

@@ -397,6 +397,24 @@ func (g *grpcExecutor) sendWorkItem(stream protos.TaskHubSidecarService_GetWorkI
397397
}
398398
}
399399

400+
func (g *grpcExecutor) executeOnWorkItemConnection(ctx context.Context) error {
401+
if callback := g.onWorkItemConnection; callback != nil {
402+
if err := callback(ctx); err != nil {
403+
return err
404+
}
405+
}
406+
return nil
407+
}
408+
409+
func (g *grpcExecutor) executeOnWorkItemDisconnect(ctx context.Context) error {
410+
if callback := g.onWorkItemDisconnect; callback != nil {
411+
if err := callback(ctx); err != nil {
412+
return err
413+
}
414+
}
415+
return nil
416+
}
417+
400418
// CompleteOrchestratorTask implements protos.TaskHubSidecarServiceServer
401419
func (g *grpcExecutor) CompleteOrchestratorTask(ctx context.Context, res *protos.OrchestratorResponse) (*protos.CompleteTaskResponse, error) {
402420
return emptyCompleteTaskResponse, g.backend.CompleteOrchestratorTask(ctx, res)

0 commit comments

Comments
 (0)