@@ -281,13 +281,15 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
281
281
282
282
// There are some cases where the app may need to be notified when a client connects to fetch work items, like
283
283
// for auto-starting the worker. The app also has an opportunity to set itself as unavailable by returning an error.
284
- callback := g .onWorkItemConnection
285
- if callback != nil {
286
- if err := callback ( stream . Context ()); err != nil {
287
- message := "unable to establish work item stream at this time: " + err . Error ()
288
- g . logger . Warn ( message )
289
- return status . Errorf ( codes . Unavailable , message )
284
+ if err := g .executeOoWorkItemConnection ( stream . Context ()); err != nil {
285
+ message := "unable to establish work item stream at this time: " + err . Error ()
286
+ g . logger . Warn ( message )
287
+
288
+ if derr := g . executeOnWorkItemDisconnect ( stream . Context ()); derr != nil {
289
+ g . logger . Warnf ( "error while disconnecting work item stream: %v" , derr )
290
290
}
291
+
292
+ return status .Errorf (codes .Unavailable , message )
291
293
}
292
294
293
295
defer func () {
@@ -313,10 +315,8 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
313
315
}
314
316
return true
315
317
})
316
- if callback := g .onWorkItemDisconnect ; callback != nil {
317
- if err := callback (stream .Context ()); err != nil {
318
- g .logger .Warnf ("error while disconnecting work item stream: %v" , err )
319
- }
318
+ if err := g .executeOnWorkItemDisconnect (stream .Context ()); err != nil {
319
+ g .logger .Warnf ("error while disconnecting work item stream: %v" , err )
320
320
}
321
321
}()
322
322
@@ -397,6 +397,24 @@ func (g *grpcExecutor) sendWorkItem(stream protos.TaskHubSidecarService_GetWorkI
397
397
}
398
398
}
399
399
400
+ func (g * grpcExecutor ) executeOoWorkItemConnection (ctx context.Context ) error {
401
+ if callback := g .onWorkItemConnection ; callback != nil {
402
+ if err := callback (ctx ); err != nil {
403
+ return err
404
+ }
405
+ }
406
+ return nil
407
+ }
408
+
409
+ func (g * grpcExecutor ) executeOnWorkItemDisconnect (ctx context.Context ) error {
410
+ if callback := g .onWorkItemDisconnect ; callback != nil {
411
+ if err := callback (ctx ); err != nil {
412
+ return err
413
+ }
414
+ }
415
+ return nil
416
+ }
417
+
400
418
// CompleteOrchestratorTask implements protos.TaskHubSidecarServiceServer
401
419
func (g * grpcExecutor ) CompleteOrchestratorTask (ctx context.Context , res * protos.OrchestratorResponse ) (* protos.CompleteTaskResponse , error ) {
402
420
return emptyCompleteTaskResponse , g .backend .CompleteOrchestratorTask (ctx , res )
0 commit comments