Skip to content

Commit 8a5352b

Browse files
authored
Significantly reduce number of go routines in executor (#45)
* Significantly reduce number of go routines in executor Significantly reduce number of go routines generated by sending work to the client. Signed-off-by: joshvanl <[email protected]> * Fix go routine sending of wi Signed-off-by: joshvanl <[email protected]> * Only set timeout on the response Signed-off-by: joshvanl <[email protected]> * don't confuse error msgs Signed-off-by: joshvanl <[email protected]> * Make errCh channel size 1 Signed-off-by: joshvanl <[email protected]> --------- Signed-off-by: joshvanl <[email protected]>
1 parent 7ec83e0 commit 8a5352b

File tree

1 file changed

+30
-13
lines changed

1 file changed

+30
-13
lines changed

backend/executor.go

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,19 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
321321
}
322322
}()
323323

324+
ch := make(chan *protos.WorkItem)
325+
errCh := make(chan error, 1)
326+
go func() {
327+
for {
328+
select {
329+
case <-stream.Context().Done():
330+
return
331+
case wi := <-ch:
332+
errCh <- stream.Send(wi)
333+
}
334+
}
335+
}()
336+
324337
// The worker client invokes this method, which streams back work-items as they arrive.
325338
for {
326339
select {
@@ -349,7 +362,7 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
349362
}
350363
}
351364

352-
if err := g.sendWorkItem(stream, wi); err != nil {
365+
if err := g.sendWorkItem(stream, wi, ch, errCh); err != nil {
353366
g.logger.Errorf("encountered an error while sending work item: %v", err)
354367
return err
355368
}
@@ -360,25 +373,29 @@ func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream prot
360373
}
361374
}
362375

363-
func (g *grpcExecutor) sendWorkItem(stream protos.TaskHubSidecarService_GetWorkItemsServer, wi *protos.WorkItem) error {
376+
func (g *grpcExecutor) sendWorkItem(stream protos.TaskHubSidecarService_GetWorkItemsServer, wi *protos.WorkItem,
377+
ch chan *protos.WorkItem, errCh chan error,
378+
) error {
379+
select {
380+
case <-stream.Context().Done():
381+
return stream.Context().Err()
382+
case ch <- wi:
383+
}
384+
364385
ctx := stream.Context()
365386
if g.streamSendTimeout != nil {
366387
var cancel context.CancelFunc
367388
ctx, cancel = context.WithTimeout(ctx, *g.streamSendTimeout)
368389
defer cancel()
369390
}
370391

371-
errCh := make(chan error, 2)
372-
go func() {
373-
select {
374-
case errCh <- stream.Send(wi):
375-
case <-ctx.Done():
376-
g.logger.Errorf("timed out while sending work item")
377-
errCh <- fmt.Errorf("timed out while sending work item: %w", ctx.Err())
378-
}
379-
}()
380-
381-
return <-errCh
392+
select {
393+
case <-ctx.Done():
394+
g.logger.Errorf("timed out while sending work item")
395+
return fmt.Errorf("timed out while sending work item: %w", ctx.Err())
396+
case err := <-errCh:
397+
return err
398+
}
382399
}
383400

384401
// CompleteOrchestratorTask implements protos.TaskHubSidecarServiceServer

0 commit comments

Comments
 (0)