Skip to content

Commit 8a04127

Browse files
authored
feat(operator): add thread pool operator to avoid exhausting the systems (#2079)
1 parent fc04800 commit 8a04127

File tree

1 file changed

+23
-7
lines changed

1 file changed

+23
-7
lines changed

operator/pkg/operator.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"net/http"
1212
"os"
1313
"path/filepath"
14+
"runtime"
1415
"sync"
1516
"time"
1617

@@ -329,21 +330,36 @@ func (o *Operator) ProcessNewBatchLogV3(newBatchLog *servicemanager.ContractAlig
329330

330331
verificationDataBatchLen := len(verificationDataBatch)
331332
results := make(chan bool, verificationDataBatchLen)
332-
var wg sync.WaitGroup
333-
wg.Add(verificationDataBatchLen)
333+
jobs := make(chan VerificationData, verificationDataBatchLen)
334+
334335
disabledVerifiersBitmap, err := o.avsReader.DisabledVerifiers()
335336
if err != nil {
336337
o.Logger.Errorf("Could not check verifiers status: %s", err)
337338
results <- false
338339
return err
339340
}
340-
for _, verificationData := range verificationDataBatch {
341-
go func(data VerificationData) {
341+
342+
maxWorkers := runtime.NumCPU() - 1
343+
if maxWorkers < 1 {
344+
maxWorkers = 1
345+
}
346+
347+
var wg sync.WaitGroup
348+
for i := 0; i < maxWorkers; i++ {
349+
wg.Add(1)
350+
go func() {
342351
defer wg.Done()
343-
o.verify(data, disabledVerifiersBitmap, results)
344-
o.metrics.IncOperatorTaskResponses()
345-
}(verificationData)
352+
for data := range jobs {
353+
o.verify(data, disabledVerifiersBitmap, results)
354+
o.metrics.IncOperatorTaskResponses()
355+
}
356+
}()
357+
}
358+
359+
for _, verificationData := range verificationDataBatch {
360+
jobs <- verificationData
346361
}
362+
close(jobs)
347363

348364
go func() {
349365
wg.Wait()

0 commit comments

Comments
 (0)