Skip to content

Commit c86fca5

Browse files
authored
Merge branch 'main' into aptos-service
2 parents de445d1 + c188380 commit c86fca5

File tree

15 files changed

+1427
-50
lines changed

15 files changed

+1427
-50
lines changed

keystore/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ require (
4141
github.com/cespare/xxhash/v2 v2.3.0 // indirect
4242
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.16.1 // indirect
4343
github.com/cloudevents/sdk-go/v2 v2.16.1 // indirect
44-
github.com/consensys/gnark-crypto v0.18.0 // indirect
44+
github.com/consensys/gnark-crypto v0.18.1 // indirect
4545
github.com/crate-crypto/go-eth-kzg v1.4.0 // indirect
4646
github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect
4747
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect

keystore/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ github.com/cloudevents/sdk-go/v2 v2.16.1 h1:G91iUdqvl88BZ1GYYr9vScTj5zzXSyEuqbfE
6262
github.com/cloudevents/sdk-go/v2 v2.16.1/go.mod h1:v/kVOaWjNfbvc6tkhhlkhvLapj8Aa8kvXiH5GiOHCKI=
6363
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
6464
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
65-
github.com/consensys/gnark-crypto v0.18.0 h1:vIye/FqI50VeAr0B3dx+YjeIvmc3LWz4yEfbWBpTUf0=
66-
github.com/consensys/gnark-crypto v0.18.0/go.mod h1:L3mXGFTe1ZN+RSJ+CLjUt9x7PNdx8ubaYfDROyp2Z8c=
65+
github.com/consensys/gnark-crypto v0.18.1 h1:RyLV6UhPRoYYzaFnPQA4qK3DyuDgkTgskDdoGqFt3fI=
66+
github.com/consensys/gnark-crypto v0.18.1/go.mod h1:L3mXGFTe1ZN+RSJ+CLjUt9x7PNdx8ubaYfDROyp2Z8c=
6767
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
6868
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
6969
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=

pkg/capabilities/errors/error_serialization.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ func DeserializeErrorFromString(errorMsg string) Error {
1515
parts := strings.SplitN(errorMsg, errorMessageSeparator, 4)
1616

1717
if len(parts) < 4 {
18-
// To maintain backwards compatability with messages from remote nodes on an older code version, create an error
18+
// To maintain backwards compatibility with messages from remote nodes on an older code version, create an error
1919
// with the full message and default to private system error with an unknown error code.
2020
return NewError(errors.New(errorMsg), VisibilityPrivate, OriginSystem, Unknown)
2121
}

pkg/chipingress/batch/batcher.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package batch
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
// batchWithInterval reads from a channel and calls processFn with batches based on size or time interval.
9+
// When the context is cancelled, it automatically flushes any remaining items in the batch.
10+
func batchWithInterval[T any](
11+
ctx context.Context,
12+
input <-chan T,
13+
batchSize int,
14+
interval time.Duration,
15+
processFn func([]T),
16+
) {
17+
var batch []T
18+
timer := time.NewTimer(interval)
19+
timer.Stop()
20+
21+
flush := func() {
22+
if len(batch) > 0 {
23+
processFn(batch)
24+
batch = nil
25+
timer.Stop()
26+
}
27+
}
28+
29+
for {
30+
select {
31+
case <-ctx.Done():
32+
flush()
33+
return
34+
case msg, ok := <-input:
35+
if !ok {
36+
// Channel closed
37+
flush()
38+
return
39+
}
40+
41+
// Start timer on first message in batch
42+
if len(batch) == 0 {
43+
timer.Reset(interval)
44+
}
45+
46+
batch = append(batch, msg)
47+
48+
// Flush when batch is full
49+
if len(batch) >= batchSize {
50+
processFn(batch)
51+
batch = nil
52+
timer.Stop()
53+
}
54+
case <-timer.C:
55+
flush()
56+
}
57+
}
58+
}

pkg/chipingress/batch/client.go

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
package batch
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync"
7+
"time"
8+
9+
"go.uber.org/zap"
10+
11+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
12+
)
13+
14+
type messageWithCallback struct {
15+
event *chipingress.CloudEventPb
16+
callback func(error)
17+
}
18+
19+
// Client is a batching client that accumulates messages and sends them in batches.
20+
type Client struct {
21+
client chipingress.Client
22+
batchSize int
23+
maxConcurrentSends chan struct{}
24+
batchInterval time.Duration
25+
maxPublishTimeout time.Duration
26+
messageBuffer chan *messageWithCallback
27+
stopCh stopCh
28+
log *zap.SugaredLogger
29+
callbackWg sync.WaitGroup
30+
shutdownTimeout time.Duration
31+
shutdownOnce sync.Once
32+
batcherDone chan struct{}
33+
cancelBatcher context.CancelFunc
34+
}
35+
36+
// Opt is a functional option for configuring the batch Client.
37+
type Opt func(*Client)
38+
39+
// NewBatchClient creates a new batching client with the given options.
40+
func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) {
41+
c := &Client{
42+
client: client,
43+
log: zap.NewNop().Sugar(),
44+
batchSize: 10,
45+
maxConcurrentSends: make(chan struct{}, 1),
46+
messageBuffer: make(chan *messageWithCallback, 200),
47+
batchInterval: 100 * time.Millisecond,
48+
maxPublishTimeout: 5 * time.Second,
49+
stopCh: make(chan struct{}),
50+
callbackWg: sync.WaitGroup{},
51+
shutdownTimeout: 5 * time.Second,
52+
batcherDone: make(chan struct{}),
53+
}
54+
55+
for _, opt := range opts {
56+
opt(c)
57+
}
58+
59+
return c, nil
60+
}
61+
62+
// Start begins processing messages from the queue and sending them in batches
63+
func (b *Client) Start(ctx context.Context) {
64+
// Create a cancellable context for the batcher
65+
batcherCtx, cancel := context.WithCancel(ctx)
66+
b.cancelBatcher = cancel
67+
68+
go func() {
69+
defer close(b.batcherDone)
70+
71+
go func() {
72+
select {
73+
case <-ctx.Done():
74+
b.Stop()
75+
case <-b.stopCh:
76+
cancel()
77+
}
78+
}()
79+
80+
batchWithInterval(
81+
batcherCtx,
82+
b.messageBuffer,
83+
b.batchSize,
84+
b.batchInterval,
85+
func(batch []*messageWithCallback) {
86+
// Detach from cancellation so final flush can still publish during shutdown.
87+
// sendBatch still enforces maxPublishTimeout for each publish call.
88+
b.sendBatch(context.WithoutCancel(batcherCtx), batch)
89+
},
90+
)
91+
}()
92+
}
93+
94+
// Stop ensures:
95+
// - current batch is flushed
96+
// - all current network calls are completed
97+
// - all callbacks are completed
98+
// Forcibly shutdowns down after timeout if not completed.
99+
func (b *Client) Stop() {
100+
b.shutdownOnce.Do(func() {
101+
ctx, cancel := b.stopCh.CtxWithTimeout(b.shutdownTimeout)
102+
defer cancel()
103+
104+
if b.cancelBatcher != nil {
105+
b.cancelBatcher()
106+
}
107+
close(b.stopCh)
108+
109+
done := make(chan struct{})
110+
go func() {
111+
<-b.batcherDone
112+
for range cap(b.maxConcurrentSends) {
113+
b.maxConcurrentSends <- struct{}{}
114+
}
115+
// wait for all callbacks to complete
116+
b.callbackWg.Wait()
117+
close(done)
118+
}()
119+
120+
select {
121+
case <-done:
122+
// All successfully shutdown
123+
case <-ctx.Done(): // timeout or context cancelled
124+
b.log.Warnw("timed out waiting for shutdown to finish, force closing", "timeout", b.shutdownTimeout)
125+
}
126+
})
127+
}
128+
129+
// QueueMessage queues a single message to the batch client with an optional callback.
130+
// The callback will be invoked after the batch containing this message is sent.
131+
// The callback receives an error parameter (nil on success).
132+
// Callbacks are invoked from goroutines
133+
// Returns immediately with no blocking - drops message if channel is full.
134+
// Returns an error if the message was dropped.
135+
func (b *Client) QueueMessage(event *chipingress.CloudEventPb, callback func(error)) error {
136+
if event == nil {
137+
return nil
138+
}
139+
140+
// Check shutdown first to avoid race with buffer send
141+
select {
142+
case <-b.stopCh:
143+
return errors.New("client is shutdown")
144+
default:
145+
}
146+
147+
msg := &messageWithCallback{
148+
event: event,
149+
callback: callback,
150+
}
151+
152+
select {
153+
case b.messageBuffer <- msg:
154+
return nil
155+
default:
156+
return errors.New("message buffer is full")
157+
}
158+
}
159+
160+
func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback) {
161+
if len(messages) == 0 {
162+
return
163+
}
164+
165+
// acquire semaphore, limiting concurrent sends
166+
b.maxConcurrentSends <- struct{}{}
167+
168+
go func() {
169+
defer func() { <-b.maxConcurrentSends }()
170+
// this is specifically to prevent long running network calls
171+
ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout)
172+
defer cancel()
173+
174+
events := make([]*chipingress.CloudEventPb, len(messages))
175+
for i, msg := range messages {
176+
events[i] = msg.event
177+
}
178+
_, err := b.client.PublishBatch(ctxTimeout, &chipingress.CloudEventBatch{Events: events})
179+
if err != nil {
180+
b.log.Errorw("failed to publish batch", "error", err)
181+
}
182+
// the callbacks are placed in their own goroutine to not block releasing the semaphore
183+
// we use a wait group, to ensure all callbacks are completed if .Stop() is called.
184+
b.callbackWg.Go(func() {
185+
for _, msg := range messages {
186+
if msg.callback != nil {
187+
msg.callback(err)
188+
}
189+
}
190+
})
191+
}()
192+
}
193+
194+
// WithBatchSize sets the number of messages to accumulate before sending a batch
195+
func WithBatchSize(batchSize int) Opt {
196+
return func(c *Client) {
197+
c.batchSize = batchSize
198+
}
199+
}
200+
201+
// WithMaxConcurrentSends sets the maximum number of concurrent batch send operations
202+
func WithMaxConcurrentSends(maxConcurrentSends int) Opt {
203+
return func(c *Client) {
204+
c.maxConcurrentSends = make(chan struct{}, maxConcurrentSends)
205+
}
206+
}
207+
208+
// WithBatchInterval sets the maximum time to wait before sending an incomplete batch
209+
func WithBatchInterval(batchTimeout time.Duration) Opt {
210+
return func(c *Client) {
211+
c.batchInterval = batchTimeout
212+
}
213+
}
214+
215+
// WithShutdownTimeout sets the maximum time to wait for shutdown to complete
216+
func WithShutdownTimeout(shutdownTimeout time.Duration) Opt {
217+
return func(c *Client) {
218+
c.shutdownTimeout = shutdownTimeout
219+
}
220+
}
221+
222+
// WithMessageBuffer sets the size of the message queue buffer
223+
func WithMessageBuffer(messageBufferSize int) Opt {
224+
return func(c *Client) {
225+
c.messageBuffer = make(chan *messageWithCallback, messageBufferSize)
226+
}
227+
}
228+
229+
// WithMaxPublishTimeout sets the maximum time to wait for a batch publish operation
230+
func WithMaxPublishTimeout(maxPublishTimeout time.Duration) Opt {
231+
return func(c *Client) {
232+
c.maxPublishTimeout = maxPublishTimeout
233+
}
234+
}
235+
236+
// WithLogger sets the logger for the batch client
237+
func WithLogger(log *zap.SugaredLogger) Opt {
238+
return func(c *Client) {
239+
c.log = log
240+
}
241+
}

0 commit comments

Comments
 (0)