Skip to content

Commit 357d6c5

Browse files
faisal-chainlinkstackman27FelixFan1992
authored
Fix: use hex for transaction digest in events (#257)
* update event indexer and txn indexer to insert txDigest as a hex string * fix bug related to events cursor txDigest * update events indexer test * tidy * block hash base58 * Misc fixes (#274) * allow maxRetries instead of maxRetries - 1 * avoid overwriting the module name in event selector when querying for events * update tests to skip if the DB url is not provided * update txm confirmer test * edit error logs * Enhancement/allow override event offset (#295) * firedrill contracts test * add firedrill sanity check * extend configs to allow overriding event offset * add an offset override fallback for events * remove unnecessary unlock * ensure base58 instead of hex when parsing block and tx digest in transaction indexer * cleanup logs * search SourceChainConfigSet using strings instead of u64 for chain selectors * add a database table to store transactions cursors when replaying failed transactions from transmitters * disable firedrill sanity test --------- Co-authored-by: Sishir Giri <[email protected]> Co-authored-by: FelixFan1992 <[email protected]>
1 parent b80fff7 commit 357d6c5

File tree

14 files changed

+547
-38
lines changed

14 files changed

+547
-38
lines changed

relayer/chainreader/config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ type ChainReaderEvent struct {
5858

5959
// The expected event type (optional). When not provided, the event type is used as-is.
6060
ExpectedEventType any
61+
62+
// A fallback for events selectors with no offset recorded in the DB and a starting point
63+
// earlier than the pruning cutoff of the RPC
64+
EventSelectorDefaultOffset *client.EventId
6165
}
6266

6367
type EventsIndexerConfig struct {

relayer/chainreader/database/database.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ func (store *DBStore) EnsureSchema(ctx context.Context) error {
4949
return fmt.Errorf("failed to create sui indexes: %w", err)
5050
}
5151

52+
_, err = store.ds.ExecContext(ctx, CreateTransmitterCursorsTable)
53+
if err != nil {
54+
return fmt.Errorf("failed to create sui.transmitter_cursors table: %w", err)
55+
}
56+
5257
return nil
5358
}
5459

@@ -235,3 +240,23 @@ func operatorSQL(op primitives.ComparisonOperator) string {
235240
return "="
236241
}
237242
}
243+
244+
func (store *DBStore) GetTransmitterCursor(ctx context.Context, transmitter models.SuiAddress) (string, error) {
245+
var cursor string
246+
err := store.ds.QueryRowxContext(ctx, GetTransmitterCursor, transmitter).Scan(&cursor)
247+
if err != nil {
248+
if errors.Is(err, sql.ErrNoRows) {
249+
return "", nil
250+
}
251+
return "", fmt.Errorf("failed to get transmitter cursor: %w", err)
252+
}
253+
return cursor, nil
254+
}
255+
256+
func (store *DBStore) UpdateTransmitterCursor(ctx context.Context, transmitter models.SuiAddress, cursor string) error {
257+
_, err := store.ds.ExecContext(ctx, UpdateTransmitterCursor, transmitter, cursor)
258+
if err != nil {
259+
return fmt.Errorf("failed to update transmitter cursor: %w", err)
260+
}
261+
return nil
262+
}

relayer/chainreader/database/database_queries.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ const (
2121
);
2222
`
2323

24+
CreateTransmitterCursorsTable = `
25+
CREATE TABLE IF NOT EXISTS sui.transmitter_cursors (
26+
transmitter TEXT PRIMARY KEY,
27+
cursor TEXT NOT NULL
28+
);
29+
`
30+
2431
CreateIndices = `
2532
CREATE INDEX IF NOT EXISTS idx_events_account_handle_timestamp ON sui.events(event_account_address, event_handle, block_timestamp DESC);
2633
CREATE INDEX IF NOT EXISTS idx_events_offset ON sui.events(event_account_address, event_handle, event_offset);
@@ -78,4 +85,15 @@ const (
7885
FROM sui.events
7986
WHERE id = $1
8087
`
88+
89+
GetTransmitterCursor = `
90+
SELECT cursor
91+
FROM sui.transmitter_cursors
92+
WHERE transmitter = $1
93+
`
94+
95+
UpdateTransmitterCursor = `
96+
INSERT INTO sui.transmitter_cursors (transmitter, cursor) VALUES ($1, $2)
97+
ON CONFLICT (transmitter) DO UPDATE SET cursor = $2;
98+
`
8199
)

relayer/chainreader/indexer/events_indexer.go

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package indexer
22

33
import (
44
"context"
5+
"encoding/hex"
56
"errors"
67
"fmt"
78
"strconv"
@@ -10,6 +11,7 @@ import (
1011
"time"
1112

1213
"github.com/block-vision/sui-go-sdk/models"
14+
"github.com/mr-tron/base58"
1315

1416
"github.com/smartcontractkit/chainlink-common/pkg/logger"
1517
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
@@ -26,8 +28,9 @@ type EventsIndexer struct {
2628
syncTimeout time.Duration
2729

2830
// Protected by configMutex
29-
eventConfigurations []*client.EventSelector
30-
configMutex sync.RWMutex
31+
eventConfigurations []*client.EventSelector
32+
eventOffsetOverrides map[string]client.EventId
33+
configMutex sync.RWMutex
3134

3235
// Protected by cursorMutex
3336
// a map of event handles to the last processed cursor
@@ -40,6 +43,7 @@ type EventsIndexerApi interface {
4043
SyncAllEvents(ctx context.Context) error
4144
SyncEvent(ctx context.Context, selector *client.EventSelector) error
4245
AddEventSelector(ctx context.Context, selector *client.EventSelector) error
46+
SetEventOffsetOverrides(ctx context.Context, offsetOverrides map[string]client.EventId) error
4347
Ready() error
4448
Close() error
4549
}
@@ -208,6 +212,7 @@ func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.E
208212
return fmt.Errorf("unspecified selector for SyncEvent call")
209213
}
210214

215+
eventKey := fmt.Sprintf("%s::%s", selector.Module, selector.Event)
211216
eventHandle := fmt.Sprintf("%s::%s::%s", selector.Package, selector.Module, selector.Event)
212217

213218
// check if the event selector is already tracked, if not add it to the list
@@ -227,16 +232,42 @@ func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.E
227232
cursor := eIndexer.lastProcessedCursors[eventHandle]
228233

229234
var totalCount uint64
230-
var err error
235+
231236
if cursor == nil {
232237
// attempt to get the latest event sync of the given type and use its data to construct a cursor
233-
cursor, totalCount, err = eIndexer.db.GetLatestOffset(ctx, selector.Package, eventHandle)
234-
if err != nil {
238+
dbOffsetCursor, dbTotalCount, offsetErr := eIndexer.db.GetLatestOffset(ctx, selector.Package, eventHandle)
239+
if offsetErr != nil {
235240
eIndexer.cursorMutex.RUnlock()
236-
return err
241+
eIndexer.logger.Errorw("syncEvent: failed to get latest offset", "error", offsetErr)
242+
return offsetErr
237243
}
238244

239-
eIndexer.logger.Debugw("syncEvent: starting fresh sync", "handle", eventHandle, "cursor", cursor)
245+
if dbOffsetCursor != nil {
246+
txDigestBytes, err := hex.DecodeString(strings.TrimPrefix(dbOffsetCursor.TxDigest, "0x"))
247+
if err != nil {
248+
eIndexer.cursorMutex.RUnlock()
249+
eIndexer.logger.Errorw("syncEvent: failed to decode tx digest", "error", err, "txDigest", dbOffsetCursor.TxDigest)
250+
return err
251+
}
252+
// convert the db offset cursor digest from hex (the format stored in the DB) to base58 (the format expected by the client)
253+
cursor = &models.EventId{
254+
TxDigest: base58.Encode(txDigestBytes),
255+
EventSeq: dbOffsetCursor.EventSeq,
256+
}
257+
258+
totalCount = dbTotalCount
259+
} else {
260+
eIndexer.configMutex.RLock()
261+
if override, ok := eIndexer.eventOffsetOverrides[eventKey]; ok {
262+
cursor = &models.EventId{
263+
TxDigest: override.TxDigest,
264+
EventSeq: override.EventSeq,
265+
}
266+
} else {
267+
eIndexer.logger.Debugw("syncEvent: starting fresh sync", "handle", eventHandle)
268+
}
269+
eIndexer.configMutex.RUnlock()
270+
}
240271
}
241272

242273
batchSize := uint(batchSizeRecords)
@@ -308,15 +339,29 @@ eventLoop:
308339
// normalize the data, convert snake case to camel case
309340
normalizedData := convertMapKeysToCamelCase(event.ParsedJson)
310341

342+
// Convert the txDigest to hex
343+
txDigestHex := event.Id.TxDigest
344+
if base58Bytes, err := base58.Decode(txDigestHex); err == nil {
345+
hexTxId := hex.EncodeToString(base58Bytes)
346+
txDigestHex = "0x" + hexTxId
347+
}
348+
349+
blockHashBytes, err := base58.Decode(block.TxDigest)
350+
if err != nil {
351+
eIndexer.logger.Errorw("Failed to decode block hash", "error", err)
352+
// fallback
353+
blockHashBytes = []byte(block.TxDigest)
354+
}
355+
311356
// Convert event to database record
312357
record := database.EventRecord{
313358
EventAccountAddress: selector.Package,
314359
EventHandle: eventHandle,
315360
EventOffset: offset,
316-
TxDigest: event.Id.TxDigest,
361+
TxDigest: txDigestHex,
317362
BlockVersion: 0,
318363
BlockHeight: fmt.Sprintf("%d", block.Height),
319-
BlockHash: []byte(block.TxDigest),
364+
BlockHash: blockHashBytes,
320365
// Sui returns block.Timestamp in ms; convert to seconds for consistency with CCIP readers.
321366
BlockTimestamp: block.Timestamp / 1000,
322367
Data: normalizedData.(map[string]any),
@@ -409,6 +454,13 @@ func (eIndexer *EventsIndexer) AddEventSelector(ctx context.Context, selector *c
409454
return nil
410455
}
411456

457+
func (eIndexer *EventsIndexer) SetEventOffsetOverrides(ctx context.Context, offsetOverrides map[string]client.EventId) error {
458+
eIndexer.configMutex.Lock()
459+
defer eIndexer.configMutex.Unlock()
460+
eIndexer.eventOffsetOverrides = offsetOverrides
461+
return nil
462+
}
463+
412464
// IsEventSelectorAdded checks if a specific event selector has already been included in the list of events to sync
413465
func (eIndexer *EventsIndexer) isEventSelectorAdded(eConfig client.EventSelector) bool {
414466
eIndexer.configMutex.RLock()

relayer/chainreader/indexer/events_indexer_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,36 @@ func TestEventsIndexer(t *testing.T) {
523523
log.Infow("All concurrent access tests completed successfully")
524524
})
525525

526+
t.Run("TestWithTimestamps", func(t *testing.T) {
527+
log.Infow("Testing with timestamps")
528+
529+
// Trigger some events
530+
for i := 1; i <= 3; i++ {
531+
createEvent(i)
532+
}
533+
534+
// Create a new event selector for timestamps
535+
timestampEventSelector := &client.EventSelector{
536+
Package: packageId,
537+
Module: "counter",
538+
Event: "CounterIncremented",
539+
}
540+
541+
// Run sync to index events
542+
err := indexer.SyncEvent(ctx, timestampEventSelector)
543+
require.NoError(t, err)
544+
545+
// Wait for events to be indexed
546+
events := waitForEventCount(3, 60*time.Second)
547+
require.GreaterOrEqual(t, len(events), 3)
548+
549+
// Check that events are recorded with timestamps in seconds
550+
for _, event := range events[:3] {
551+
require.Greater(t, event.BlockTimestamp, uint64(0), "Event should have a timestamp")
552+
require.Less(t, event.BlockTimestamp, uint64(time.Now().Unix()+1), "Event timestamp should be in the past")
553+
}
554+
})
555+
526556
t.Run("TestRaceDetection", func(t *testing.T) {
527557
// Run with: go test -race -run TestEventsIndexer/TestRaceDetection
528558
log.Infow("Starting race detection test")

relayer/chainreader/indexer/indexer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func NewIndexer(
4040
transactionIndexer TransactionsIndexerApi,
4141
) *Indexer {
4242
return &Indexer{
43-
log: logger.Named(l, "Indexers"),
43+
log: logger.Named(l, "SuiIndexers"),
4444
eventsIndexer: eventsIndexer,
4545
eventsIndexerCancel: nil,
4646
transactionIndexer: transactionIndexer,

relayer/chainreader/indexer/transactions_indexer.go

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/block-vision/sui-go-sdk/models"
14+
"github.com/mr-tron/base58"
1415
"github.com/smartcontractkit/chainlink-common/pkg/logger"
1516
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
1617
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
@@ -65,12 +66,13 @@ func NewTransactionsIndexer(
6566
syncTimeout time.Duration,
6667
eventConfigs map[string]*config.ChainReaderEvent,
6768
) TransactionsIndexerApi {
68-
dataStore := database.NewDBStore(db, lggr)
69+
logInstance := logger.Named(lggr, "SuiTransactionsIndexer")
70+
dataStore := database.NewDBStore(db, logInstance)
6971

7072
return &TransactionsIndexer{
7173
db: dataStore,
7274
client: sdkClient,
73-
logger: lggr,
75+
logger: logInstance,
7476
pollingInterval: pollingInterval,
7577
syncTimeout: syncTimeout,
7678
transmitters: make(map[models.SuiAddress]string),
@@ -230,6 +232,8 @@ func (tIndexer *TransactionsIndexer) SyncAllTransmittersTransactions(ctx context
230232
return nil
231233
}
232234

235+
tIndexer.logger.Debugw("syncTransmittersTransactions start", "transmitters", transmitters)
236+
233237
var batchSize uint64 = 50
234238
var totalProcessed int
235239

@@ -269,6 +273,8 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con
269273
cursor := tIndexer.transmitters[transmitter]
270274
totalProcessed := 0
271275

276+
tIndexer.logger.Debugw("syncTransmitterTransactions start", "transmitter", transmitter, "cursor", cursor)
277+
272278
eventAccountAddress, latestOfframpPackageId, err := tIndexer.getEventPackageIdFromConfig()
273279
if err != nil {
274280
return 0, fmt.Errorf("failed to get ExecutionStateChanged event config: %w", err)
@@ -279,6 +285,21 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con
279285
case <-ctx.Done():
280286
return totalProcessed, ctx.Err()
281287
default:
288+
if cursor == "" {
289+
// Get the cursor from the DB store
290+
transmitterCursorFromDB, err := tIndexer.db.GetTransmitterCursor(ctx, transmitter)
291+
if err != nil {
292+
tIndexer.logger.Warnw("Failed to get transmitter cursor from DB store", "error", err)
293+
}
294+
// Attempt to check if a cursor exists in the DB store
295+
if transmitterCursorFromDB != "" {
296+
tIndexer.logger.Debugw("Found transmitter cursor in DB store", "transmitter", transmitter, "cursor", transmitterCursorFromDB)
297+
cursor = transmitterCursorFromDB
298+
} else {
299+
tIndexer.logger.Debugw("No transmitter cursor found in DB store, starting fresh sync", "transmitter", transmitter)
300+
}
301+
}
302+
282303
queryResponse, err := tIndexer.client.QueryTransactions(ctx, string(transmitter), &cursor, &batchSize)
283304
if err != nil {
284305
return totalProcessed, fmt.Errorf("failed to fetch transactions for transmitter %s: %w", transmitter, err)
@@ -292,6 +313,12 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con
292313
defer func() {
293314
// Update the cursor to the last transaction digest regardless of the code path below
294315
tIndexer.transmitters[transmitter] = lastDigest
316+
317+
// Update the cursor in the DB store
318+
err := tIndexer.db.UpdateTransmitterCursor(ctx, transmitter, lastDigest)
319+
if err != nil {
320+
tIndexer.logger.Errorw("Failed to update transmitter cursor in DB store", "error", err)
321+
}
295322
}()
296323

297324
var records []database.EventRecord
@@ -358,6 +385,7 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con
358385
"Expected PTB command (_::offramp::init_execute) not found in commands of failed PTB originating from known transmitter",
359386
"transmitter", transmitter,
360387
"digest", transactionRecord.Digest,
388+
"transactionRecord", transactionRecord,
361389
)
362390
continue
363391
}
@@ -473,13 +501,27 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con
473501
continue
474502
}
475503

504+
// Convert the txDigest to hex
505+
txDigestHex := transactionRecord.Digest
506+
if base64Bytes, err := base58.Decode(txDigestHex); err == nil {
507+
hexTxId := hex.EncodeToString(base64Bytes)
508+
txDigestHex = "0x" + hexTxId
509+
}
510+
511+
blockHashBytes, err := base58.Decode(checkpointResponse.Digest)
512+
if err != nil {
513+
tIndexer.logger.Errorw("Failed to decode block hash", "error", err)
514+
// fallback
515+
blockHashBytes = []byte(checkpointResponse.Digest)
516+
}
517+
476518
record := database.EventRecord{
477519
EventAccountAddress: eventAccountAddress,
478520
EventHandle: eventHandle,
479521
EventOffset: 0,
480-
TxDigest: transactionRecord.Digest,
522+
TxDigest: txDigestHex,
481523
BlockHeight: checkpointResponse.SequenceNumber,
482-
BlockHash: []byte(checkpointResponse.Digest),
524+
BlockHash: blockHashBytes,
483525
BlockTimestamp: blockTimestamp,
484526
Data: executionStateChanged,
485527
}
@@ -596,7 +638,7 @@ func (tIndexer *TransactionsIndexer) getSourceChainConfig(ctx context.Context, s
596638

597639
filter := []query.Expression{
598640
query.Comparator(selector,
599-
primitives.ValueComparator{Value: sourceChainSelector, Operator: primitives.Eq},
641+
primitives.ValueComparator{Value: strconv.FormatUint(sourceChainSelector, 10), Operator: primitives.Eq},
600642
),
601643
}
602644

0 commit comments

Comments
 (0)