Skip to content

Commit dd9a0d0

Browse files
committed
Add a proper worker group
1 parent 234aec3 commit dd9a0d0

File tree

1 file changed

+28
-22
lines changed

1 file changed

+28
-22
lines changed

operator/pkg/operator.go

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@ func (o *Operator) ProcessNewBatchLogV2(newBatchLog *servicemanager.ContractAlig
361361

362362
verificationDataBatchLen := len(verificationDataBatch)
363363
results := make(chan bool, verificationDataBatchLen)
364+
jobs := make(chan VerificationData, verificationDataBatchLen)
364365

365366
disabledVerifiersBitmap, err := o.avsReader.DisabledVerifiers()
366367
if err != nil {
@@ -373,21 +374,23 @@ func (o *Operator) ProcessNewBatchLogV2(newBatchLog *servicemanager.ContractAlig
373374
if maxWorkers < 1 {
374375
maxWorkers = 1
375376
}
376-
semaphore := make(chan struct{}, maxWorkers)
377+
377378
var wg sync.WaitGroup
378-
wg.Add(verificationDataBatchLen)
379+
for i := 0; i < maxWorkers; i++ {
380+
wg.Add(1)
381+
go func() {
382+
defer wg.Done()
383+
for data := range jobs {
384+
o.verify(data, disabledVerifiersBitmap, results)
385+
o.metrics.IncOperatorTaskResponses()
386+
}
387+
}()
388+
}
379389

380390
for _, verificationData := range verificationDataBatch {
381-
go func(data VerificationData) {
382-
semaphore <- struct{}{}
383-
defer func() {
384-
<-semaphore
385-
wg.Done()
386-
}()
387-
o.verify(data, disabledVerifiersBitmap, results)
388-
o.metrics.IncOperatorTaskResponses()
389-
}(verificationData)
391+
jobs <- verificationData
390392
}
393+
close(jobs)
391394

392395
go func() {
393396
wg.Wait()
@@ -452,6 +455,7 @@ func (o *Operator) ProcessNewBatchLogV3(newBatchLog *servicemanager.ContractAlig
452455

453456
verificationDataBatchLen := len(verificationDataBatch)
454457
results := make(chan bool, verificationDataBatchLen)
458+
jobs := make(chan VerificationData, verificationDataBatchLen)
455459

456460
disabledVerifiersBitmap, err := o.avsReader.DisabledVerifiers()
457461
if err != nil {
@@ -464,21 +468,23 @@ func (o *Operator) ProcessNewBatchLogV3(newBatchLog *servicemanager.ContractAlig
464468
if maxWorkers < 1 {
465469
maxWorkers = 1
466470
}
467-
semaphore := make(chan struct{}, maxWorkers)
471+
468472
var wg sync.WaitGroup
469-
wg.Add(verificationDataBatchLen)
473+
for i := 0; i < maxWorkers; i++ {
474+
wg.Add(1)
475+
go func() {
476+
defer wg.Done()
477+
for data := range jobs {
478+
o.verify(data, disabledVerifiersBitmap, results)
479+
o.metrics.IncOperatorTaskResponses()
480+
}
481+
}()
482+
}
470483

471484
for _, verificationData := range verificationDataBatch {
472-
go func(data VerificationData) {
473-
semaphore <- struct{}{}
474-
defer func() {
475-
<-semaphore
476-
wg.Done()
477-
}()
478-
o.verify(data, disabledVerifiersBitmap, results)
479-
o.metrics.IncOperatorTaskResponses()
480-
}(verificationData)
485+
jobs <- verificationData
481486
}
487+
close(jobs)
482488

483489
go func() {
484490
wg.Wait()

0 commit comments

Comments
 (0)