Skip to content

Commit b29fdbb

Browse files
CCIP-8341 Passing TXHash and enabling events filtering in SourceChainReader service (#379)
1 parent 55f136a commit b29fdbb

File tree

7 files changed

+309
-0
lines changed

7 files changed

+309
-0
lines changed

integration/pkg/sourcereader/evm_source_reader.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ func (r *EVMSourceReader) FetchMessageSentEvents(ctx context.Context, fromBlock,
295295
Message: *decodedMsg,
296296
Receipts: allReceipts, // Keep original order from OnRamp event
297297
BlockNumber: log.BlockNumber,
298+
TxHash: protocol.ByteSlice(log.TxHash.Bytes()),
298299
})
299300
}
300301
return results, nil

pkg/chainaccess/filter.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package chainaccess
2+
3+
import "github.com/smartcontractkit/chainlink-ccv/protocol"
4+
5+
// VerifierIssuerFilter filters protocol.MessageSentEvent to only include those
6+
// that have at least one receipt issued by the specified verifier issuer address.
7+
type VerifierIssuerFilter struct {
8+
issuerAddress protocol.UnknownAddress
9+
}
10+
11+
func NewVerifierIssuerFilter(
12+
issuerAddress protocol.UnknownAddress,
13+
) MessageFilter {
14+
return &VerifierIssuerFilter{
15+
issuerAddress: issuerAddress,
16+
}
17+
}
18+
19+
func (v *VerifierIssuerFilter) Filter(msg protocol.MessageSentEvent) bool {
20+
for _, receipt := range msg.Receipts {
21+
if v.issuerAddress.Equal(receipt.Issuer) {
22+
return true
23+
}
24+
}
25+
return false
26+
}
27+
28+
// CompositeMessageFilter combines multiple MessageFilters using logical AND.
29+
// Works as allow-all if no filters are provided.
30+
type CompositeMessageFilter struct {
31+
filters []MessageFilter
32+
}
33+
34+
func NewCompositeMessageFilter(filters ...MessageFilter) MessageFilter {
35+
return &CompositeMessageFilter{
36+
filters: filters,
37+
}
38+
}
39+
40+
func (c *CompositeMessageFilter) Filter(msg protocol.MessageSentEvent) bool {
41+
for _, filter := range c.filters {
42+
if !filter.Filter(msg) {
43+
return false
44+
}
45+
}
46+
return true
47+
}

