diff --git a/relayer/chainreader/database/database.go b/relayer/chainreader/database/database.go index 6df5de0f..0780f661 100644 --- a/relayer/chainreader/database/database.go +++ b/relayer/chainreader/database/database.go @@ -166,7 +166,19 @@ func (store *DBStore) GetLatestOffset(ctx context.Context, eventAccountAddress, var offset uint64 var txDigest string var totalCount uint64 - err := store.ds.QueryRowxContext(ctx, QueryEventsOffset, eventAccountAddress, eventHandle).Scan(&offset, &txDigest, &totalCount) + err := store.ds.QueryRowxContext(ctx, QueryEventsOffset, eventAccountAddress, eventHandle).Scan(&offset, &txDigest) + if err != nil { + // no rows found in DB, return a nil index + //nolint:nilnil + if errors.Is(err, sql.ErrNoRows) { + // this is not an error, just nothing to return + return nil, 0, nil + } + + return nil, 0, fmt.Errorf("failed to get latest offset: %w", err) + } + + err = store.ds.QueryRowxContext(ctx, CountEvents, eventAccountAddress, eventHandle).Scan(&totalCount) if err != nil { // no rows found in DB, return a nil index //nolint:nilnil diff --git a/relayer/chainreader/database/database_queries.go b/relayer/chainreader/database/database_queries.go index 9d5bbc0a..a44906b8 100644 --- a/relayer/chainreader/database/database_queries.go +++ b/relayer/chainreader/database/database_queries.go @@ -32,6 +32,7 @@ const ( CREATE INDEX IF NOT EXISTS idx_events_account_handle_timestamp ON sui.events(event_account_address, event_handle, block_timestamp DESC); CREATE INDEX IF NOT EXISTS idx_events_offset ON sui.events(event_account_address, event_handle, event_offset); CREATE INDEX IF NOT EXISTS idx_events_data_gin ON sui.events USING gin(data); + CREATE INDEX IF NOT EXISTS idx_events_account_handle_id ON sui.events(event_account_address, event_handle, id DESC); ` InsertEvent = ` @@ -56,7 +57,7 @@ const ( ` QueryEventsOffset = ` - SELECT COALESCE(event_offset, 0) as event_offset, tx_digest, COUNT(*) OVER() as total_count + SELECT COALESCE(event_offset, 0) as event_offset, tx_digest FROM sui.events WHERE event_account_address = $1 AND event_handle = $2 ORDER BY id DESC diff --git a/relayer/chainreader/indexer/events_indexer_test.go b/relayer/chainreader/indexer/events_indexer_test.go index aae0bbed..47780135 100644 --- a/relayer/chainreader/indexer/events_indexer_test.go +++ b/relayer/chainreader/indexer/events_indexer_test.go @@ -4,12 +4,14 @@ package indexer_test import ( "context" + "fmt" "os" "strconv" "sync" "testing" "time" + "github.com/mr-tron/base58" indexer2 "github.com/smartcontractkit/chainlink-sui/relayer/chainreader/indexer" "github.com/stretchr/testify/require" @@ -17,6 +19,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/sqltest" "github.com/smartcontractkit/chainlink-common/pkg/types/query" + "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-sui/relayer/chainreader/database" "github.com/smartcontractkit/chainlink-sui/relayer/client" @@ -591,4 +594,173 @@ func TestEventsIndexer(t *testing.T) { log.Infow("Race detection test completed - no races detected!") }) + + t.Run("TestOrderedEventsQueryWithOutOfOrderEventOffset", func(t *testing.T) { + t.Skip("Skipping test ordered events query with out of order event offset until the relevant index is re-added") + // insert duplicate events with out of order event_offset for CCIPMessageSent + + packageId := "0x30e087460af8a8aacccbc218aa358cdcde8d43faf61ec0638d71108e276e2f1d" + eventHandle := packageId + "::onramp::CCIPMessageSent" + baseRecord := database.EventRecord{ + EventAccountAddress: accountAddress, + EventHandle: eventHandle, + EventOffset: 0, + TxDigest: "5HueCGU5rMjxEXxiPuD5BDku4MkFqeZyd4dZ1jvhTVqvbTLvyTJ", + BlockVersion: 0, + BlockHeight: "100", + BlockHash: []byte("5HueCGU5rMjxEXxiPuD5BDku4MkFqeZyd4dZ1jvhTVqvbTLvyTJ"), + BlockTimestamp: 1000000000, + Data: map[string]any{}, + } + + // insert duplicate and incorrect event offsets + for i := range 200_000 { + recordA := baseRecord + recordB := baseRecord + + // use different event_offset for both records + recordA.EventOffset = uint64(i) + recordB.EventOffset = uint64(i%2 + 1000) + + // use duplicate data for both records + recordA.BlockHeight = strconv.Itoa(100 + i) + recordB.BlockHeight = strconv.Itoa(100 + i) + + recordA.TxDigest = base58.Encode([]byte("record" + strconv.Itoa(i))) + recordB.TxDigest = base58.Encode([]byte("record" + strconv.Itoa(i))) + + recordA.Data = map[string]any{ + "destChainSelector": 3478487238524512106, + "sequenceNumber": 776 + uint64(i), + } + recordB.Data = map[string]any{ + "destChainSelector": 3478487238524512106, + "sequenceNumber": 776 + uint64(i), + } + + dbStore.InsertEvents(ctx, []database.EventRecord{recordA, recordB}) + } + + // insert some other unrelated events + for i := range 10_000 { + recordA := baseRecord + + // use different event_offset for both records + recordA.EventOffset = uint64(i + 1) + + // use duplicate data for both records + recordA.BlockHeight = "100" + + recordA.TxDigest = base58.Encode([]byte("record" + strconv.Itoa(i))) + + recordA.EventHandle = packageId + "::onramp::SomeOtherEvent" + + recordA.Data = map[string]any{ + "destChainSelector": 3478487238524512106, + "sequenceNumber": 176 + uint64(i), + } + + dbStore.InsertEvents(ctx, []database.EventRecord{recordA}) + } + + // query events with out of order event_offset + events, err := dbStore.QueryEvents(ctx, accountAddress, eventHandle, []query.Expression{ + { + BoolExpression: query.BoolExpression{ + BoolOperator: query.AND, + Expressions: []query.Expression{ + { + Primitive: &primitives.Comparator{ + Name: "sequenceNumber", + ValueComparators: []primitives.ValueComparator{ + {Value: uint64(776), Operator: primitives.Gte}, + {Value: uint64(779), Operator: primitives.Lte}, + }, + }, + }, + { + Primitive: &primitives.Comparator{ + Name: "destChainSelector", + ValueComparators: []primitives.ValueComparator{ + {Value: "3478487238524512106", Operator: primitives.Eq}, + }, + }, + }, + }, + }, + }, + }, query.LimitAndSort{ + Limit: query.Limit{ + Count: 100, + }, + SortBy: []query.SortBy{ + query.NewSortBySequence(query.Asc), + }, + }) + + // we should only get 10 events + require.NoError(t, err) + require.Equal(t, 4, len(events)) + + for _, event := range events { + fmt.Printf("eventHandle: %s\n", event.EventHandle) + fmt.Printf("sequenceNumber: %f\n", event.Data["sequenceNumber"].(float64)) + fmt.Println("--------------------------------") + } + + // events should have strictly increasing sequence numbers and be in order + for i := range len(events) - 1 { + require.Equal(t, events[i].Data["sequenceNumber"].(float64)+1, events[i+1].Data["sequenceNumber"].(float64)) + } + + // query another range for the same event handle + events, err = dbStore.QueryEvents(ctx, accountAddress, eventHandle, []query.Expression{ + { + BoolExpression: query.BoolExpression{ + BoolOperator: query.AND, + Expressions: []query.Expression{ + { + Primitive: &primitives.Comparator{ + Name: "sequenceNumber", + ValueComparators: []primitives.ValueComparator{ + {Value: uint64(779), Operator: primitives.Gte}, + {Value: uint64(785), Operator: primitives.Lte}, + }, + }, + }, + { + Primitive: &primitives.Comparator{ + Name: "destChainSelector", + ValueComparators: []primitives.ValueComparator{ + {Value: "3478487238524512106", Operator: primitives.Eq}, + }, + }, + }, + }, + }, + }, + }, query.LimitAndSort{ + Limit: query.Limit{ + Count: 100, + }, + SortBy: []query.SortBy{ + query.NewSortBySequence(query.Asc), + }, + }) + + require.NoError(t, err) + require.Equal(t, 7, len(events)) + + for _, event := range events { + fmt.Printf("eventHandle: %s\n", event.EventHandle) + fmt.Printf("sequenceNumber: %f\n", event.Data["sequenceNumber"].(float64)) + fmt.Println("--------------------------------") + } + + // events should have strictly increasing sequence numbers and be in order + for i := range len(events) - 1 { + require.Equal(t, events[i].EventHandle, eventHandle) + require.Equal(t, events[i].Data["sequenceNumber"].(float64)+1, events[i+1].Data["sequenceNumber"].(float64)) + } + }) } diff --git a/relayer/chainreader/loop/loop_reader_test.go b/relayer/chainreader/loop/loop_reader_test.go index d4bfdb6c..e034bdf9 100644 --- a/relayer/chainreader/loop/loop_reader_test.go +++ b/relayer/chainreader/loop/loop_reader_test.go @@ -200,6 +200,14 @@ func runLoopChainReaderEchoTest(t *testing.T, log logger.Logger, rpcUrl string) }, }, }, + EventsIndexer: config.EventsIndexerConfig{ + PollingInterval: 10 * time.Second, + SyncTimeout: 30 * time.Second, + }, + TransactionsIndexer: config.TransactionsIndexerConfig{ + PollingInterval: 10 * time.Second, + SyncTimeout: 30 * time.Second, + }, } echoBinding := types.BoundContract{ @@ -255,6 +263,10 @@ func runLoopChainReaderEchoTest(t *testing.T, log logger.Logger, rpcUrl string) err = loopReader.Bind(context.Background(), []types.BoundContract{echoBinding, counterBinding}) require.NoError(t, err) + // Start the indexers + err = indexerInstance.Start(ctx) + require.NoError(t, err) + log.Debugw("LoopChainReader setup complete") t.Run("LoopReader_GetLatestValue_EchoU64", func(t *testing.T) { diff --git a/relayer/chainreader/reader/chainreader.go b/relayer/chainreader/reader/chainreader.go index d6984f2b..62953bc3 100644 --- a/relayer/chainreader/reader/chainreader.go +++ b/relayer/chainreader/reader/chainreader.go @@ -607,10 +607,10 @@ func (s *suiChainReader) updateEventConfigs(ctx context.Context, contract pkgtyp Event: eventConfig.EventType, } - // sync the event in case it's not already in the database - err = evIndexer.SyncEvent(ctx, &selector) + // ensure that the event selector is included in the indexer's set for upcoming polling loop syncs + err = evIndexer.AddEventSelector(ctx, &selector) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to add event selector: %w", err) } return eventConfig, nil