14 changes: 13 additions & 1 deletion relayer/chainreader/database/database.go
@@ -166,7 +166,19 @@ func (store *DBStore) GetLatestOffset(ctx context.Context, eventAccountAddress,
var offset uint64
var txDigest string
var totalCount uint64
err := store.ds.QueryRowxContext(ctx, QueryEventsOffset, eventAccountAddress, eventHandle).Scan(&offset, &txDigest, &totalCount)
err := store.ds.QueryRowxContext(ctx, QueryEventsOffset, eventAccountAddress, eventHandle).Scan(&offset, &txDigest)
if err != nil {
// no rows found in DB, return a nil index
//nolint:nilnil
if errors.Is(err, sql.ErrNoRows) {
// this is not an error, just nothing to return
return nil, 0, nil
}

return nil, 0, fmt.Errorf("failed to get latest offset: %w", err)
}

err = store.ds.QueryRowxContext(ctx, CountEvents, eventAccountAddress, eventHandle).Scan(&totalCount)
if err != nil {
// no rows found in DB, return a nil index
//nolint:nilnil
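For context, the CountEvents query referenced above is defined outside the visible hunk. A plausible shape, assuming it sits alongside the other query constants in database_queries.go (illustrative only; the real constant may differ), is a plain count over the same filter:

	// Hypothetical sketch of the count query used by GetLatestOffset above.
	CountEvents = `
	SELECT COUNT(*) as total_count
	FROM sui.events
	WHERE event_account_address = $1 AND event_handle = $2
	`

Since COUNT(*) over an empty match set still returns a single row containing 0, the sql.ErrNoRows branch following the count scan is defensive rather than a path expected in normal operation.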
3 changes: 2 additions & 1 deletion relayer/chainreader/database/database_queries.go
@@ -32,6 +32,7 @@ const (
CREATE INDEX IF NOT EXISTS idx_events_account_handle_timestamp ON sui.events(event_account_address, event_handle, block_timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_events_offset ON sui.events(event_account_address, event_handle, event_offset);
CREATE INDEX IF NOT EXISTS idx_events_data_gin ON sui.events USING gin(data);
CREATE INDEX IF NOT EXISTS idx_events_account_handle_id ON sui.events(event_account_address, event_handle, id DESC);
`

InsertEvent = `
@@ -56,7 +57,7 @@ const (
`

QueryEventsOffset = `
SELECT COALESCE(event_offset, 0) as event_offset, tx_digest, COUNT(*) OVER() as total_count
SELECT COALESCE(event_offset, 0) as event_offset, tx_digest
FROM sui.events
WHERE event_account_address = $1 AND event_handle = $2
ORDER BY id DESC
172 changes: 172 additions & 0 deletions relayer/chainreader/indexer/events_indexer_test.go
@@ -4,19 +4,22 @@ package indexer_test

import (
"context"
"fmt"
"os"
"strconv"
"sync"
"testing"
"time"

"github.com/mr-tron/base58"
indexer2 "github.com/smartcontractkit/chainlink-sui/relayer/chainreader/indexer"

"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil/sqltest"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"

"github.com/smartcontractkit/chainlink-sui/relayer/chainreader/database"
"github.com/smartcontractkit/chainlink-sui/relayer/client"
@@ -591,4 +594,173 @@ func TestEventsIndexer(t *testing.T) {

log.Infow("Race detection test completed - no races detected!")
})

t.Run("TestOrderedEventsQueryWithOutOfOrderEventOffset", func(t *testing.T) {
t.Skip("Skipping test ordered events query with out of order event offset until the relevant index is re-added")
// insert duplicate events with out of order event_offset for CCIPMessageSent

packageId := "0x30e087460af8a8aacccbc218aa358cdcde8d43faf61ec0638d71108e276e2f1d"
eventHandle := packageId + "::onramp::CCIPMessageSent"
baseRecord := database.EventRecord{
EventAccountAddress: accountAddress,
EventHandle: eventHandle,
EventOffset: 0,
TxDigest: "5HueCGU5rMjxEXxiPuD5BDku4MkFqeZyd4dZ1jvhTVqvbTLvyTJ",
BlockVersion: 0,
BlockHeight: "100",
BlockHash: []byte("5HueCGU5rMjxEXxiPuD5BDku4MkFqeZyd4dZ1jvhTVqvbTLvyTJ"),
BlockTimestamp: 1000000000,
Data: map[string]any{},
}

// insert duplicate and incorrect event offsets
for i := range 200_000 {
recordA := baseRecord
recordB := baseRecord

// use different event_offset for both records
recordA.EventOffset = uint64(i)
recordB.EventOffset = uint64(i%2 + 1000)

// use duplicate data for both records
recordA.BlockHeight = strconv.Itoa(100 + i)
recordB.BlockHeight = strconv.Itoa(100 + i)

recordA.TxDigest = base58.Encode([]byte("record" + strconv.Itoa(i)))
recordB.TxDigest = base58.Encode([]byte("record" + strconv.Itoa(i)))

recordA.Data = map[string]any{
"destChainSelector": 3478487238524512106,
"sequenceNumber": 776 + uint64(i),
}
recordB.Data = map[string]any{
"destChainSelector": 3478487238524512106,
"sequenceNumber": 776 + uint64(i),
}

dbStore.InsertEvents(ctx, []database.EventRecord{recordA, recordB})
}

// insert some other unrelated events
for i := range 10_000 {
recordA := baseRecord

// vary the event_offset across the unrelated records
recordA.EventOffset = uint64(i + 1)

// reuse the base block height
recordA.BlockHeight = "100"

recordA.TxDigest = base58.Encode([]byte("record" + strconv.Itoa(i)))

recordA.EventHandle = packageId + "::onramp::SomeOtherEvent"

recordA.Data = map[string]any{
"destChainSelector": 3478487238524512106,
"sequenceNumber": 176 + uint64(i),
}

dbStore.InsertEvents(ctx, []database.EventRecord{recordA})
}

// query events with out of order event_offset
events, err := dbStore.QueryEvents(ctx, accountAddress, eventHandle, []query.Expression{
{
BoolExpression: query.BoolExpression{
BoolOperator: query.AND,
Expressions: []query.Expression{
{
Primitive: &primitives.Comparator{
Name: "sequenceNumber",
ValueComparators: []primitives.ValueComparator{
{Value: uint64(776), Operator: primitives.Gte},
{Value: uint64(779), Operator: primitives.Lte},
},
},
},
{
Primitive: &primitives.Comparator{
Name: "destChainSelector",
ValueComparators: []primitives.ValueComparator{
{Value: "3478487238524512106", Operator: primitives.Eq},
},
},
},
},
},
},
}, query.LimitAndSort{
Limit: query.Limit{
Count: 100,
},
SortBy: []query.SortBy{
query.NewSortBySequence(query.Asc),
},
})

// sequence numbers 776-779 inclusive should yield exactly 4 events
require.NoError(t, err)
require.Equal(t, 4, len(events))

for _, event := range events {
fmt.Printf("eventHandle: %s\n", event.EventHandle)
fmt.Printf("sequenceNumber: %f\n", event.Data["sequenceNumber"].(float64))
fmt.Println("--------------------------------")
}

// sequence numbers should be consecutive and in ascending order
for i := range len(events) - 1 {
require.Equal(t, events[i].Data["sequenceNumber"].(float64)+1, events[i+1].Data["sequenceNumber"].(float64))
}

// query another range for the same event handle
events, err = dbStore.QueryEvents(ctx, accountAddress, eventHandle, []query.Expression{
{
BoolExpression: query.BoolExpression{
BoolOperator: query.AND,
Expressions: []query.Expression{
{
Primitive: &primitives.Comparator{
Name: "sequenceNumber",
ValueComparators: []primitives.ValueComparator{
{Value: uint64(779), Operator: primitives.Gte},
{Value: uint64(785), Operator: primitives.Lte},
},
},
},
{
Primitive: &primitives.Comparator{
Name: "destChainSelector",
ValueComparators: []primitives.ValueComparator{
{Value: "3478487238524512106", Operator: primitives.Eq},
},
},
},
},
},
},
}, query.LimitAndSort{
Limit: query.Limit{
Count: 100,
},
SortBy: []query.SortBy{
query.NewSortBySequence(query.Asc),
},
})

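// sequence numbers 779-785 inclusive should yield exactly 7 events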
require.NoError(t, err)
require.Equal(t, 7, len(events))

for _, event := range events {
fmt.Printf("eventHandle: %s\n", event.EventHandle)
fmt.Printf("sequenceNumber: %f\n", event.Data["sequenceNumber"].(float64))
fmt.Println("--------------------------------")
}

// events should belong to the queried handle and have consecutive, ascending sequence numbers
for i := range len(events) - 1 {
require.Equal(t, events[i].EventHandle, eventHandle)
require.Equal(t, events[i].Data["sequenceNumber"].(float64)+1, events[i+1].Data["sequenceNumber"].(float64))
}
})
}
12 changes: 12 additions & 0 deletions relayer/chainreader/loop/loop_reader_test.go
@@ -200,6 +200,14 @@ func runLoopChainReaderEchoTest(t *testing.T, log logger.Logger, rpcUrl string)
},
},
},
EventsIndexer: config.EventsIndexerConfig{
PollingInterval: 10 * time.Second,
SyncTimeout: 30 * time.Second,
},
TransactionsIndexer: config.TransactionsIndexerConfig{
PollingInterval: 10 * time.Second,
SyncTimeout: 30 * time.Second,
},
}

echoBinding := types.BoundContract{
@@ -255,6 +263,10 @@ func runLoopChainReaderEchoTest(t *testing.T, log logger.Logger, rpcUrl string)
err = loopReader.Bind(context.Background(), []types.BoundContract{echoBinding, counterBinding})
require.NoError(t, err)

// Start the indexers
err = indexerInstance.Start(ctx)
require.NoError(t, err)

log.Debugw("LoopChainReader setup complete")

t.Run("LoopReader_GetLatestValue_EchoU64", func(t *testing.T) {
6 changes: 3 additions & 3 deletions relayer/chainreader/reader/chainreader.go
@@ -607,10 +607,10 @@ func (s *suiChainReader) updateEventConfigs(ctx context.Context, contract pkgtyp
Event: eventConfig.EventType,
}

// sync the event in case it's not already in the database
err = evIndexer.SyncEvent(ctx, &selector)
// ensure that the event selector is included in the indexer's set for upcoming polling loop syncs
err = evIndexer.AddEventSelector(ctx, &selector)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to add event selector: %w", err)
}

return eventConfig, nil
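
For context, the behavioral change here is that registration is decoupled from syncing: AddEventSelector only records the selector so the indexer's polling loop picks it up on its next pass, instead of triggering an immediate SyncEvent. A minimal sketch of that registration-only pattern, assuming a mutex-guarded selector map inside the indexer (hypothetical types and fields; the real implementation is not part of this diff and may differ):

package indexer

import (
	"context"
	"fmt"
	"sync"
)

// EventSelector and EventsIndexer are hypothetical stand-ins for illustration;
// the actual types in relayer/chainreader/indexer may differ.
type EventSelector struct {
	Package string
	Module  string
	Event   string
}

type EventsIndexer struct {
	mu        sync.Mutex
	selectors map[string]*EventSelector
}

// AddEventSelector registers a selector for upcoming polling-loop syncs;
// unlike SyncEvent, it performs no immediate backfill.
func (i *EventsIndexer) AddEventSelector(_ context.Context, selector *EventSelector) error {
	if selector == nil {
		return fmt.Errorf("nil event selector")
	}

	i.mu.Lock()
	defer i.mu.Unlock()

	if i.selectors == nil {
		i.selectors = make(map[string]*EventSelector)
	}

	// Keyed on a stable identifier so repeated Bind calls stay idempotent.
	key := selector.Package + "::" + selector.Module + "::" + selector.Event
	i.selectors[key] = selector

	return nil
}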