Skip to content

Commit 234aec3

Browse files
committed
Add threadpool to operator
1 parent a3675f1 commit 234aec3

File tree

1 file changed

+29
-6
lines changed

1 file changed

+29
-6
lines changed

operator/pkg/operator.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"net/http"
1212
"os"
1313
"path/filepath"
14+
"runtime"
1415
"sync"
1516
"time"
1617

@@ -360,8 +361,6 @@ func (o *Operator) ProcessNewBatchLogV2(newBatchLog *servicemanager.ContractAlig
360361

361362
verificationDataBatchLen := len(verificationDataBatch)
362363
results := make(chan bool, verificationDataBatchLen)
363-
var wg sync.WaitGroup
364-
wg.Add(verificationDataBatchLen)
365364

366365
disabledVerifiersBitmap, err := o.avsReader.DisabledVerifiers()
367366
if err != nil {
@@ -370,9 +369,21 @@ func (o *Operator) ProcessNewBatchLogV2(newBatchLog *servicemanager.ContractAlig
370369
return err
371370
}
372371

372+
maxWorkers := runtime.NumCPU() - 1
373+
if maxWorkers < 1 {
374+
maxWorkers = 1
375+
}
376+
semaphore := make(chan struct{}, maxWorkers)
377+
var wg sync.WaitGroup
378+
wg.Add(verificationDataBatchLen)
379+
373380
for _, verificationData := range verificationDataBatch {
374381
go func(data VerificationData) {
375-
defer wg.Done()
382+
semaphore <- struct{}{}
383+
defer func() {
384+
<-semaphore
385+
wg.Done()
386+
}()
376387
o.verify(data, disabledVerifiersBitmap, results)
377388
o.metrics.IncOperatorTaskResponses()
378389
}(verificationData)
@@ -441,17 +452,29 @@ func (o *Operator) ProcessNewBatchLogV3(newBatchLog *servicemanager.ContractAlig
441452

442453
verificationDataBatchLen := len(verificationDataBatch)
443454
results := make(chan bool, verificationDataBatchLen)
444-
var wg sync.WaitGroup
445-
wg.Add(verificationDataBatchLen)
455+
446456
disabledVerifiersBitmap, err := o.avsReader.DisabledVerifiers()
447457
if err != nil {
448458
o.Logger.Errorf("Could not check verifiers status: %s", err)
449459
results <- false
450460
return err
451461
}
462+
463+
maxWorkers := runtime.NumCPU() - 1
464+
if maxWorkers < 1 {
465+
maxWorkers = 1
466+
}
467+
semaphore := make(chan struct{}, maxWorkers)
468+
var wg sync.WaitGroup
469+
wg.Add(verificationDataBatchLen)
470+
452471
for _, verificationData := range verificationDataBatch {
453472
go func(data VerificationData) {
454-
defer wg.Done()
473+
semaphore <- struct{}{}
474+
defer func() {
475+
<-semaphore
476+
wg.Done()
477+
}()
455478
o.verify(data, disabledVerifiersBitmap, results)
456479
o.metrics.IncOperatorTaskResponses()
457480
}(verificationData)

0 commit comments

Comments
 (0)