Skip to content

Commit 2e58711

Browse files
Add a field is_synthetic to events table to avoid using incorrect digest for offsets (#312)
* add a field is_synthetic to events table to avoid using incorrect digests for offsets * minor refactoring
1 parent 13ac154 commit 2e58711

File tree

6 files changed

+107
-58
lines changed

6 files changed

+107
-58
lines changed

relayer/chainreader/database/database.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ type EventRecord struct {
6767
BlockHash []byte
6868
BlockTimestamp uint64
6969
Data map[string]any
70+
IsSynthetic bool
7071
}
7172

7273
func (store *DBStore) InsertEvents(ctx context.Context, records []EventRecord) error {
@@ -90,6 +91,7 @@ func (store *DBStore) InsertEvents(ctx context.Context, records []EventRecord) e
9091
record.BlockHash,
9192
record.BlockTimestamp,
9293
data,
94+
record.IsSynthetic,
9395
)
9496
if err != nil {
9597
return fmt.Errorf("failed to insert event (handle: %s, offset: %d): %w", record.EventHandle, record.EventOffset, err)

relayer/chainreader/database/database_queries.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const (
1919
data JSONB NOT NULL,
2020
UNIQUE (event_account_address, event_handle, tx_digest, event_offset)
2121
);
22+
ALTER TABLE sui.events ADD COLUMN IF NOT EXISTS is_synthetic BOOLEAN DEFAULT FALSE;
2223
`
2324

2425
CreateTransmitterCursorsTable = `
@@ -45,8 +46,9 @@ const (
4546
block_height,
4647
block_hash,
4748
block_timestamp,
48-
data
49-
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
49+
data,
50+
is_synthetic
51+
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
5052
ON CONFLICT DO NOTHING;
5153
`
5254

@@ -59,7 +61,7 @@ const (
5961
QueryEventsOffset = `
6062
SELECT COALESCE(event_offset, 0) as event_offset, tx_digest
6163
FROM sui.events
62-
WHERE event_account_address = $1 AND event_handle = $2
64+
WHERE event_account_address = $1 AND event_handle = $2 AND is_synthetic = FALSE
6365
ORDER BY id DESC
6466
LIMIT 1
6567
`

relayer/chainreader/indexer/events_indexer.go

Lines changed: 3 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818

1919
"github.com/smartcontractkit/chainlink-sui/relayer/chainreader/database"
2020
"github.com/smartcontractkit/chainlink-sui/relayer/client"
21+
"github.com/smartcontractkit/chainlink-sui/relayer/common"
2122
)
2223

2324
type EventsIndexer struct {
@@ -158,55 +159,6 @@ func (eIndexer *EventsIndexer) SyncAllEvents(ctx context.Context) error {
158159
return nil
159160
}
160161

161-
// Converts snake_case to camelCase
162-
func snakeToCamel(s string) string {
163-
parts := strings.Split(s, "_")
164-
for i := range parts {
165-
if i > 0 && len(parts[i]) > 0 {
166-
parts[i] = strings.ToUpper(string(parts[i][0])) + parts[i][1:]
167-
}
168-
}
169-
170-
return strings.Join(parts, "")
171-
}
172-
173-
// Recursively convert all keys in the map to camelCase,
174-
// with a special case for message.header.sequence_number → seqNum
175-
func convertMapKeysToCamelCase(input any) any {
176-
return convertMapKeysToCamelCaseWithPath(input, "")
177-
}
178-
179-
func convertMapKeysToCamelCaseWithPath(input any, path string) any {
180-
switch typed := input.(type) {
181-
case map[string]any:
182-
result := make(map[string]any)
183-
for k, v := range typed {
184-
camelKey := snakeToCamel(k)
185-
fullPath := path
186-
if fullPath != "" {
187-
fullPath += "." + camelKey
188-
} else {
189-
fullPath = camelKey
190-
}
191-
192-
if fullPath == "message.header.sequenceNumber" {
193-
camelKey = "seqNum"
194-
}
195-
196-
result[camelKey] = convertMapKeysToCamelCaseWithPath(v, fullPath)
197-
}
198-
199-
return result
200-
201-
case []any:
202-
for i, v := range typed {
203-
typed[i] = convertMapKeysToCamelCaseWithPath(v, path)
204-
}
205-
}
206-
207-
return input
208-
}
209-
210162
func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.EventSelector) error {
211163
if selector == nil {
212164
return fmt.Errorf("unspecified selector for SyncEvent call")
@@ -347,7 +299,7 @@ eventLoop:
347299
offset += uint64(i) + totalCount
348300

349301
// normalize the data, convert snake case to camel case
350-
normalizedData := convertMapKeysToCamelCase(event.ParsedJson)
302+
normalizedData := common.ConvertMapKeysToCamelCase(event.ParsedJson)
351303

352304
// Convert the txDigest to hex
353305
txDigestHex := event.Id.TxDigest
@@ -375,6 +327,7 @@ eventLoop:
375327
// Sui returns block.Timestamp in ms; convert to seconds for consistency with CCIP readers.
376328
BlockTimestamp: block.Timestamp / 1000,
377329
Data: normalizedData.(map[string]any),
330+
IsSynthetic: false,
378331
}
379332
batchRecords = append(batchRecords, record)
380333
}

relayer/chainreader/indexer/events_indexer_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,6 +611,7 @@ func TestEventsIndexer(t *testing.T) {
611611
BlockHash: []byte("5HueCGU5rMjxEXxiPuD5BDku4MkFqeZyd4dZ1jvhTVqvbTLvyTJ"),
612612
BlockTimestamp: 1000000000,
613613
Data: map[string]any{},
614+
IsSynthetic: false,
614615
}
615616

616617
// insert duplicate and incorrect event offsets
@@ -763,4 +764,41 @@ func TestEventsIndexer(t *testing.T) {
763764
require.Equal(t, events[i].Data["sequenceNumber"].(float64)+1, events[i+1].Data["sequenceNumber"].(float64))
764765
}
765766
})
767+
768+
t.Run("TestSyntheticEventsSkipForOffset", func(t *testing.T) {
769+
eventHandle := packageId + "::offramp::ExecutionStateChanged"
770+
record := database.EventRecord{
771+
EventAccountAddress: accountAddress,
772+
EventHandle: eventHandle,
773+
EventOffset: 0,
774+
TxDigest: "fake_digest",
775+
BlockVersion: 0,
776+
BlockHeight: "100",
777+
BlockHash: []byte("5HueCGU5rMjxEXxiPuD5BDku4MkFqeZyd4dZ1jvhTVqvbTLvyTJ"),
778+
BlockTimestamp: 1000000000,
779+
Data: map[string]any{},
780+
IsSynthetic: true,
781+
}
782+
783+
recordB := database.EventRecord{
784+
EventAccountAddress: accountAddress,
785+
EventHandle: eventHandle,
786+
EventOffset: 1,
787+
TxDigest: "real_digest",
788+
BlockVersion: 0,
789+
BlockHeight: "100",
790+
BlockHash: []byte("5HueCGU5rMjxEXxiPuD5BDku4MkFqeZyd4dZ1jvhTVqvbTLvyTJ"),
791+
BlockTimestamp: 1000000000,
792+
Data: map[string]any{},
793+
IsSynthetic: false,
794+
}
795+
796+
dbStore.InsertEvents(ctx, []database.EventRecord{record, recordB})
797+
798+
// query events with out of order event_offset
799+
cursor, totalCount, err := dbStore.GetLatestOffset(ctx, accountAddress, eventHandle)
800+
require.NoError(t, err)
801+
require.Equal(t, recordB.TxDigest, cursor.TxDigest)
802+
require.Equal(t, uint64(2), totalCount)
803+
})
766804
}

relayer/chainreader/indexer/transactions_indexer.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/smartcontractkit/chainlink-sui/relayer/chainreader/database"
2323
"github.com/smartcontractkit/chainlink-sui/relayer/client"
2424
"github.com/smartcontractkit/chainlink-sui/relayer/codec"
25+
"github.com/smartcontractkit/chainlink-sui/relayer/common"
2526
)
2627

2728
type TransactionsIndexer struct {
@@ -488,13 +489,16 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con
488489
// Create synthetic ExecutionStateChanged event
489490
// The fields map one-to-one the onchain event
490491
executionStateChanged := map[string]any{
491-
"sourceChainSelector": fmt.Sprintf("%d", sourceChainSelector),
492-
"sequenceNumber": fmt.Sprintf("%d", execReport.Message.Header.SequenceNumber),
493-
"messageId": execReport.Message.Header.MessageID,
494-
"messageHash": messageHash[:],
495-
"state": uint8(3), // 3 = FAILURE
492+
"source_chain_selector": fmt.Sprintf("%d", sourceChainSelector),
493+
"sequence_number": fmt.Sprintf("%d", execReport.Message.Header.SequenceNumber),
494+
"message_id": execReport.Message.Header.MessageID,
495+
"message_hash": messageHash[:],
496+
"state": uint8(3), // 3 = FAILURE
496497
}
497498

499+
// normalize keys
500+
executionStateChanged = common.ConvertMapKeysToCamelCase(executionStateChanged).(map[string]any)
501+
498502
blockTimestamp, err := strconv.ParseUint(checkpointResponse.TimestampMs, 10, 64)
499503
if err != nil {
500504
tIndexer.logger.Errorw("Failed to parse block timestamp", "error", err)
@@ -524,6 +528,7 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con
524528
BlockHash: blockHashBytes,
525529
BlockTimestamp: blockTimestamp,
526530
Data: executionStateChanged,
531+
IsSynthetic: true,
527532
}
528533

529534
records = append(records, record)

relayer/common/utils.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,3 +114,52 @@ func GetModuleForContract(contractName string) string {
114114
return "state_object"
115115
}
116116
}
117+
118+
// Converts snake_case to camelCase
119+
func SnakeToCamel(s string) string {
120+
parts := strings.Split(s, "_")
121+
for i := range parts {
122+
if i > 0 && len(parts[i]) > 0 {
123+
parts[i] = strings.ToUpper(string(parts[i][0])) + parts[i][1:]
124+
}
125+
}
126+
127+
return strings.Join(parts, "")
128+
}
129+
130+
// Recursively convert all keys in the map to camelCase,
131+
// with a special case for message.header.sequence_number → seqNum
132+
func ConvertMapKeysToCamelCase(input any) any {
133+
return ConvertMapKeysToCamelCaseWithPath(input, "")
134+
}
135+
136+
func ConvertMapKeysToCamelCaseWithPath(input any, path string) any {
137+
switch typed := input.(type) {
138+
case map[string]any:
139+
result := make(map[string]any)
140+
for k, v := range typed {
141+
camelKey := SnakeToCamel(k)
142+
fullPath := path
143+
if fullPath != "" {
144+
fullPath += "." + camelKey
145+
} else {
146+
fullPath = camelKey
147+
}
148+
149+
if fullPath == "message.header.sequenceNumber" {
150+
camelKey = "seqNum"
151+
}
152+
153+
result[camelKey] = ConvertMapKeysToCamelCaseWithPath(v, fullPath)
154+
}
155+
156+
return result
157+
158+
case []any:
159+
for i, v := range typed {
160+
typed[i] = ConvertMapKeysToCamelCaseWithPath(v, path)
161+
}
162+
}
163+
164+
return input
165+
}

0 commit comments

Comments
 (0)