pkg/chainaccess/filter_test.go

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
package chainaccess
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
8+
"github.com/smartcontractkit/chainlink-ccv/protocol"
9+
)
10+
11+
func TestVerifierIssuerFilter_Filter(t *testing.T) {
12+
targetIssuer := protocol.UnknownAddress("0x1234567890abcdef")
13+
otherIssuer1 := protocol.UnknownAddress("0xabcdef1234567890")
14+
otherIssuer2 := protocol.UnknownAddress("0xfedcba0987654321")
15+
16+
tests := []struct {
17+
name string
18+
issuer protocol.UnknownAddress
19+
msg protocol.MessageSentEvent
20+
expected bool
21+
}{
22+
{
23+
name: "no receipts - should return false",
24+
issuer: targetIssuer,
25+
msg: protocol.MessageSentEvent{
26+
Receipts: []protocol.ReceiptWithBlob{},
27+
},
28+
expected: false,
29+
},
30+
{
31+
name: "single receipt with matching issuer - should return true",
32+
issuer: targetIssuer,
33+
msg: protocol.MessageSentEvent{
34+
Receipts: []protocol.ReceiptWithBlob{
35+
{Issuer: targetIssuer},
36+
},
37+
},
38+
expected: true,
39+
},
40+
{
41+
name: "single receipt with non-matching issuer - should return false",
42+
issuer: targetIssuer,
43+
msg: protocol.MessageSentEvent{
44+
Receipts: []protocol.ReceiptWithBlob{
45+
{Issuer: otherIssuer1},
46+
},
47+
},
48+
expected: false,
49+
},
50+
{
51+
name: "multiple receipts with matching issuer as first - should return true",
52+
issuer: targetIssuer,
53+
msg: protocol.MessageSentEvent{
54+
Receipts: []protocol.ReceiptWithBlob{
55+
{Issuer: targetIssuer},
56+
{Issuer: otherIssuer1},
57+
{Issuer: otherIssuer2},
58+
},
59+
},
60+
expected: true,
61+
},
62+
{
63+
name: "multiple receipts with matching issuer in middle - should return true",
64+
issuer: targetIssuer,
65+
msg: protocol.MessageSentEvent{
66+
Receipts: []protocol.ReceiptWithBlob{
67+
{Issuer: otherIssuer1},
68+
{Issuer: targetIssuer},
69+
{Issuer: otherIssuer2},
70+
},
71+
},
72+
expected: true,
73+
},
74+
{
75+
name: "multiple receipts with matching issuer as last - should return true",
76+
issuer: targetIssuer,
77+
msg: protocol.MessageSentEvent{
78+
Receipts: []protocol.ReceiptWithBlob{
79+
{Issuer: otherIssuer1},
80+
{Issuer: otherIssuer2},
81+
{Issuer: targetIssuer},
82+
},
83+
},
84+
expected: true,
85+
},
86+
{
87+
name: "multiple receipts with no matching issuer - should return false",
88+
issuer: targetIssuer,
89+
msg: protocol.MessageSentEvent{
90+
Receipts: []protocol.ReceiptWithBlob{
91+
{Issuer: otherIssuer1},
92+
{Issuer: otherIssuer2},
93+
},
94+
},
95+
expected: false,
96+
},
97+
{
98+
name: "multiple receipts with duplicate matching issuers - should return true",
99+
issuer: targetIssuer,
100+
msg: protocol.MessageSentEvent{
101+
Receipts: []protocol.ReceiptWithBlob{
102+
{Issuer: targetIssuer},
103+
{Issuer: otherIssuer1},
104+
{Issuer: targetIssuer},
105+
},
106+
},
107+
expected: true,
108+
},
109+
}
110+
111+
for _, tt := range tests {
112+
t.Run(tt.name, func(t *testing.T) {
113+
filter := NewVerifierIssuerFilter(tt.issuer)
114+
result := filter.Filter(tt.msg)
115+
assert.Equal(t, tt.expected, result)
116+
})
117+
}
118+
}
119+
120+
func TestCompositeMessageFilter_Filter(t *testing.T) {
121+
issuer1 := protocol.UnknownAddress("0x1111111111111111")
122+
issuer2 := protocol.UnknownAddress("0x2222222222222222")
123+
issuer3 := protocol.UnknownAddress("0x3333333333333333")
124+
125+
tests := []struct {
126+
name string
127+
filters []MessageFilter
128+
msg protocol.MessageSentEvent
129+
expected bool
130+
}{
131+
{
132+
name: "no filters - should return true",
133+
filters: []MessageFilter{},
134+
msg: protocol.MessageSentEvent{
135+
Receipts: []protocol.ReceiptWithBlob{},
136+
},
137+
expected: true,
138+
},
139+
{
140+
name: "single filter matches - should return true",
141+
filters: []MessageFilter{
142+
NewVerifierIssuerFilter(issuer1),
143+
},
144+
msg: protocol.MessageSentEvent{
145+
Receipts: []protocol.ReceiptWithBlob{
146+
{Issuer: issuer1},
147+
},
148+
},
149+
expected: true,
150+
},
151+
{
152+
name: "single filter does not match - should return false",
153+
filters: []MessageFilter{
154+
NewVerifierIssuerFilter(issuer1),
155+
},
156+
msg: protocol.MessageSentEvent{
157+
Receipts: []protocol.ReceiptWithBlob{
158+
{Issuer: issuer2},
159+
},
160+
},
161+
expected: false,
162+
},
163+
{
164+
name: "multiple filters all match - should return true",
165+
filters: []MessageFilter{
166+
NewVerifierIssuerFilter(issuer1),
167+
NewVerifierIssuerFilter(issuer2),
168+
},
169+
msg: protocol.MessageSentEvent{
170+
Receipts: []protocol.ReceiptWithBlob{
171+
{Issuer: issuer1},
172+
{Issuer: issuer2},
173+
},
174+
},
175+
expected: true,
176+
},
177+
{
178+
name: "multiple filters first does not match - should return false",
179+
filters: []MessageFilter{
180+
NewVerifierIssuerFilter(issuer1),
181+
NewVerifierIssuerFilter(issuer2),
182+
},
183+
msg: protocol.MessageSentEvent{
184+
Receipts: []protocol.ReceiptWithBlob{
185+
{Issuer: issuer2},
186+
},
187+
},
188+
expected: false,
189+
},
190+
{
191+
name: "multiple filters second does not match - should return false",
192+
filters: []MessageFilter{
193+
NewVerifierIssuerFilter(issuer1),
194+
NewVerifierIssuerFilter(issuer2),
195+
},
196+
msg: protocol.MessageSentEvent{
197+
Receipts: []protocol.ReceiptWithBlob{
198+
{Issuer: issuer1},
199+
},
200+
},
201+
expected: false,
202+
},
203+
{
204+
name: "multiple filters none match - should return false",
205+
filters: []MessageFilter{
206+
NewVerifierIssuerFilter(issuer1),
207+
NewVerifierIssuerFilter(issuer2),
208+
},
209+
msg: protocol.MessageSentEvent{
210+
Receipts: []protocol.ReceiptWithBlob{
211+
{Issuer: issuer3},
212+
},
213+
},
214+
expected: false,
215+
},
216+
{
217+
name: "three filters all match - should return true",
218+
filters: []MessageFilter{
219+
NewVerifierIssuerFilter(issuer1),
220+
NewVerifierIssuerFilter(issuer2),
221+
NewVerifierIssuerFilter(issuer3),
222+
},
223+
msg: protocol.MessageSentEvent{
224+
Receipts: []protocol.ReceiptWithBlob{
225+
{Issuer: issuer1},
226+
{Issuer: issuer2},
227+
{Issuer: issuer3},
228+
},
229+
},
230+
expected: true,
231+
},
232+
}
233+
234+
for _, tt := range tests {
235+
t.Run(tt.name, func(t *testing.T) {
236+
filter := NewCompositeMessageFilter(tt.filters...)
237+
result := filter.Filter(tt.msg)
238+
assert.Equal(t, tt.expected, result)
239+
})
240+
}
241+
}

