Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 6 additions & 12 deletions deployment/ccip/changeset/testhelpers/sui_indexing_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import (
"fmt"
"time"

"github.com/block-vision/sui-go-sdk/models"
"github.com/block-vision/sui-go-sdk/sui"
cslclient "github.com/smartcontractkit/chainlink-sui/relayer/client"
)

// ErrSuiTxIndexingWaitTimeout is returned when the fullnode never served sui_getTransactionBlock
Expand All @@ -32,26 +31,21 @@ var (
// deadline; nesting context.WithTimeout(ctx, SuiTxIndexingWaitTimeout) caps the wait at
// whatever time remains on that parent and causes spurious "context deadline exceeded" on
// sui_getTransactionBlock long before SuiTxIndexingWaitTimeout elapses.
func WaitForSuiFullnodeTransaction(ctx context.Context, client sui.ISuiAPI, digest string) error {
func WaitForSuiFullnodeTransaction(ctx context.Context, client cslclient.SuiPTBClient, digest string) error {
_ = ctx // API stability; poll uses an independent wall-clock budget (see doc above).
if digest == "" {
return nil
}
pollCtx, cancel := context.WithTimeout(context.Background(), SuiTxIndexingWaitTimeout)
defer cancel()

req := models.SuiGetTransactionBlockRequest{
Digest: digest,
Options: models.SuiTransactionBlockOptions{
ShowEffects: true,
},
}

backoff := suiTxIndexingInitial
var lastErr error
for {
resp, err := client.SuiGetTransactionBlock(pollCtx, req)
if err == nil && resp.Digest == digest {
// A non-error result means the fullnode has indexed the transaction (and thus the
// updated owned-object versions are visible); execution status is irrelevant here.
_, err := client.GetTransactionStatus(pollCtx, digest)
if err == nil {
return nil
}
lastErr = err
Expand Down
170 changes: 105 additions & 65 deletions deployment/ccip/changeset/testhelpers/test_adapter_sui.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"

"github.com/block-vision/sui-go-sdk/models"
"github.com/block-vision/sui-go-sdk/sui"

"github.com/smartcontractkit/chainlink-common/pkg/types/ccipocr3"
sui_module_offramp "github.com/smartcontractkit/chainlink-sui/bindings/generated/ccip/ccip_offramp/offramp"
sui_ccip_offramp "github.com/smartcontractkit/chainlink-sui/bindings/packages/offramp"
cslclient "github.com/smartcontractkit/chainlink-sui/relayer/client"
"github.com/smartcontractkit/chainlink-sui/relayer/codec"

"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
Expand Down Expand Up @@ -103,105 +103,133 @@ func (a *SuiAdapter) ValidateExec(t *testing.T, sourceSelector uint64, startBloc
return executionStates
}

// suiEventCheckpointBackfill is how many checkpoints behind the current tip the
// emitter starts scanning when it subscribes, so events emitted just before the
// subscription (e.g. a commit that landed between send and validation) are not missed.
const suiEventCheckpointBackfill = uint64(50)

func SuiEventEmitter[T any](
t *testing.T,
client sui.ISuiAPI,
client cslclient.SuiPTBClient,
packageID, moduleName, event string,
done chan any,
) (<-chan struct {
Event T
Version string
}, <-chan error) {
startTime := time.Now()
t.Logf("[DEBUG] SuiEventEmitter: Starting at %s - will capture ALL historical events plus new ones", startTime.Format(time.RFC3339))
t.Logf("[DEBUG] SuiEventEmitter: Starting at %s - polling checkpoints for events", startTime.Format(time.RFC3339))
ch := make(chan struct {
Event T
Version string
}, 200)
errChan := make(chan error)
limit := uint64(50)

// The gRPC client does not implement cursor-based event queries (QueryEvents is
// "pending gRPC migration"), so we poll checkpoints and filter their events by the
// fully-qualified event type, mirroring the relayer's chain_poller.
eventType := fmt.Sprintf("%s::%s::%s", packageID, moduleName, event)

go func() {
defer close(ch)
defer close(errChan)

ticker := time.NewTicker(time.Second * 2)
defer ticker.Stop()
ctx := t.Context()

var cursor interface{}
emitErr := func(err error) {
select {
case errChan <- err:
case <-done:
}
}

// Seed the scan position from the current chain tip, backfilling a small window.
latest, err := client.GetLatestCheckpoint(ctx)
if err != nil {
emitErr(fmt.Errorf("failed to get latest checkpoint: %w", err))
return
}
var nextSeq uint64
if tip := latest.GetSequenceNumber(); tip > suiEventCheckpointBackfill {
nextSeq = tip - suiEventCheckpointBackfill
}

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
// Drain all available pages from the current cursor position before waiting.
for {
select {
case <-done:
t.Logf("[DEBUG] SuiEventEmitter: Stopping due to done signal")
return
default:
}

latest, err := client.GetLatestCheckpoint(ctx)
if err != nil {
emitErr(fmt.Errorf("failed to get latest checkpoint: %w", err))
return
}
tip := latest.GetSequenceNumber()

for seq := nextSeq; seq <= tip; seq++ {
select {
case <-done:
t.Logf("[DEBUG] SuiEventEmitter: Stopping due to done signal")
return
default:
}

eventFilter := models.EventFilterByMoveEventType{
MoveEventType: fmt.Sprintf("%s::%s::%s", packageID, moduleName, event),
}

events, err := client.SuiXQueryEvents(t.Context(), models.SuiXQueryEventsRequest{
SuiEventFilter: eventFilter,
Cursor: cursor,
Limit: limit,
DescendingOrder: false,
})
data, err := client.GetCheckpointData(ctx, seq)
if err != nil {
t.Logf("[DEBUG] SuiEventEmitter: Query error: %v", err)
select {
case errChan <- err:
case <-done:
return
if isSuiCheckpointNotFound(err) {
// Tip advanced past a checkpoint not yet available; retry next tick.
break
}
emitErr(err)
return
}

if len(events.Data) == 0 {
t.Logf("[DEBUG] SuiEventEmitter: No new events found")
break
}

t.Logf("[DEBUG] SuiEventEmitter: Processing %d events", len(events.Data))

for _, ev := range events.Data {
eventID := fmt.Sprintf("%s:%s", ev.Id.TxDigest, ev.Id.EventSeq)

var out T
if err := codec.DecodeSuiJsonValue(ev.ParsedJson, &out); err != nil {
t.Logf("[DEBUG] SuiEventEmitter: Decode error for event %s: %v (skipping)", eventID, err)
continue
}

eventData := struct {
Event T
Version string
}{
Event: out,
Version: ev.Id.EventSeq,
}

select {
case ch <- eventData:
t.Logf("[DEBUG] SuiEventEmitter: Sent event %s with type %s at timestamp %s", eventID, ev.Type, ev.TimestampMs)
case <-done:
t.Logf("[DEBUG] SuiEventEmitter: Stopping due to done signal during send")
return
default:
t.Logf("[WARNING] SuiEventEmitter: Channel full, dropping event %s", eventID)
for _, tx := range data.Transactions {
for _, ev := range tx.GetEvents().GetEvents() {
// Sui event struct types always carry the original defining
// package's ID, so match on the fully-qualified handle.
qualified := strings.Join([]string{ev.GetPackageId(), ev.GetModule(), ev.GetEventType()}, "::")
if qualified != eventType {
continue
}
if ev.GetJson() == nil {
continue
}

var out T
if err := codec.DecodeSuiJsonValue(ev.GetJson().AsInterface(), &out); err != nil {
t.Logf("[DEBUG] SuiEventEmitter: Decode error at checkpoint %d: %v (skipping)", seq, err)
continue
}

eventData := struct {
Event T
Version string
}{
Event: out,
Version: strconv.FormatUint(seq, 10),
}

select {
case ch <- eventData:
t.Logf("[DEBUG] SuiEventEmitter: Sent %s event from checkpoint %d", eventType, seq)
case <-done:
t.Logf("[DEBUG] SuiEventEmitter: Stopping due to done signal during send")
return
default:
t.Logf("[WARNING] SuiEventEmitter: Channel full, dropping event at checkpoint %d", seq)
}
}
}

// Advance the cursor so the next query picks up where we left off.
cursor = events.NextCursor

if !events.HasNextPage {
break
}
nextSeq = seq + 1
}

select {
case <-done:
t.Logf("[DEBUG] SuiEventEmitter: Stopping due to done signal in ticker loop")
Expand All @@ -214,6 +242,18 @@ func SuiEventEmitter[T any](
return ch, errChan
}

// isSuiCheckpointNotFound reports whether err indicates a checkpoint that is not yet
// available on the fullnode (the tip can advance past the latest indexed checkpoint).
func isSuiCheckpointNotFound(err error) bool {
for err != nil {
if strings.Contains(strings.ToLower(err.Error()), "not found") {
return true
}
err = errors.Unwrap(err)
}
return false
}

func confirmCommitWithExpectedSeqNumRangeSui(
t *testing.T,
srcSelector uint64,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,11 @@ func LatestBlock(ctx context.Context, env cldf.Environment, chainSelector uint64
return env.BlockChains.SolanaChains()[chainSelector].Client.GetSlot(ctx, solconfig.DefaultCommitment)
case chainsel.FamilySui:
suiClient := env.BlockChains.SuiChains()[chainSelector].Client
seqNum, err := suiClient.SuiGetLatestCheckpointSequenceNumber(ctx)
checkpoint, err := suiClient.GetLatestCheckpoint(ctx)
if err != nil {
return 0, fmt.Errorf("failed to get sui latest checkpoint: %w", err)
}
return seqNum, nil
return checkpoint.GetSequenceNumber(), nil
case chainsel.FamilyAptos:
chainInfo, err := env.BlockChains.AptosChains()[chainSelector].Client.Info()
if err != nil {
Expand Down
Loading