Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion relayer/chainreader/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,19 @@ func (store *DBStore) GetLatestOffset(ctx context.Context, eventAccountAddress,
var offset uint64
var txDigest string
var totalCount uint64
err := store.ds.QueryRowxContext(ctx, QueryEventsOffset, eventAccountAddress, eventHandle).Scan(&offset, &txDigest, &totalCount)
err := store.ds.QueryRowxContext(ctx, QueryEventsOffset, eventAccountAddress, eventHandle).Scan(&offset, &txDigest)
if err != nil {
// no rows found in DB, return a nil index
//nolint:nilnil
if errors.Is(err, sql.ErrNoRows) {
// this is not an error, just nothing to return
return nil, 0, nil
}

return nil, 0, fmt.Errorf("failed to get latest offset: %w", err)
}

err = store.ds.QueryRowxContext(ctx, CountEvents, eventAccountAddress, eventHandle).Scan(&totalCount)
if err != nil {
// no rows found in DB, return a nil index
//nolint:nilnil
Expand Down
20 changes: 5 additions & 15 deletions relayer/chainreader/database/database_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
CREATE INDEX IF NOT EXISTS idx_events_account_handle_timestamp ON sui.events(event_account_address, event_handle, block_timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_events_offset ON sui.events(event_account_address, event_handle, event_offset);
CREATE INDEX IF NOT EXISTS idx_events_data_gin ON sui.events USING gin(data);
CREATE INDEX IF NOT EXISTS idx_events_account_handle_id ON sui.events(event_account_address, event_handle, id DESC);
`

InsertEvent = `
Expand All @@ -50,24 +51,13 @@ const (
`

QueryEventsBase = `
WITH filtered_events AS (
SELECT event_account_address, event_handle, event_offset, block_version,
block_height, block_hash, block_timestamp, tx_digest, data,
ROW_NUMBER() OVER (
PARTITION BY tx_digest, block_height, md5(data::text)
ORDER BY event_offset DESC
) as rn
FROM sui.events
WHERE event_account_address = $1 AND event_handle = $2
)
SELECT event_account_address, event_handle, event_offset, block_version,
block_height, block_hash, block_timestamp, tx_digest, data
FROM filtered_events
WHERE rn = 1
SELECT event_account_address, event_handle, event_offset, block_version, block_height, block_hash, block_timestamp, tx_digest, data
FROM sui.events
WHERE event_account_address = $1 AND event_handle = $2
`

QueryEventsOffset = `
SELECT COALESCE(event_offset, 0) as event_offset, tx_digest, COUNT(*) OVER() as total_count
SELECT COALESCE(event_offset, 0) as event_offset, tx_digest
FROM sui.events
WHERE event_account_address = $1 AND event_handle = $2
ORDER BY id DESC
Expand Down
1 change: 1 addition & 0 deletions relayer/chainreader/indexer/events_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ func TestEventsIndexer(t *testing.T) {
})

t.Run("TestOrderedEventsQueryWithOutOfOrderEventOffset", func(t *testing.T) {
t.Skip("Skipping test ordered events query with out of order event offset until the relevant index is re-added")
// insert duplicate events with out of order event_offset for CCIPMessageSent

packageId := "0x30e087460af8a8aacccbc218aa358cdcde8d43faf61ec0638d71108e276e2f1d"
Expand Down
12 changes: 12 additions & 0 deletions relayer/chainreader/loop/loop_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,14 @@ func runLoopChainReaderEchoTest(t *testing.T, log logger.Logger, rpcUrl string)
},
},
},
EventsIndexer: config.EventsIndexerConfig{
PollingInterval: 10 * time.Second,
SyncTimeout: 30 * time.Second,
},
TransactionsIndexer: config.TransactionsIndexerConfig{
PollingInterval: 10 * time.Second,
SyncTimeout: 30 * time.Second,
},
}

echoBinding := types.BoundContract{
Expand Down Expand Up @@ -255,6 +263,10 @@ func runLoopChainReaderEchoTest(t *testing.T, log logger.Logger, rpcUrl string)
err = loopReader.Bind(context.Background(), []types.BoundContract{echoBinding, counterBinding})
require.NoError(t, err)

// Start the indexers
err = indexerInstance.Start(ctx)
require.NoError(t, err)

log.Debugw("LoopChainReader setup complete")

t.Run("LoopReader_GetLatestValue_EchoU64", func(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions relayer/chainreader/reader/chainreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,10 +607,10 @@ func (s *suiChainReader) updateEventConfigs(ctx context.Context, contract pkgtyp
Event: eventConfig.EventType,
}

// sync the event in case it's not already in the database
err = evIndexer.SyncEvent(ctx, &selector)
// ensure that the event selector is included in the indexer's set for upcoming polling loop syncs
err = evIndexer.AddEventSelector(ctx, &selector)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to add event selector: %w", err)
}

return eventConfig, nil
Expand Down
Loading