pkg/chainaccess/interfaces.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,9 @@ type RMNCurseReader interface {
6363
// - Chain selectors as bytes16s
6464
GetRMNCursedSubjects(ctx context.Context) ([]protocol.Bytes16, error)
6565
}
66+
67+
// MessageFilter defines an interface for filtering protocol.MessageSentEvent.
68+
type MessageFilter interface {
69+
// Filter returns true if the given MessageSentEvent should be processed, false to skip it.
70+
Filter(msg protocol.MessageSentEvent) bool
71+
}

protocol/common_types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,4 +276,5 @@ type MessageSentEvent struct {
276276
Message Message // The decoded CCIP message
277277
Receipts []ReceiptWithBlob // Verifier receipts + executor receipt
278278
BlockNumber uint64 // Block number where event occurred
279+
TxHash ByteSlice // Transaction hash of the event
279280
}

verifier/source_reader_service.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type SourceReaderService struct {
4343
ccipMessageSentTopic string
4444
pollInterval time.Duration
4545
chainSelector protocol.ChainSelector
46+
filter chainaccess.MessageFilter
4647

4748
// State that requires synchronization
4849
mu sync.RWMutex
@@ -83,6 +84,8 @@ func NewSourceReaderService(
8384
chainSelector: chainSelector,
8485
ccipMessageSentTopic: onramp.OnRampCCIPMessageSent{}.Topic().Hex(),
8586
chainStatusManager: chainStatusManager,
87+
// TODO: Pass real filters via constructor. Empty chainaccess.CompositeMessageFilter means allow all
88+
filter: chainaccess.NewCompositeMessageFilter(),
8689
}
8790

8891
// Apply options
@@ -606,10 +609,19 @@ func (r *SourceReaderService) processEventCycle(ctx context.Context) {
606609
now := time.Now()
607610
tasks := make([]VerificationTask, 0, len(events))
608611
for _, event := range events {
612+
if !r.filter.Filter(event) {
613+
r.logger.Debugw("Event filtered out",
614+
"txHash", event.TxHash,
615+
"blockNumber", event.BlockNumber,
616+
"messageID", event.MessageID,
617+
)
618+
}
619+
609620
task := VerificationTask{
610621
Message: event.Message,
611622
ReceiptBlobs: event.Receipts,
612623
BlockNumber: event.BlockNumber,
624+
TxHash: event.TxHash,
613625
FirstSeenAt: now,
614626
}
615627
tasks = append(tasks, task)

verifier/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ type VerificationTask struct {
1111
// TODO: Rename ReceiptBlobs to VerifierBlobs to match with onchain code.
1212
ReceiptBlobs []protocol.ReceiptWithBlob `json:"receipt_blobs"`
1313
Message protocol.Message `json:"message"`
14+
TxHash protocol.ByteSlice `json:"tx_hash"`
1415
BlockNumber uint64 `json:"block_number"` // Block number when the message was included
1516
FirstSeenAt time.Time `json:"first_seen_at"` // When message first entered the system (for E2E latency)
1617
QueuedAt time.Time `json:"queued_at"` // When added to finality queue (for finality wait duration)

0 commit comments

Comments
 (0)