From c79b636c8a8aad45e0753a86fb25785a4d7d1902 Mon Sep 17 00:00:00 2001 From: faisal-link Date: Thu, 8 Jan 2026 05:13:42 +0400 Subject: [PATCH 1/2] add a field is_synthetic to events table to avoid using incorrect digests for offsets --- relayer/chainreader/database/database.go | 2 + .../chainreader/database/database_queries.go | 8 ++-- relayer/chainreader/indexer/events_indexer.go | 1 + .../indexer/events_indexer_test.go | 38 +++++++++++++++++++ .../indexer/transactions_indexer.go | 1 + 5 files changed, 47 insertions(+), 3 deletions(-) diff --git a/relayer/chainreader/database/database.go b/relayer/chainreader/database/database.go index e8fa6b28..db3c1f26 100644 --- a/relayer/chainreader/database/database.go +++ b/relayer/chainreader/database/database.go @@ -67,6 +67,7 @@ type EventRecord struct { BlockHash []byte BlockTimestamp uint64 Data map[string]any + IsSynthetic bool } func (store *DBStore) InsertEvents(ctx context.Context, records []EventRecord) error { @@ -90,6 +91,7 @@ func (store *DBStore) InsertEvents(ctx context.Context, records []EventRecord) e record.BlockHash, record.BlockTimestamp, data, + record.IsSynthetic, ) if err != nil { return fmt.Errorf("failed to insert event (handle: %s, offset: %d): %w", record.EventHandle, record.EventOffset, err) diff --git a/relayer/chainreader/database/database_queries.go b/relayer/chainreader/database/database_queries.go index a44906b8..e3d0594d 100644 --- a/relayer/chainreader/database/database_queries.go +++ b/relayer/chainreader/database/database_queries.go @@ -19,6 +19,7 @@ const ( data JSONB NOT NULL, UNIQUE (event_account_address, event_handle, tx_digest, event_offset) ); + ALTER TABLE sui.events ADD COLUMN IF NOT EXISTS is_synthetic BOOLEAN DEFAULT FALSE; ` CreateTransmitterCursorsTable = ` @@ -45,8 +46,9 @@ const ( block_height, block_hash, block_timestamp, - data - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + data, + is_synthetic + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT DO NOTHING; ` @@ -59,7 +61,7 @@ const ( QueryEventsOffset = ` SELECT COALESCE(event_offset, 0) as event_offset, tx_digest FROM sui.events - WHERE event_account_address = $1 AND event_handle = $2 + WHERE event_account_address = $1 AND event_handle = $2 AND is_synthetic = FALSE ORDER BY id DESC LIMIT 1 ` diff --git a/relayer/chainreader/indexer/events_indexer.go b/relayer/chainreader/indexer/events_indexer.go index 282c6b1b..f49adc45 100644 --- a/relayer/chainreader/indexer/events_indexer.go +++ b/relayer/chainreader/indexer/events_indexer.go @@ -375,6 +375,7 @@ eventLoop: // Sui returns block.Timestamp in ms; convert to seconds for consistency with CCIP readers. BlockTimestamp: block.Timestamp / 1000, Data: normalizedData.(map[string]any), + IsSynthetic: false, } batchRecords = append(batchRecords, record) } diff --git a/relayer/chainreader/indexer/events_indexer_test.go b/relayer/chainreader/indexer/events_indexer_test.go index 799413cb..9b2ca632 100644 --- a/relayer/chainreader/indexer/events_indexer_test.go +++ b/relayer/chainreader/indexer/events_indexer_test.go @@ -611,6 +611,7 @@ func TestEventsIndexer(t *testing.T) { BlockHash: []byte("5HueCGU5rMjxEXxiPuD5BDku4MkFqeZyd4dZ1jvhTVqvbTLvyTJ"), BlockTimestamp: 1000000000, Data: map[string]any{}, + IsSynthetic: false, } // insert duplicate and incorrect event offsets @@ -763,4 +764,41 @@ func TestEventsIndexer(t *testing.T) { require.Equal(t, events[i].Data["sequenceNumber"].(float64)+1, events[i+1].Data["sequenceNumber"].(float64)) } }) + + t.Run("TestSyntheticEventsSkipForOffset", func(t *testing.T) { + eventHandle := packageId + "::offramp::ExecutionStateChanged" + record := database.EventRecord{ + EventAccountAddress: accountAddress, + EventHandle: eventHandle, + EventOffset: 0, + TxDigest: "fake_digest", + BlockVersion: 0, + BlockHeight: "100", + BlockHash: []byte("5HueCGU5rMjxEXxiPuD5BDku4MkFqeZyd4dZ1jvhTVqvbTLvyTJ"), + BlockTimestamp: 1000000000, + Data: map[string]any{}, + IsSynthetic: true, + } + + recordB := database.EventRecord{ + EventAccountAddress: accountAddress, + EventHandle: eventHandle, + EventOffset: 1, + TxDigest: "real_digest", + BlockVersion: 0, + BlockHeight: "100", + BlockHash: []byte("5HueCGU5rMjxEXxiPuD5BDku4MkFqeZyd4dZ1jvhTVqvbTLvyTJ"), + BlockTimestamp: 1000000000, + Data: map[string]any{}, + IsSynthetic: false, + } + + dbStore.InsertEvents(ctx, []database.EventRecord{record, recordB}) + + // query events with out of order event_offset + cursor, totalCount, err := dbStore.GetLatestOffset(ctx, accountAddress, eventHandle) + require.NoError(t, err) + require.Equal(t, recordB.TxDigest, cursor.TxDigest) + require.Equal(t, uint64(2), totalCount) + }) } diff --git a/relayer/chainreader/indexer/transactions_indexer.go b/relayer/chainreader/indexer/transactions_indexer.go index 67a3b715..ec50ece1 100644 --- a/relayer/chainreader/indexer/transactions_indexer.go +++ b/relayer/chainreader/indexer/transactions_indexer.go @@ -524,6 +524,7 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con BlockHash: blockHashBytes, BlockTimestamp: blockTimestamp, Data: executionStateChanged, + IsSynthetic: true, } records = append(records, record) From b57f94c4524723c38181f98a68e817859e5f4b73 Mon Sep 17 00:00:00 2001 From: faisal-link Date: Thu, 8 Jan 2026 17:40:39 +0400 Subject: [PATCH 2/2] minor refactoring --- relayer/chainreader/indexer/events_indexer.go | 52 +------------------ .../indexer/transactions_indexer.go | 14 +++-- relayer/common/utils.go | 49 +++++++++++++++++ 3 files changed, 60 insertions(+), 55 deletions(-) diff --git a/relayer/chainreader/indexer/events_indexer.go b/relayer/chainreader/indexer/events_indexer.go index f49adc45..8b5fd053 100644 --- a/relayer/chainreader/indexer/events_indexer.go +++ b/relayer/chainreader/indexer/events_indexer.go @@ -18,6 +18,7 @@ import ( "github.com/smartcontractkit/chainlink-sui/relayer/chainreader/database" "github.com/smartcontractkit/chainlink-sui/relayer/client" + "github.com/smartcontractkit/chainlink-sui/relayer/common" ) type EventsIndexer struct { @@ -158,55 +159,6 @@ func (eIndexer *EventsIndexer) SyncAllEvents(ctx context.Context) error { return nil } -// Converts snake_case to camelCase -func snakeToCamel(s string) string { - parts := strings.Split(s, "_") - for i := range parts { - if i > 0 && len(parts[i]) > 0 { - parts[i] = strings.ToUpper(string(parts[i][0])) + parts[i][1:] - } - } - - return strings.Join(parts, "") -} - -// Recursively convert all keys in the map to camelCase, -// with a special case for message.header.sequence_number → seqNum -func convertMapKeysToCamelCase(input any) any { - return convertMapKeysToCamelCaseWithPath(input, "") -} - -func convertMapKeysToCamelCaseWithPath(input any, path string) any { - switch typed := input.(type) { - case map[string]any: - result := make(map[string]any) - for k, v := range typed { - camelKey := snakeToCamel(k) - fullPath := path - if fullPath != "" { - fullPath += "." + camelKey - } else { - fullPath = camelKey - } - - if fullPath == "message.header.sequenceNumber" { - camelKey = "seqNum" - } - - result[camelKey] = convertMapKeysToCamelCaseWithPath(v, fullPath) - } - - return result - - case []any: - for i, v := range typed { - typed[i] = convertMapKeysToCamelCaseWithPath(v, path) - } - } - - return input -} - func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.EventSelector) error { if selector == nil { return fmt.Errorf("unspecified selector for SyncEvent call") @@ -347,7 +299,7 @@ eventLoop: offset += uint64(i) + totalCount // normalize the data, convert snake case to camel case - normalizedData := convertMapKeysToCamelCase(event.ParsedJson) + normalizedData := common.ConvertMapKeysToCamelCase(event.ParsedJson) // Convert the txDigest to hex txDigestHex := event.Id.TxDigest diff --git a/relayer/chainreader/indexer/transactions_indexer.go b/relayer/chainreader/indexer/transactions_indexer.go index 7f70b642..a3170980 100644 --- a/relayer/chainreader/indexer/transactions_indexer.go +++ b/relayer/chainreader/indexer/transactions_indexer.go @@ -22,6 +22,7 @@ import ( "github.com/smartcontractkit/chainlink-sui/relayer/chainreader/database" "github.com/smartcontractkit/chainlink-sui/relayer/client" "github.com/smartcontractkit/chainlink-sui/relayer/codec" + "github.com/smartcontractkit/chainlink-sui/relayer/common" ) type TransactionsIndexer struct { @@ -488,13 +489,16 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con // Create synthetic ExecutionStateChanged event // The fields map one-to-one the onchain event executionStateChanged := map[string]any{ - "sourceChainSelector": fmt.Sprintf("%d", sourceChainSelector), - "sequenceNumber": fmt.Sprintf("%d", execReport.Message.Header.SequenceNumber), - "messageId": execReport.Message.Header.MessageID, - "messageHash": messageHash[:], - "state": uint8(3), // 3 = FAILURE + "source_chain_selector": fmt.Sprintf("%d", sourceChainSelector), + "sequence_number": fmt.Sprintf("%d", execReport.Message.Header.SequenceNumber), + "message_id": execReport.Message.Header.MessageID, + "message_hash": messageHash[:], + "state": uint8(3), // 3 = FAILURE } + // normalize keys + executionStateChanged = common.ConvertMapKeysToCamelCase(executionStateChanged).(map[string]any) + blockTimestamp, err := strconv.ParseUint(checkpointResponse.TimestampMs, 10, 64) if err != nil { tIndexer.logger.Errorw("Failed to parse block timestamp", "error", err) diff --git a/relayer/common/utils.go b/relayer/common/utils.go index 68f5da69..fff5d085 100644 --- a/relayer/common/utils.go +++ b/relayer/common/utils.go @@ -114,3 +114,52 @@ func GetModuleForContract(contractName string) string { return "state_object" } } + +// Converts snake_case to camelCase +func SnakeToCamel(s string) string { + parts := strings.Split(s, "_") + for i := range parts { + if i > 0 && len(parts[i]) > 0 { + parts[i] = strings.ToUpper(string(parts[i][0])) + parts[i][1:] + } + } + + return strings.Join(parts, "") +} + +// Recursively convert all keys in the map to camelCase, +// with a special case for message.header.sequence_number → seqNum +func ConvertMapKeysToCamelCase(input any) any { + return ConvertMapKeysToCamelCaseWithPath(input, "") +} + +func ConvertMapKeysToCamelCaseWithPath(input any, path string) any { + switch typed := input.(type) { + case map[string]any: + result := make(map[string]any) + for k, v := range typed { + camelKey := SnakeToCamel(k) + fullPath := path + if fullPath != "" { + fullPath += "." + camelKey + } else { + fullPath = camelKey + } + + if fullPath == "message.header.sequenceNumber" { + camelKey = "seqNum" + } + + result[camelKey] = ConvertMapKeysToCamelCaseWithPath(v, fullPath) + } + + return result + + case []any: + for i, v := range typed { + typed[i] = ConvertMapKeysToCamelCaseWithPath(v, path) + } + } + + return input +}