Skip to content

Commit 6ea8603

Browse files
[CLD-272]: feat(operations): enable concurrency support (#95)
CCIP team has raised that they want to start using Operations API and they have a use case where they wan to deploy multiple contracts in parallel (multiple go routine calling executeSequence), currently Operations API is not thread safe due to the reporter. - update MemoryReporter and RecentReporter to be thread safe JIRA: https://smartcontract-it.atlassian.net/browse/CLD-272
1 parent 653ec19 commit 6ea8603

File tree

4 files changed

+186
-5
lines changed

4 files changed

+186
-5
lines changed

.changeset/itchy-dragons-shout.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"chainlink-deployments-framework": minor
3+
---
4+
5+
feat: Concurrency support for Operations API

.github/workflows/pull-request-main.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,5 @@ jobs:
4141
- name: Build and test
4242
uses: smartcontractkit/.github/actions/ci-test-go@eeb76b5870e3c17856d5a60fd064a053c023b5f5 # [email protected]
4343
with:
44-
go-test-cmd: go test -coverprofile=coverage.txt $(go list ./...)
44+
go-test-cmd: go test -race -coverprofile=coverage.txt $(go list ./...)
4545
use-go-cache: true

operations/execute_test.go

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"context"
55
"errors"
66
"math"
7+
"sync"
78
"testing"
9+
"time"
810

911
"github.com/Masterminds/semver/v3"
1012
"github.com/smartcontractkit/chainlink-common/pkg/logger"
@@ -666,6 +668,138 @@ func Test_loadPreviousSuccessfulReport(t *testing.T) {
666668
}
667669
}
668670

