Skip to content

Commit 3202604

Browse files
kaleofdutygtklockerPhilippSchindlerstchrysa
committed
Improvements:
- Move to go1.24 - OCR3.1 alpha Based on cb8e5eb65c417703b7d35c96a95b9e13a2d14020 Co-authored-by: Kostis Karantias <[email protected]> Co-authored-by: Philipp Schindler <[email protected]> Co-authored-by: stchrysa <[email protected]>
1 parent ac7eac4 commit 3202604

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+17658
-26
lines changed

commontypes/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"net"
77
"strings"
88

9+
// TODO: is there a way to remove this dependency?
910
ragetypes "github.com/smartcontractkit/libocr/ragep2p/types"
1011
)
1112

go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
module github.com/smartcontractkit/libocr
22

3-
go 1.23.0
3+
go 1.24
44

5-
toolchain go1.23.6
5+
toolchain go1.24.4
66

77
require (
88
github.com/ethereum/go-ethereum v1.15.3
@@ -65,7 +65,7 @@ require (
6565
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c // indirect
6666
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 // indirect
6767
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
68-
github.com/klauspost/compress v1.16.0 // indirect
68+
github.com/klauspost/compress v1.17.11 // indirect
6969
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
7070
github.com/kr/pretty v0.3.1 // indirect
7171
github.com/kr/text v0.2.0 // indirect
@@ -102,7 +102,7 @@ require (
102102
github.com/urfave/cli/v2 v2.27.5 // indirect
103103
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
104104
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
105-
golang.org/x/net v0.36.0 // indirect
105+
golang.org/x/net v0.34.0 // indirect
106106
golang.org/x/sync v0.11.0 // indirect
107107
golang.org/x/sys v0.30.0 // indirect
108108
golang.org/x/text v0.22.0 // indirect

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7Bd
141141
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
142142
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
143143
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
144-
github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4=
145-
github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
144+
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
145+
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
146146
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
147147
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
148148
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
@@ -312,8 +312,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
312312
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
313313
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
314314
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
315-
golang.org/x/net v0.36.0 h1:vWF2fRbw4qslQsQzgFqZff+BItCvGFQqKzKIzx1rmoA=
316-
golang.org/x/net v0.36.0/go.mod h1:bFmbeoIPfrw4sMHNhb4J9f6+tPziuGjq7Jk/38fxi1I=
315+
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
316+
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
317317
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
318318
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
319319
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=

internal/ringbuffer/ringbuffer.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package ringbuffer
2+
3+
import "fmt"
4+
5+
// RingBuffer implements a fixed capacity ring buffer for items of type T.
6+
// NOTE: THIS IMPLEMENTATION IS NOT SAFE FOR CONCURRENT USE.
7+
type RingBuffer[T any] struct {
8+
first int // index of the front (=oldest) element
9+
size int // number of elements currently stored in this ring buffer
10+
items []T // fixed size buffer holding the elements
11+
}
12+
13+
func NewRingBuffer[T any](cap int) *RingBuffer[T] {
14+
if cap <= 0 {
15+
panic(fmt.Sprintf("NewRingBuffer: cap must be positive, got %d", cap))
16+
}
17+
return &RingBuffer[T]{
18+
0,
19+
0,
20+
make([]T, cap),
21+
}
22+
}
23+
24+
func (rb *RingBuffer[T]) Size() int {
25+
return rb.size
26+
}
27+
28+
func (rb *RingBuffer[T]) Cap() int {
29+
return len(rb.items)
30+
}
31+
32+
func (rb *RingBuffer[T]) IsEmpty() bool {
33+
return rb.size == 0
34+
}
35+
36+
func (rb *RingBuffer[T]) IsFull() bool {
37+
return rb.size == len(rb.items)
38+
}
39+
40+
// Peek returns the front (=oldest) item without removing it.
41+
// Return false as second argument if there are no items in the ring buffer.
42+
func (rb *RingBuffer[T]) Peek() (result T, ok bool) {
43+
if rb.size > 0 {
44+
ok = true
45+
result = rb.items[rb.first]
46+
}
47+
return result, ok
48+
}
49+
50+
// Pop removes and returns the front (=oldest) item.
51+
// Return false as second argument if there are no items in the ring buffer.
52+
func (rb *RingBuffer[T]) Pop() (result T, ok bool) {
53+
result, ok = rb.Peek()
54+
if ok {
55+
var zero T
56+
rb.items[rb.first] = zero
57+
rb.first = (rb.first + 1) % len(rb.items)
58+
rb.size--
59+
}
60+
return result, ok
61+
}
62+
63+
// Try to push a new item to the back of the ring buffer.
64+
// Returns
65+
// - true if the item was added, or
66+
// - false if the item cannot be added because the buffer is currently full.
67+
func (rb *RingBuffer[T]) TryPush(item T) (ok bool) {
68+
if rb.IsFull() {
69+
return false
70+
}
71+
rb.items[(rb.first+rb.size)%len(rb.items)] = item
72+
rb.size++
73+
return true
74+
}
75+
76+
// Push new item to the back of the ring buffer.
77+
// If the buffer is currently full, the front (=oldest) item is evicted and returned to make space for the new item.
78+
func (rb *RingBuffer[T]) PushEvict(item T) (evicted T, didEvict bool) {
79+
if rb.IsFull() {
80+
// Evict the oldest item to be returned.
81+
evicted = rb.items[rb.first]
82+
didEvict = true
83+
84+
// Push the new item to new empty space and update the first index to the next (oldest) item.
85+
rb.items[rb.first] = item
86+
rb.first = (rb.first + 1) % len(rb.items)
87+
} else {
88+
// Perform a normal push operation (which is known to be successful as the buffer is not full).
89+
rb.items[(rb.first+rb.size)%len(rb.items)] = item
90+
rb.size++
91+
}
92+
return evicted, didEvict
93+
}

internal/util/generic.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,11 @@ func NilCoalesce[T any](maybe *T, default_ T) T {
1111
return default_
1212
}
1313
}
14+
15+
func NilCoalesceSlice[T any](maybe []T) []T {
16+
if maybe != nil {
17+
return maybe
18+
} else {
19+
return []T{}
20+
}
21+
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package responselimit
2+
3+
import (
4+
"math/rand"
5+
"sync"
6+
"time"
7+
8+
"github.com/smartcontractkit/libocr/networking/internal/ocrendpointv3/types"
9+
)
10+
11+
type ResponseCheckResult byte
12+
13+
// Enum specifying the list of return values for responseChecker.CheckResponse(...).
14+
const (
15+
// A response is rejected if the policy
16+
// (1) was not found, or
17+
// (2) was expired, or
18+
// (3) was found but decided to reject the request.
19+
//
20+
// As policies are automatically cleaned up (in some non-deterministic manner), there is no way to distinguish
21+
// cases (1) and (2), and for simplicity also case (3) is handled identically.
22+
//
23+
// We intentionally use 0 as the first enum value for Reject as a safe default here.
24+
ResponseCheckResultReject ResponseCheckResult = iota
25+
26+
// A (non-expired) policy was found, and the policy did decide that the response should be allowed.
27+
ResponseCheckResultAllow
28+
)
29+
30+
type responseCheckerMapEntry struct {
31+
index int
32+
policy ResponsePolicy
33+
streamID types.StreamID
34+
}
35+
36+
// Data structure for keeping track of open requests until a set expiry date.
37+
//
38+
// Cleanup of expired entries is performed automatically. Whenever a new entry is added, two random entries are checked
39+
// and removed if expired. This ensures that, on expectation, the number of tracked entries is approx. 2x the number
40+
// of non-expired entries.
41+
//
42+
// SetPolicy(...) and CheckResponse(...) are O(1) operations.
43+
type ResponseChecker struct {
44+
mutex sync.Mutex
45+
rids []types.RequestID
46+
policies map[types.RequestID]responseCheckerMapEntry
47+
rng *rand.Rand
48+
}
49+
50+
func NewResponseChecker() *ResponseChecker {
51+
return &ResponseChecker{
52+
sync.Mutex{},
53+
make([]types.RequestID, 0),
54+
make(map[types.RequestID]responseCheckerMapEntry),
55+
rand.New(rand.NewSource(time.Now().UnixNano())),
56+
}
57+
}
58+
59+
// Sets the policy for a given (fresh) request ID. After setting the policy, calling Pop(...) for the same ID before the
60+
// policy expires returns the policy Set with this function. If a policy with the provided ID is already present, it
61+
// will be overwritten.
62+
func (c *ResponseChecker) SetPolicy(sid types.StreamID, rid types.RequestID, policy ResponsePolicy) {
63+
c.mutex.Lock()
64+
defer c.mutex.Unlock()
65+
66+
// Lookup an existing policy for the provided request ID.
67+
// If it exists, we override the policy, keeping its location at the prior index.
68+
// Otherwise, we need use a new index and also track the request ID in the c.rids list.
69+
entry, exists := c.policies[rid]
70+
if exists {
71+
entry = responseCheckerMapEntry{entry.index, policy, sid}
72+
} else {
73+
// We set entry.index = len(c.rids) to let it point to the request ID we will append to c.rids list.
74+
entry = responseCheckerMapEntry{len(c.rids), policy, sid}
75+
c.rids = append(c.rids, rid)
76+
}
77+
78+
// Actually save the policy update back to the c.policies map.
79+
c.policies[rid] = entry
80+
81+
// If the number of tracked policies increased, we check 2 random policies and remove them if expired. This way
82+
// the number of tracked policies only grows to 2x the number of non-expired policies in expectation.
83+
if !exists {
84+
c.cleanupExpired()
85+
}
86+
}
87+
88+
// Lookup the policy for a given response and check if it should be allowed or rejected.
89+
// See responseCheckResult for additional documentation on the potential return values of this function.
90+
func (c *ResponseChecker) CheckResponse(sid types.StreamID, rid types.RequestID, size int) ResponseCheckResult {
91+
c.mutex.Lock()
92+
defer c.mutex.Unlock()
93+
94+
entry, exists := c.policies[rid]
95+
if !exists {
96+
return ResponseCheckResultReject
97+
}
98+
if entry.streamID != sid {
99+
return ResponseCheckResultReject
100+
}
101+
102+
now := time.Now()
103+
if entry.policy.isPolicyExpired(now) {
104+
c.removeEntry(rid, entry.index)
105+
return ResponseCheckResultReject
106+
}
107+
108+
policyResult := entry.policy.checkResponse(rid, size, now)
109+
110+
// Recheck the policy of expiry, useful to cleanup one-time-use policies immediately.
111+
if entry.policy.isPolicyExpired(now) {
112+
c.removeEntry(rid, entry.index)
113+
}
114+
115+
return policyResult
116+
}
117+
118+
// Removes all currently tracked policies for the given stream ID. To ensure that responses sent to a stream cannot be
119+
// accepted after this stream is closed and reopened, this function is called when the Stream is closed (and removed
120+
// from the demuxer).
121+
func (c *ResponseChecker) ClearPoliciesForStream(sid types.StreamID) {
122+
c.mutex.Lock()
123+
defer c.mutex.Unlock()
124+
125+
for i := 0; i < len(c.rids); i++ {
126+
rid := c.rids[i]
127+
policy := c.policies[rid]
128+
129+
if policy.streamID == sid {
130+
// We found a policy which matches the given stream ID.
131+
// So we remove the entry from the list of request IDs and policies.
132+
c.removeEntry(rid, i)
133+
134+
// The above removeEntry(...) removes c.rids[i], thus in the next iteration index its value is replaced
135+
// by a different request ID. We decrement index i to ensure that we don't skip the new value at index i.
136+
i--
137+
}
138+
}
139+
}
140+
141+
// Check two random policies. A checked policy is removed if it is found to be expired.
142+
func (c *ResponseChecker) cleanupExpired() {
143+
now := time.Now()
144+
145+
// At most 2 iterations, enter loop body only if c.rids is non empty.
146+
for i := 0; i < 2 && len(c.rids) > 0; i++ {
147+
// Select a random policy.
148+
index := c.rng.Intn(len(c.rids))
149+
id := c.rids[index]
150+
policy := c.policies[id].policy
151+
152+
// Remove it if it is expired.
153+
if policy.isPolicyExpired(now) {
154+
c.removeEntry(id, index)
155+
}
156+
}
157+
}
158+
159+
// Remove the policy for a given request ID from (1) the map of policies and (2) the list of request IDs.
160+
func (c *ResponseChecker) removeEntry(id types.RequestID, index int) {
161+
// Remove the entry from the map of polices.
162+
delete(c.policies, id)
163+
164+
// Handle the "index == last-index" corner case separately.
165+
// This avoids wrongfully reinserting the deleted policy.
166+
if index == len(c.rids)-1 {
167+
c.rids = c.rids[0 : len(c.rids)-1]
168+
return
169+
}
170+
171+
// Swap the last entry's id to the position of the to be removed id, and remove the last value from the rids list.
172+
lastID := c.rids[len(c.rids)-1]
173+
c.rids[index] = lastID
174+
c.rids = c.rids[0 : len(c.rids)-1]
175+
176+
// Update the index point for the c.policies[lastId] to point to the now changed position.
177+
lastEntry := c.policies[lastID]
178+
lastEntry.index = index
179+
c.policies[lastID] = lastEntry
180+
}

0 commit comments

Comments
 (0)