Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions relayer/chainreader/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type EventRecord struct {
BlockHash []byte
BlockTimestamp uint64
Data map[string]any
IsSynthetic bool
}

func (store *DBStore) InsertEvents(ctx context.Context, records []EventRecord) error {
Expand All @@ -90,6 +91,7 @@ func (store *DBStore) InsertEvents(ctx context.Context, records []EventRecord) e
record.BlockHash,
record.BlockTimestamp,
data,
record.IsSynthetic,
)
if err != nil {
return fmt.Errorf("failed to insert event (handle: %s, offset: %d): %w", record.EventHandle, record.EventOffset, err)
Expand Down
8 changes: 5 additions & 3 deletions relayer/chainreader/database/database_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
data JSONB NOT NULL,
UNIQUE (event_account_address, event_handle, tx_digest, event_offset)
);
ALTER TABLE sui.events ADD COLUMN IF NOT EXISTS is_synthetic BOOLEAN DEFAULT FALSE;
`

CreateTransmitterCursorsTable = `
Expand All @@ -45,8 +46,9 @@ const (
block_height,
block_hash,
block_timestamp,
data
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
data,
is_synthetic
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT DO NOTHING;
`

Expand All @@ -59,7 +61,7 @@ const (
QueryEventsOffset = `
SELECT COALESCE(event_offset, 0) as event_offset, tx_digest
FROM sui.events
WHERE event_account_address = $1 AND event_handle = $2
WHERE event_account_address = $1 AND event_handle = $2 AND is_synthetic = FALSE
ORDER BY id DESC
LIMIT 1
`
Expand Down
53 changes: 3 additions & 50 deletions relayer/chainreader/indexer/events_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/smartcontractkit/chainlink-sui/relayer/chainreader/database"
"github.com/smartcontractkit/chainlink-sui/relayer/client"
"github.com/smartcontractkit/chainlink-sui/relayer/common"
)

type EventsIndexer struct {
Expand Down Expand Up @@ -158,55 +159,6 @@ func (eIndexer *EventsIndexer) SyncAllEvents(ctx context.Context) error {
return nil
}

// Converts snake_case to camelCase
func snakeToCamel(s string) string {
parts := strings.Split(s, "_")
for i := range parts {
if i > 0 && len(parts[i]) > 0 {
parts[i] = strings.ToUpper(string(parts[i][0])) + parts[i][1:]
}
}

return strings.Join(parts, "")
}

// Recursively convert all keys in the map to camelCase,
// with a special case for message.header.sequence_number → seqNum
func convertMapKeysToCamelCase(input any) any {
return convertMapKeysToCamelCaseWithPath(input, "")
}

func convertMapKeysToCamelCaseWithPath(input any, path string) any {
switch typed := input.(type) {
case map[string]any:
result := make(map[string]any)
for k, v := range typed {
camelKey := snakeToCamel(k)
fullPath := path
if fullPath != "" {
fullPath += "." + camelKey
} else {
fullPath = camelKey
}

if fullPath == "message.header.sequenceNumber" {
camelKey = "seqNum"
}

result[camelKey] = convertMapKeysToCamelCaseWithPath(v, fullPath)
}

return result

case []any:
for i, v := range typed {
typed[i] = convertMapKeysToCamelCaseWithPath(v, path)
}
}

return input
}

func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.EventSelector) error {
if selector == nil {
return fmt.Errorf("unspecified selector for SyncEvent call")
Expand Down Expand Up @@ -347,7 +299,7 @@ eventLoop:
offset += uint64(i) + totalCount

// normalize the data, convert snake case to camel case
normalizedData := convertMapKeysToCamelCase(event.ParsedJson)
normalizedData := common.ConvertMapKeysToCamelCase(event.ParsedJson)

// Convert the txDigest to hex
txDigestHex := event.Id.TxDigest
Expand Down Expand Up @@ -375,6 +327,7 @@ eventLoop:
// Sui returns block.Timestamp in ms; convert to seconds for consistency with CCIP readers.
BlockTimestamp: block.Timestamp / 1000,
Data: normalizedData.(map[string]any),
IsSynthetic: false,
}
batchRecords = append(batchRecords, record)
}
Expand Down
38 changes: 38 additions & 0 deletions relayer/chainreader/indexer/events_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ func TestEventsIndexer(t *testing.T) {
BlockHash: []byte("5HueCGU5rMjxEXxiPuD5BDku4MkFqeZyd4dZ1jvhTVqvbTLvyTJ"),
BlockTimestamp: 1000000000,
Data: map[string]any{},
IsSynthetic: false,
}

// insert duplicate and incorrect event offsets
Expand Down Expand Up @@ -763,4 +764,41 @@ func TestEventsIndexer(t *testing.T) {
require.Equal(t, events[i].Data["sequenceNumber"].(float64)+1, events[i+1].Data["sequenceNumber"].(float64))
}
})

t.Run("TestSyntheticEventsSkipForOffset", func(t *testing.T) {
eventHandle := packageId + "::offramp::ExecutionStateChanged"
record := database.EventRecord{
EventAccountAddress: accountAddress,
EventHandle: eventHandle,
EventOffset: 0,
TxDigest: "fake_digest",
BlockVersion: 0,
BlockHeight: "100",
BlockHash: []byte("5HueCGU5rMjxEXxiPuD5BDku4MkFqeZyd4dZ1jvhTVqvbTLvyTJ"),
BlockTimestamp: 1000000000,
Data: map[string]any{},
IsSynthetic: true,
}

recordB := database.EventRecord{
EventAccountAddress: accountAddress,
EventHandle: eventHandle,
EventOffset: 1,
TxDigest: "real_digest",
BlockVersion: 0,
BlockHeight: "100",
BlockHash: []byte("5HueCGU5rMjxEXxiPuD5BDku4MkFqeZyd4dZ1jvhTVqvbTLvyTJ"),
BlockTimestamp: 1000000000,
Data: map[string]any{},
IsSynthetic: false,
}

dbStore.InsertEvents(ctx, []database.EventRecord{record, recordB})

// query events with out of order event_offset
cursor, totalCount, err := dbStore.GetLatestOffset(ctx, accountAddress, eventHandle)
require.NoError(t, err)
require.Equal(t, recordB.TxDigest, cursor.TxDigest)
require.Equal(t, uint64(2), totalCount)
})
}
15 changes: 10 additions & 5 deletions relayer/chainreader/indexer/transactions_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/smartcontractkit/chainlink-sui/relayer/chainreader/database"
"github.com/smartcontractkit/chainlink-sui/relayer/client"
"github.com/smartcontractkit/chainlink-sui/relayer/codec"
"github.com/smartcontractkit/chainlink-sui/relayer/common"
)

type TransactionsIndexer struct {
Expand Down Expand Up @@ -488,13 +489,16 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con
// Create synthetic ExecutionStateChanged event
// The fields map one-to-one the onchain event
executionStateChanged := map[string]any{
"sourceChainSelector": fmt.Sprintf("%d", sourceChainSelector),
"sequenceNumber": fmt.Sprintf("%d", execReport.Message.Header.SequenceNumber),
"messageId": execReport.Message.Header.MessageID,
"messageHash": messageHash[:],
"state": uint8(3), // 3 = FAILURE
"source_chain_selector": fmt.Sprintf("%d", sourceChainSelector),
"sequence_number": fmt.Sprintf("%d", execReport.Message.Header.SequenceNumber),
"message_id": execReport.Message.Header.MessageID,
"message_hash": messageHash[:],
"state": uint8(3), // 3 = FAILURE
}

// normalize keys
executionStateChanged = common.ConvertMapKeysToCamelCase(executionStateChanged).(map[string]any)

blockTimestamp, err := strconv.ParseUint(checkpointResponse.TimestampMs, 10, 64)
if err != nil {
tIndexer.logger.Errorw("Failed to parse block timestamp", "error", err)
Expand Down Expand Up @@ -524,6 +528,7 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con
BlockHash: blockHashBytes,
BlockTimestamp: blockTimestamp,
Data: executionStateChanged,
IsSynthetic: true,
}

records = append(records, record)
Expand Down
49 changes: 49 additions & 0 deletions relayer/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,52 @@ func GetModuleForContract(contractName string) string {
return "state_object"
}
}

// Converts snake_case to camelCase
func SnakeToCamel(s string) string {
parts := strings.Split(s, "_")
for i := range parts {
if i > 0 && len(parts[i]) > 0 {
parts[i] = strings.ToUpper(string(parts[i][0])) + parts[i][1:]
}
}

return strings.Join(parts, "")
}

// Recursively convert all keys in the map to camelCase,
// with a special case for message.header.sequence_number → seqNum
func ConvertMapKeysToCamelCase(input any) any {
return ConvertMapKeysToCamelCaseWithPath(input, "")
}

func ConvertMapKeysToCamelCaseWithPath(input any, path string) any {
switch typed := input.(type) {
case map[string]any:
result := make(map[string]any)
for k, v := range typed {
camelKey := SnakeToCamel(k)
fullPath := path
if fullPath != "" {
fullPath += "." + camelKey
} else {
fullPath = camelKey
}

if fullPath == "message.header.sequenceNumber" {
camelKey = "seqNum"
}

result[camelKey] = ConvertMapKeysToCamelCaseWithPath(v, fullPath)
}

return result

case []any:
for i, v := range typed {
typed[i] = ConvertMapKeysToCamelCaseWithPath(v, path)
}
}

return input
}
Loading