671+
func Test_ExecuteSequence_Concurrent(t *testing.T) {
672+
t.Parallel()
673+
674+
version := semver.MustParse("1.0.0")
675+
676+
op := NewOperation("increment", version, "increment by 1",
677+
func(b Bundle, deps any, input int) (output int, err error) {
678+
return input + 1, nil
679+
})
680+
681+
sequence := NewSequence("concurrent-seq", version, "concurrent sequence test",
682+
func(b Bundle, deps any, input int) (int, error) {
683+
res, err := ExecuteOperation(b, op, nil, input)
684+
if err != nil {
685+
return 0, err
686+
}
687+
688+
// Introduce a small delay to increase chance of race conditions
689+
time.Sleep(time.Millisecond)
690+
691+
return res.Output, nil
692+
})
693+
694+
reporter := NewMemoryReporter()
695+
bundle := NewBundle(context.Background, logger.Test(t), reporter)
696+
697+
const numGoroutines = 10
698+
var wg sync.WaitGroup
699+
wg.Add(numGoroutines)
700+
701+
// Channel to collect results
702+
type result struct {
703+
report SequenceReport[int, int]
704+
err error
705+
}
706+
results := make(chan result, numGoroutines)
707+
708+
for i := range numGoroutines {
709+
go func(input int) {
710+
defer wg.Done()
711+
712+
report, err := ExecuteSequence(bundle, sequence, nil, input)
713+
results <- result{report, err}
714+
}(i) // Each goroutine uses its index as input
715+
}
716+
717+
wg.Wait()
718+
close(results)
719+
720+
// Collect and verify results
721+
for res := range results {
722+
require.NoError(t, res.err, "ExecuteSequence should not return an error")
723+
require.Nil(t, res.report.Err, "Report error should be nil")
724+
725+
// Output should be input + 1
726+
input := res.report.Input
727+
expectedOutput := input + 1
728+
assert.Equal(t, expectedOutput, res.report.Output,
729+
"Output should be input + 1 for input %d", input)
730+
731+
// Verify execution reports
732+
assert.Len(t, res.report.ExecutionReports, 2,
733+
"Should have 2 execution reports (sequence + operation)")
734+
}
735+
736+
// Verify reporter has all reports
737+
allReports, err := reporter.GetReports()
738+
require.NoError(t, err)
739+
740+
// We expect 2*numGoroutines reports (1 sequence + 1 operation per goroutine)
741+
assert.Len(t, allReports, numGoroutines*2,
742+
"Reporter should have %d reports", numGoroutines*2)
743+
}
744+
745+
func Test_ExecuteOperation_Concurrent(t *testing.T) {
746+
t.Parallel()
747+
748+
version := semver.MustParse("1.0.0")
749+
750+
op := NewOperation("increment", version, "increment by 1",
751+
func(b Bundle, deps any, input int) (output int, err error) {
752+
// Introduce a small delay to increase chance of race conditions
753+
time.Sleep(time.Millisecond)
754+
return input + 1, nil
755+
})
756+
757+
reporter := NewMemoryReporter()
758+
bundle := NewBundle(context.Background, logger.Test(t), reporter)
759+
760+
const numGoroutines = 10
761+
var wg sync.WaitGroup
762+
wg.Add(numGoroutines)
763+
764+
// Channel to collect results
765+
type result struct {
766+
report Report[int, int]
767+
err error
768+
}
769+
results := make(chan result, numGoroutines)
770+
771+
for i := range numGoroutines {
772+
go func(input int) {
773+
defer wg.Done()
774+
775+
report, err := ExecuteOperation(bundle, op, nil, input)
776+
results <- result{report, err}
777+
}(i) // Each goroutine uses its index as input
778+
}
779+
780+
wg.Wait()
781+
close(results)
782+
783+
for res := range results {
784+
require.NoError(t, res.err, "ExecuteOperation should not return an error")
785+
require.Nil(t, res.report.Err, "Report error should be nil")
786+
787+
// Output should be input + 1
788+
input := res.report.Input
789+
expectedOutput := input + 1
790+
assert.Equal(t, expectedOutput, res.report.Output,
791+
"Output should be input + 1 for input %d", input)
792+
}
793+
794+
// Verify reporter has all reports
795+
allReports, err := reporter.GetReports()
796+
require.NoError(t, err)
797+
798+
// We expect numGoroutines reports (1 per goroutine)
799+
assert.Len(t, allReports, numGoroutines,
800+
"Reporter should have %d reports", numGoroutines)
801+
}
802+
669803
type errorReporter struct {
670804
Reporter
671805
GetReportError error

operations/report.go

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/json"
55
"errors"
66
"fmt"
7+
"sync"
78
"time"
89

910
"github.com/google/uuid"
@@ -93,8 +94,10 @@ type Reporter interface {
9394
}
9495

9596
// MemoryReporter stores reports in memory.
97+
// This is thread-safe and can be used in a multi-threaded environment.
9698
type MemoryReporter struct {
9799
reports []Report[any, any]
100+
mu sync.RWMutex
98101
}
99102

100103
type MemoryReporterOption func(*MemoryReporter)
@@ -119,19 +122,32 @@ func NewMemoryReporter(options ...MemoryReporterOption) *MemoryReporter {
119122

120123
// AddReport adds a report to the memory reporter.
121124
func (e *MemoryReporter) AddReport(report Report[any, any]) error {
125+
e.mu.Lock()
126+
defer e.mu.Unlock()
127+
122128
e.reports = append(e.reports, report)
123129

124130
return nil
125131
}
126132

127133
// GetReports returns all reports.
128134
func (e *MemoryReporter) GetReports() ([]Report[any, any], error) {
129-
return e.reports, nil
135+
e.mu.RLock()
136+
defer e.mu.RUnlock()
137+
138+
// Create a copy to avoid data races after returning
139+
reports := make([]Report[any, any], len(e.reports))
140+
copy(reports, e.reports)
141+
142+
return reports, nil
130143
}
131144

132145
// GetReport returns a report by ID.
133146
// Returns ErrReportNotFound if the report is not found.
134147
func (e *MemoryReporter) GetReport(id string) (Report[any, any], error) {
148+
e.mu.RLock()
149+
defer e.mu.RUnlock()
150+
135151
for _, report := range e.reports {
136152
if report.ID == id {
137153
return report, nil
@@ -145,14 +161,29 @@ func (e *MemoryReporter) GetReport(id string) (Report[any, any], error) {
145161
// It does this by recursively fetching all the child reports.
146162
// Useful when returning all the reports in a sequence to the changeset output.
147163
func (e *MemoryReporter) GetExecutionReports(seqID string) ([]Report[any, any], error) {
164+
e.mu.RLock()
165+
defer e.mu.RUnlock()
166+
148167
var allReports []Report[any, any]
149168

150169
var getReportsRecursively func(id string) error
151170
getReportsRecursively = func(id string) error {
152-
report, err := e.GetReport(id)
153-
if err != nil {
154-
return err
171+
var report Report[any, any]
172+
found := false
173+
174+
for _, r := range e.reports {
175+
if r.ID == id {
176+
report = r
177+
found = true
178+
179+
break
180+
}
181+
}
182+
183+
if !found {
184+
return fmt.Errorf("report_id %s: %w", id, ErrReportNotFound)
155185
}
186+
156187
for _, childID := range report.ChildOperationReports {
157188
if err := getReportsRecursively(childID); err != nil {
158189
return err
@@ -172,24 +203,35 @@ func (e *MemoryReporter) GetExecutionReports(seqID string) ([]Report[any, any],
172203

173204
// RecentReporter is a wrapper around a Reporter that keeps track of the most recent reports.
174205
// Useful when trying to get a list of reports that was recently added in a sequence.
206+
// It is thread-safe and can be used in a multi-threaded environment.
175207
type RecentReporter struct {
176208
Reporter
177209
recentReports []Report[any, any]
210+
mu sync.RWMutex
178211
}
179212

180213
// AddReport adds a report to the recent reporter.
181214
func (e *RecentReporter) AddReport(report Report[any, any]) error {
215+
// First add to underlying reporter
182216
err := e.Reporter.AddReport(report)
183217
if err != nil {
184218
return err
185219
}
220+
221+
// Then add to recent reports
222+
e.mu.Lock()
223+
defer e.mu.Unlock()
224+
186225
e.recentReports = append(e.recentReports, report)
187226

188227
return nil
189228
}
190229

191230
// GetRecentReports returns all the reports that was added since the construction of the RecentReporter.
192231
func (e *RecentReporter) GetRecentReports() []Report[any, any] {
232+
e.mu.RLock()
233+
defer e.mu.RUnlock()
234+
193235
return e.recentReports
194236
}
195237

0 commit comments

Comments
 (0)