Skip to content

Commit e10c4bb

Browse files
committed
keep order in processor spawn
1 parent 49d809a commit e10c4bb

File tree

1 file changed

+23
-10
lines changed

1 file changed

+23
-10
lines changed

pipeline/processor.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -400,9 +400,13 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
400400
nextActionIdx := parent.action + 1
401401

402402
wg := &sync.WaitGroup{}
403-
results := make(chan *Event)
403+
results := make([]*Event, len(nodes))
404+
resultsChan := make(chan struct {
405+
index int
406+
child *Event
407+
}, len(nodes))
404408

405-
for _, node := range nodes {
409+
for i, node := range nodes {
406410
// we can't reuse parent event (using insaneJSON.Root{Node: child}
407411
// because of nil decoder
408412
child := &Event{
@@ -415,24 +419,33 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
415419
child.action = nextActionIdx
416420

417421
wg.Add(1)
418-
go func(child *Event) {
422+
go func(i int, child *Event) {
419423
defer wg.Done()
420424
ok, _ := p.doActions(child)
421425
if ok {
422-
results <- child
426+
resultsChan <- struct {
427+
index int
428+
child *Event
429+
}{index: i, child: child}
423430
}
424-
}(child)
431+
}(i, child)
425432
}
426433

427434
go func() {
428435
wg.Wait()
429-
close(results)
436+
close(resultsChan)
430437
}()
431438

432-
for child := range results {
433-
child.stage = eventStageOutput
434-
p.output.Out(child)
435-
child.Root.ReleaseMem()
439+
for result := range resultsChan {
440+
results[result.index] = result.child
441+
}
442+
443+
for _, child := range results {
444+
if child != nil {
445+
child.stage = eventStageOutput
446+
p.output.Out(child)
447+
child.Root.ReleaseMem()
448+
}
436449
}
437450

438451
if p.busyActionsTotal == 0 {

0 commit comments

Comments
 (0)