diff --git a/.dockerignore b/.dockerignore
index be3f035d..eaa59cdc 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -1,3 +1,4 @@
 target/
+storage/
 .soroban/
 .cargo/
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2976ef21..a6e519d1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,9 +14,10 @@ go get -u github.com/stellar/go-stellar-sdk/protocols/rpc
 ```
 
 ### Added
+- Added `--backfill` configuration parameter, which synchronously backfills `HISTORY_RETENTION_WINDOW` ledgers into the local DB before RPC starts serving. For one week of ledgers (approximately 150 GB), this can be expected to complete in under three hours and to use less than 3 GB of memory (less than Core itself uses). To use this, you must configure a datastore and enable `SERVE_LEDGERS_FROM_DATASTORE`, which also enables `getLedger` ([#571](https://github.com/stellar/stellar-rpc/pull/571)).
 - Expanded `getLatestLedger` endpoint to also return `closeTime`, `headerXdr`, and `metadataXdr` ([#554](https://github.com/stellar/stellar-rpc/pull/554)).
 - Added `soroban-env-host` info to `version` command ([#550](https://github.com/stellar/stellar-rpc/pull/550)).
-- Added a new `--network` configuration parameter, allowing users to specify a default Stellar network (`testnet`, `pubnet`, or `futurenet`) ([#540](https://github.com/stellar/stellar-rpc/pull/540), [#543](https://github.com/stellar/stellar-rpc/pull/543)).
+- Added `--network` configuration parameter, allowing users to specify a default Stellar network (`testnet`, `pubnet`, or `futurenet`) ([#540](https://github.com/stellar/stellar-rpc/pull/540), [#543](https://github.com/stellar/stellar-rpc/pull/543)).
 - Simulation has been updated to support Protocol 25 ([#548](https://github.com/stellar/stellar-rpc/pull/548)).
 
 ### Fixed
diff --git a/cmd/stellar-rpc/internal/config/main.go b/cmd/stellar-rpc/internal/config/main.go
index 429792e0..c74d92ad 100644
--- a/cmd/stellar-rpc/internal/config/main.go
+++ b/cmd/stellar-rpc/internal/config/main.go
@@ -18,6 +18,7 @@ type Config struct {
 	Strict bool
 
 	StellarCoreURL         string
+	Backfill               bool
 	CaptiveCoreStoragePath string
 	StellarCoreBinaryPath  string
 	CaptiveCoreConfigPath  string
diff --git a/cmd/stellar-rpc/internal/config/options.go b/cmd/stellar-rpc/internal/config/options.go
index 34dd02e9..909e2939 100644
--- a/cmd/stellar-rpc/internal/config/options.go
+++ b/cmd/stellar-rpc/internal/config/options.go
@@ -82,6 +82,19 @@ func (cfg *Config) options() Options {
 				return nil
 			},
 		},
+		{
+			Name:         "backfill",
+			Usage:        "Synchronously populates the database with `history-retention-window` ledgers on startup (the window defaults to one week of ledgers if unspecified)",
+			ConfigKey:    &cfg.Backfill,
+			DefaultValue: false,
+			Validate: func(_ *Option) error {
+				// Ensure config is valid for backfill
+				if cfg.Backfill && !cfg.ServeLedgersFromDatastore {
+					return errors.New("backfill requires serving ledgers from datastore to be enabled. See the `--serve-ledgers-from-datastore` flag")
+				}
+				return nil
+			},
+		},
 		{
 			Name:  "stellar-core-timeout",
 			Usage: "Timeout used when submitting requests to stellar-core",
diff --git a/cmd/stellar-rpc/internal/daemon/daemon.go b/cmd/stellar-rpc/internal/daemon/daemon.go
index ae3bdd80..1f0a2982 100644
--- a/cmd/stellar-rpc/internal/daemon/daemon.go
+++ b/cmd/stellar-rpc/internal/daemon/daemon.go
@@ -190,10 +190,41 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
 	feewindows := daemon.mustInitializeStorage(cfg)
 
+	// Create the read-writer once and reuse in ingest service/backfill
+	rw := db.NewReadWriter(
+		logger,
+		daemon.db,
+		daemon,
+		maxLedgerEntryWriteBatchSize,
+		cfg.HistoryRetentionWindow,
+		cfg.NetworkPassphrase,
+	)
 	if cfg.ServeLedgersFromDatastore {
 		daemon.dataStore, daemon.dataStoreSchema = mustCreateDataStore(cfg, logger)
 	}
 
-	daemon.ingestService = createIngestService(cfg, logger, daemon, feewindows, historyArchive)
+	var ingestCfg ingest.Config
+	daemon.ingestService, ingestCfg = createIngestService(cfg, logger, daemon, feewindows, historyArchive, rw)
+	if cfg.Backfill {
+		backfillMeta, err := ingest.NewBackfillMeta(
+			logger,
+			daemon.ingestService,
+			db.NewLedgerReader(daemon.db),
+			daemon.dataStore,
+			daemon.dataStoreSchema,
+		)
+		if err != nil {
+			logger.WithError(err).Fatal("failed to create backfill metadata")
+		}
+		if err := backfillMeta.RunBackfill(cfg); err != nil {
+			logger.WithError(err).Fatal("failed to backfill ledgers")
+		}
+		// Clear the DB cache and fee windows so they re-populate from the database
+		daemon.db.ResetCache()
+		feewindows.Reset()
+	}
+	// Start ingestion service only after backfill is complete
+	daemon.ingestService.Start(ingestCfg)
+
 	daemon.preflightWorkerPool = createPreflightWorkerPool(cfg, logger, daemon)
 	daemon.jsonRPCHandler = createJSONRPCHandler(cfg, logger, daemon, feewindows)
 
@@ -286,22 +317,15 @@ func createHighperfStellarCoreClient(cfg *config.Config) interfaces.FastCoreClie
 }
 
 func createIngestService(cfg *config.Config, logger *supportlog.Entry, daemon *Daemon,
-	feewindows *feewindow.FeeWindows, historyArchive *historyarchive.ArchiveInterface,
-) *ingest.Service {
+	feewindows *feewindow.FeeWindows, historyArchive *historyarchive.ArchiveInterface, rw db.ReadWriter,
+) (*ingest.Service, ingest.Config) {
 	onIngestionRetry := func(err error, _ time.Duration) {
 		logger.WithError(err).Error("could not run ingestion. 
Retrying") } - return ingest.NewService(ingest.Config{ - Logger: logger, - DB: db.NewReadWriter( - logger, - daemon.db, - daemon, - maxLedgerEntryWriteBatchSize, - cfg.HistoryRetentionWindow, - cfg.NetworkPassphrase, - ), + ingestCfg := ingest.Config{ + Logger: logger, + DB: rw, NetworkPassPhrase: cfg.NetworkPassphrase, Archive: *historyArchive, LedgerBackend: daemon.core, @@ -309,7 +333,8 @@ func createIngestService(cfg *config.Config, logger *supportlog.Entry, daemon *D OnIngestionRetry: onIngestionRetry, Daemon: daemon, FeeWindows: feewindows, - }) + } + return ingest.NewService(ingestCfg), ingestCfg } func createPreflightWorkerPool(cfg *config.Config, logger *supportlog.Entry, daemon *Daemon) *preflight.WorkerPool { diff --git a/cmd/stellar-rpc/internal/db/db.go b/cmd/stellar-rpc/internal/db/db.go index fe5c24fc..f5fe2a7c 100644 --- a/cmd/stellar-rpc/internal/db/db.go +++ b/cmd/stellar-rpc/internal/db/db.go @@ -58,6 +58,13 @@ type DB struct { cache *dbCache } +func (d *DB) ResetCache() { + d.cache.Lock() + defer d.cache.Unlock() + d.cache.latestLedgerSeq = 0 + d.cache.latestLedgerCloseTime = 0 +} + func openSQLiteDB(dbFilePath string) (*db.Session, error) { // 1. Use Write-Ahead Logging (WAL). // 2. Disable WAL auto-checkpointing (we will do the checkpointing ourselves with wal_checkpoint pragmas @@ -156,7 +163,7 @@ func getLatestLedgerSequence(ctx context.Context, ledgerReader LedgerReader, cac // Add missing ledger sequence and close time to the top cache. // Otherwise, the write-through cache won't get updated until the first ingestion commit cache.Lock() - if cache.latestLedgerSeq == 0 { + if cache.latestLedgerSeq < ledgerRange.LastLedger.Sequence { // Only update the cache if the value is missing (0), otherwise // we may end up overwriting the entry with an older version cache.latestLedgerSeq = ledgerRange.LastLedger.Sequence @@ -335,8 +342,10 @@ func (w writeTx) Commit(ledgerCloseMeta xdr.LedgerCloseMeta, durationMetrics map if err := w.tx.Commit(); err != nil { return err } - w.globalCache.latestLedgerSeq = ledgerSeq - w.globalCache.latestLedgerCloseTime = ledgerCloseTime + if ledgerSeq > w.globalCache.latestLedgerSeq { + w.globalCache.latestLedgerSeq = ledgerSeq + w.globalCache.latestLedgerCloseTime = ledgerCloseTime + } return nil } startTime = time.Now() diff --git a/cmd/stellar-rpc/internal/db/ledger.go b/cmd/stellar-rpc/internal/db/ledger.go index c28b1782..b5ca3a3b 100644 --- a/cmd/stellar-rpc/internal/db/ledger.go +++ b/cmd/stellar-rpc/internal/db/ledger.go @@ -25,6 +25,7 @@ type LedgerReader interface { GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error) StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) + GetLedgerCountInRange(ctx context.Context, start uint32, end uint32) (uint32, uint32, uint32, error) StreamLedgerRange(ctx context.Context, startLedger uint32, endLedger uint32, f StreamLedgerFn) error NewTx(ctx context.Context) (LedgerReaderTx, error) GetLatestLedgerSequence(ctx context.Context) (uint32, error) @@ -208,6 +209,10 @@ func (r ledgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.Le return getLedgerRangeWithoutCache(ctx, r.db) } +func (r ledgerReader) GetLedgerCountInRange(ctx context.Context, start, end uint32) (uint32, uint32, uint32, error) { + return getLedgerCountInRange(ctx, r.db, start, end) +} + func (r ledgerReader) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { return getLatestLedgerSequence(ctx, 
r, r.db.cache) } @@ -273,6 +278,30 @@ func getLedgerRangeWithoutCache(ctx context.Context, db readDB) (ledgerbucketwin }, nil } +// Queries a local DB, and in the inclusive range [start, end], returns the count of ledgers, and min/max sequence nums +func getLedgerCountInRange(ctx context.Context, db readDB, start, end uint32) (uint32, uint32, uint32, error) { + sql := sq.Select("COUNT(*) as count", "MIN(sequence) as min_seq", "MAX(sequence) as max_seq"). + From(ledgerCloseMetaTableName). + Where(sq.And{ + sq.GtOrEq{"sequence": start}, + sq.LtOrEq{"sequence": end}, + }) + + var results []struct { + Count uint32 `db:"count"` + MinSeq uint32 `db:"min_seq"` + MaxSeq uint32 `db:"max_seq"` + } + if err := db.Select(ctx, &results, sql); err != nil { + return 0, 0, 0, err + } + if len(results) == 0 || results[0].Count == 0 { + return 0, 0, 0, nil + } + + return results[0].Count, results[0].MinSeq, results[0].MaxSeq, nil +} + type ledgerWriter struct { stmtCache *sq.StmtCache } diff --git a/cmd/stellar-rpc/internal/db/mocks.go b/cmd/stellar-rpc/internal/db/mocks.go index de7c0b39..513083de 100644 --- a/cmd/stellar-rpc/internal/db/mocks.go +++ b/cmd/stellar-rpc/internal/db/mocks.go @@ -117,6 +117,13 @@ func (m *MockLedgerReader) NewTx(_ context.Context) (LedgerReaderTx, error) { return nil, errors.New("mock NewTx error") } +func (m *MockLedgerReader) GetLedgerCountInRange(_ context.Context, + _ uint32, + _ uint32, +) (uint32, uint32, uint32, error) { + return 0, 0, 0, nil +} + var ( _ TransactionReader = &MockTransactionHandler{} _ TransactionWriter = &MockTransactionHandler{} diff --git a/cmd/stellar-rpc/internal/feewindow/feewindow.go b/cmd/stellar-rpc/internal/feewindow/feewindow.go index 4faefbae..77ae56c1 100644 --- a/cmd/stellar-rpc/internal/feewindow/feewindow.go +++ b/cmd/stellar-rpc/internal/feewindow/feewindow.go @@ -64,6 +64,14 @@ func (fw *FeeWindow) AppendLedgerFees(fees ledgerbucketwindow.LedgerBucket[[]uin return nil } +// Reset clears all ledger fees from the window +func (fw *FeeWindow) Reset() { + fw.lock.Lock() + defer fw.lock.Unlock() + fw.feesPerLedger.Reset() + fw.distribution = FeeDistribution{} +} + func computeFeeDistribution(fees []uint64, ledgerCount uint32) FeeDistribution { if len(fees) == 0 { return FeeDistribution{} @@ -207,6 +215,12 @@ func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error { return nil } +// Reset clears all fee windows +func (fw *FeeWindows) Reset() { + fw.SorobanInclusionFeeWindow.Reset() + fw.ClassicFeeWindow.Reset() +} + func (fw *FeeWindows) AsMigration(seqRange db.LedgerSeqRange) db.Migration { return &feeWindowMigration{ firstLedger: seqRange.First, diff --git a/cmd/stellar-rpc/internal/ingest/backfill.go b/cmd/stellar-rpc/internal/ingest/backfill.go new file mode 100644 index 00000000..0c434794 --- /dev/null +++ b/cmd/stellar-rpc/internal/ingest/backfill.go @@ -0,0 +1,383 @@ +package ingest + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + + checkpoint "github.com/stellar/go-stellar-sdk/historyarchive" + "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" + "github.com/stellar/go-stellar-sdk/support/datastore" + supportlog "github.com/stellar/go-stellar-sdk/support/log" + + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/config" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/db" +) + +const ( + // Number of ledgers to read/write per commit during backfill + ChunkSize uint32 = config.OneDayOfLedgers / 18 // = 960 ledgers, approx. 
2Gb of RAM usage + // Acceptable number of ledgers that may be missing from the backfill tail/head + ledgerThreshold uint32 = 384 // six checkpoints/~30 minutes of ledgers +) + +// This struct holds the metadata/constructs necessary for most backfilling operations, including +// the local database reader and writer, the cloud datastore info, and a logger. +type BackfillMeta struct { + logger *supportlog.Entry + ingestService *Service + dsInfo datastoreInfo + dbInfo databaseInfo +} + +type datastoreInfo struct { + ds datastore.DataStore + schema datastore.DataStoreSchema + sequences db.LedgerSeqRange // holds the sequence numbers of the oldest and current tip ledgers in the datastore +} + +// This struct holds the local database read/write constructs and metadata initially associated with it +type databaseInfo struct { + reader db.LedgerReader + sequences db.LedgerSeqRange // holds the sequence numbers of the oldest and newest ledgers in the local database + isNewDb bool +} + +type fillBounds struct { + backfill db.LedgerSeqRange + frontfill db.LedgerSeqRange + nBackfill uint32 + checkpointAligner checkpoint.CheckpointManager +} + +// Creates a new BackfillMeta struct +func NewBackfillMeta( + logger *supportlog.Entry, + service *Service, + reader db.LedgerReader, + ds datastore.DataStore, + dsSchema datastore.DataStoreSchema, +) (BackfillMeta, error) { + ctx, cancelInit := context.WithTimeout(context.Background(), time.Minute) + defer cancelInit() + + // Query local DB to determine min and max sequence numbers among the written ledgers + var dbIsEmpty bool + ledgerRange, err := reader.GetLedgerRange(ctx) + if errors.Is(err, db.ErrEmptyDB) { + dbIsEmpty = true + } else if err != nil { + return BackfillMeta{}, errors.Wrap(err, "could not get ledger range from local DB") + } + // Query remote datastore to determine min and max sequence numbers among the written ledgers + minWrittenDSLedger, err := datastore.FindOldestLedgerSequence(ctx, ds, dsSchema) + if err != nil { + return BackfillMeta{}, errors.Wrap(err, "could not get oldest ledger sequence from datastore") + } + + return BackfillMeta{ + logger: logger.WithField("subservice", "backfill"), + ingestService: service, + dbInfo: databaseInfo{ + reader: reader, + sequences: db.LedgerSeqRange{ + First: ledgerRange.FirstLedger.Sequence, + Last: ledgerRange.LastLedger.Sequence, + }, + isNewDb: dbIsEmpty, + }, + dsInfo: datastoreInfo{ + ds: ds, + schema: dsSchema, + sequences: db.LedgerSeqRange{ + First: minWrittenDSLedger, + // last is set any time getLatestSeqInCDP is called + }, + }, + }, nil +} + +// This function backfills the local database with ledgers from the datastore +// It guarantees the backfill of the most recent cfg.HistoryRetentionWindow ledgers +// Requires that no sequence number gaps exist in the local DB prior to backfilling +func (b *BackfillMeta) RunBackfill(cfg *config.Config) error { + ctx := context.Background() + + // Ensure no pre-existing gaps in local DB + if _, _, err := b.verifyDbGapless(ctx, cfg.IngestionTimeout); err != nil { + return err + } + bounds, err := b.setBounds(ctx, cfg.HistoryRetentionWindow, cfg.CheckpointFrequency) + if err != nil { + return errors.Wrap(err, "could not set backfill bounds") + } + + // Fill backwards from local DB tail to the left edge of retention window if the DB already has ledgers + if bounds, err = b.runBackfill(ctx, bounds); err != nil { + return err + } + // Fill forward from local DB head (or left edge of retention window, if empty) to current tip of datastore + if bounds, err = 
b.runFrontfill(ctx, bounds); err != nil { + return err + } + + // Ensure no gaps introduced and retention window requirements are (at least approximately) met + minSeq, maxSeq, err := b.verifyDbGapless(ctx, cfg.IngestionTimeout) + if err != nil { + return err + } + return b.verifyBounds(bounds.nBackfill, minSeq, maxSeq) +} + +// Sets the bounds for backfill and frontfill phases, determines number of ledgers to backfill, and +// whether backfill phase should be skipped +func (b *BackfillMeta) setBounds( + ctx context.Context, + retentionWindow uint32, + checkpointFrequency uint32, +) (fillBounds, error) { + currentTipLedger, err := datastore.FindLatestLedgerSequence(ctx, b.dsInfo.ds) + if err != nil { + return fillBounds{}, errors.Wrap(err, "could not get latest ledger number from cloud datastore") + } + bounds := fillBounds{ + nBackfill: min(retentionWindow, currentTipLedger), + checkpointAligner: checkpoint.NewCheckpointManager(checkpointFrequency), + } + + // Determine the oldest/starting ledger to fill from + var fillStartMin uint32 // minimum possible ledger to start from + if currentTipLedger >= bounds.nBackfill+1 { + fillStartMin = max(currentTipLedger-bounds.nBackfill+1, b.dsInfo.sequences.First) + } else { + fillStartMin = b.dsInfo.sequences.First + } + + minDbSeq, maxDbSeq := b.dbInfo.sequences.First, b.dbInfo.sequences.Last + var fillCount uint32 + // if initial DB empty or tail covers edge of filling window, skip backwards backfill + switch { + case b.dbInfo.isNewDb: + bounds.frontfill.First = fillStartMin + fillCount = currentTipLedger - fillStartMin + 1 + case minDbSeq <= fillStartMin: + // DB tail already covers left edge of retention window + bounds.frontfill.First = maxDbSeq + 1 + fillCount = currentTipLedger - maxDbSeq + default: + if currentTipLedger < b.dbInfo.sequences.First { + // this would introduce a gap missing ledgers of sequences between the current tip and local DB minimum + return fillBounds{}, errors.New("current datastore tip is older than local DB minimum ledger") + } + bounds.backfill.First = fillStartMin + bounds.backfill.Last = minDbSeq - 1 + bounds.frontfill.First = maxDbSeq + 1 + fillCount = bounds.nBackfill - (maxDbSeq - minDbSeq + 1) + // frontfill last changes dynamically based on current tip ledger + } + b.logger.Infof("Current tip ledger in cloud datastore is %d, going to backfill %d ledgers", + currentTipLedger, fillCount) + return bounds, nil +} + +// Backfills the local DB with older ledgers from newest to oldest within the retention window +func (b *BackfillMeta) runBackfill(ctx context.Context, bounds fillBounds) (fillBounds, error) { + var err error + if bounds.needBackfillPhase() { + b.logger.Infof("Backfilling to the left edge of retention window, ledgers [%d <- %d]", + bounds.backfill.First, bounds.backfill.Last) + if bounds, err = b.backfillChunks(ctx, bounds); err != nil { + return fillBounds{}, errors.Wrap(err, "backfill failed") + } + b.dbInfo.sequences.First = bounds.backfill.First + } + b.logger.Infof("Backfill of old ledgers complete") + return bounds, nil +} + +// Backfills the local DB with older ledgers from oldest to newest within the retention window +func (b *BackfillMeta) runFrontfill(ctx context.Context, bounds fillBounds) (fillBounds, error) { + numIterations := 1 + // If we skipped backfilling, we want to fill forwards twice because the latest ledger may be + // significantly further in the future after the first fill completes and fills are faster than catch-up. 
+ if !bounds.needBackfillPhase() { + numIterations = 2 + } + for range numIterations { + currentTipLedger, err := datastore.FindLatestLedgerSequence(ctx, b.dsInfo.ds) + if err != nil { + return fillBounds{}, errors.Wrap(err, "could not get latest ledger number from cloud datastore") + } + currentTipLedger = bounds.checkpointAligner.PrevCheckpoint(currentTipLedger) + if bounds.frontfill.First < currentTipLedger { + b.logger.Infof("Frontfilling to the current datastore tip, ledgers [%d -> %d]", + bounds.frontfill.First, currentTipLedger) + if err := b.frontfillChunks(ctx, bounds); err != nil { + return fillBounds{}, errors.Wrap(err, "frontfill failed") + } + } else { + b.logger.Infof("No extra filling needed, local DB head already at datastore tip") + } + // Update frontfill.First for next iteration (if any) + bounds.frontfill.First = currentTipLedger + 1 + } + b.dbInfo.sequences.Last = max(bounds.frontfill.First-1, b.dbInfo.sequences.Last) + b.logger.Infof("Forward backfill of recent ledgers complete") + return bounds, nil +} + +// Verifies backfilled ledgers meet retention window requirements and warns if not +func (b *BackfillMeta) verifyBounds(nBackfill, minSeq, maxSeq uint32) error { + count := maxSeq - minSeq + 1 + if count+ledgerThreshold < nBackfill { + b.logger.Warnf("post-backfill verification warning: expected at least %d ledgers, "+ + "got %d ledgers (exceeds acceptable threshold of %d missing ledgers)", nBackfill, count, ledgerThreshold) + b.logger.Warn("You may wish to run backfill again to avoid a long post-backfill catch-up period") + } + b.logger.Infof("Backfill process complete, ledgers [%d -> %d] are now in local DB", minSeq, maxSeq) + return nil +} + +// Checks to ensure state of local DB is acceptable for backfilling +func (b *BackfillMeta) verifyDbGapless(ctx context.Context, timeout time.Duration) (uint32, uint32, error) { + ctx, cancelCheckNoGaps := context.WithTimeout(ctx, timeout) + defer cancelCheckNoGaps() + + ledgerRange, err := b.dbInfo.reader.GetLedgerRange(ctx) + if errors.Is(err, db.ErrEmptyDB) { + return 0, 0, nil // empty DB is considered gapless + } else if err != nil { + return 0, 0, errors.Wrap(err, "db verify: could not get ledger range") + } + // Get sequence number of highest/lowest ledgers in local DB + minDbSeq, maxDbSeq := ledgerRange.FirstLedger.Sequence, ledgerRange.LastLedger.Sequence + b.logger.Debugf("DB verify: checking for gaps in [%d, %d]", minDbSeq, maxDbSeq) + expectedCount := maxDbSeq - minDbSeq + 1 + count, sequencesMin, sequencesMax, err := b.dbInfo.reader.GetLedgerCountInRange(ctx, minDbSeq, maxDbSeq) + if err != nil { + return 0, 0, errors.Wrap(err, "db verify: could not get ledger sequences in local DB") + } + if count != expectedCount { + return 0, 0, fmt.Errorf("db verify: gap detected in local DB: expected %d ledgers, got %d ledgers", + expectedCount, count) + } + return sequencesMin, sequencesMax, nil +} + +// Backfills the local DB with ledgers in [lBound, rBound] from the cloud datastore +// Used to fill local DB backwards towards older ledgers (starting from newest) +func (b *BackfillMeta) backfillChunks(ctx context.Context, bounds fillBounds) (fillBounds, error) { + lBound, rBound := bounds.backfill.First, bounds.backfill.Last + for i, rChunkBound := 0, rBound; rChunkBound >= lBound; i++ { // note: lBound changes in the loop body + if err := ctx.Err(); err != nil { + return fillBounds{}, err + } + // Create temporary backend for backward-filling chunks + // Note monotonicity constraint of the ledger backend + tempBackend, 
err := makeBackend(b.dsInfo) + if err != nil { + return fillBounds{}, errors.Wrap(err, "couldn't create backend") + } + defer func() { + if err := tempBackend.Close(); err != nil { + b.logger.WithError(err).Error("error closing temporary backend") + } + }() + + lChunkBound := lBound + // Underflow-safe check for setting left chunk bound + if rChunkBound >= lBound+ChunkSize-1 { + lChunkBound = max(lBound, rChunkBound-ChunkSize+1) + } + + b.logger.Infof("Backfill: filling ledgers [%d, %d]", lChunkBound, rChunkBound) + chunkRange := ledgerbackend.BoundedRange(lChunkBound, rChunkBound) + if err := tempBackend.PrepareRange(ctx, chunkRange); err != nil { + return fillBounds{}, err + } + if err := b.ingestService.ingestRange(ctx, tempBackend, chunkRange); err != nil { + return fillBounds{}, errors.Wrapf(err, "couldn't fill chunk [%d, %d]", lChunkBound, rChunkBound) + } + b.logger.Infof("Backfill: committed ledgers [%d, %d]; %d%% done", + lChunkBound, rChunkBound, 100*(rBound-lChunkBound)/max(rBound-lBound, 1)) + + if lChunkBound <= lBound { + break + } + rChunkBound = lChunkBound - 1 + // Refresh lBound periodically to account for ledgers coming into the datastore + if i > 0 && i%10 == 0 { + currentTipLedger, err := datastore.FindLatestLedgerSequence(ctx, b.dsInfo.ds) + if err != nil { + return fillBounds{}, err + } + lBound = max(currentTipLedger-bounds.nBackfill+1, b.dsInfo.sequences.First) + } + } + bounds.backfill.First = lBound + return bounds, nil +} + +// Backfills the local DB with ledgers in [lBound, rBound] from the cloud datastore +// Used to fill local DB forwards towards the current ledger tip +func (b *BackfillMeta) frontfillChunks(ctx context.Context, bounds fillBounds) error { + rBound, err := datastore.FindLatestLedgerSequence(ctx, b.dsInfo.ds) + if err != nil { + return errors.Wrap(err, "could not get latest ledger number from cloud datastore") + } + lBound := bounds.frontfill.First + // Backend for frontfill can be persistent over multiple chunks + backend, err := makeBackend(b.dsInfo) + if err != nil { + return errors.Wrap(err, "could not create ledger backend") + } + defer func() { + if err := backend.Close(); err != nil { + b.logger.WithError(err).Error("error closing ledger backend") + } + }() + + ledgerRange := ledgerbackend.BoundedRange(lBound, rBound) + if err := backend.PrepareRange(ctx, ledgerRange); err != nil { + return err + } + for lChunkBound := lBound; lChunkBound <= rBound; lChunkBound += ChunkSize { + if err := ctx.Err(); err != nil { + return err + } + rChunkBound := min(rBound, lChunkBound+ChunkSize-1) + chunkRange := ledgerbackend.BoundedRange(lChunkBound, rChunkBound) + + b.logger.Infof("Frontfill: filling ledgers [%d, %d]", lChunkBound, rChunkBound) + if err := b.ingestService.ingestRange(ctx, backend, chunkRange); err != nil { + return errors.Wrapf(err, "couldn't fill chunk [%d, %d]", lChunkBound, rChunkBound) + } + b.logger.Infof("Frontfill: committed ledgers [%d, %d]; %d%% done", + lChunkBound, rChunkBound, 100*(rChunkBound-lBound)/max(rBound-lBound, 1)) + } + return nil +} + +// Returns a buffered storage backend for the given datastore +func makeBackend(dsInfo datastoreInfo) (ledgerbackend.LedgerBackend, error) { + ledgersPerFile := dsInfo.schema.LedgersPerFile + bufferSize := max(1024/ledgersPerFile, 10) // use fewer files if many ledgers per file + numWorkers := max(bufferSize/10, 5) // approx. 
1 worker per 10 buffered files + return ledgerbackend.NewBufferedStorageBackend( + ledgerbackend.BufferedStorageBackendConfig{ + BufferSize: bufferSize, // number of files to buffer + NumWorkers: numWorkers, // number of concurrent GCS fetchers; each shares one buffer of above size + RetryLimit: 3, + RetryWait: 5 * time.Second, + }, + dsInfo.ds, + dsInfo.schema, + ) +} + +// Determines if backfill phase should be skipped +func (bounds fillBounds) needBackfillPhase() bool { + return !bounds.backfill.Empty() +} diff --git a/cmd/stellar-rpc/internal/ingest/backfill_test.go b/cmd/stellar-rpc/internal/ingest/backfill_test.go new file mode 100644 index 00000000..c238b40a --- /dev/null +++ b/cmd/stellar-rpc/internal/ingest/backfill_test.go @@ -0,0 +1,80 @@ +package ingest + +import ( + "path" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/stellar/go-stellar-sdk/network" + supportlog "github.com/stellar/go-stellar-sdk/support/log" + "github.com/stellar/go-stellar-sdk/xdr" + + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/daemon/interfaces" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/db" +) + +func TestGapDetection(t *testing.T) { + ctx := t.Context() + testLogger := supportlog.New() + + tmp := t.TempDir() + dbPath := path.Join(tmp, "test.sqlite") + testDB, err := db.OpenSQLiteDB(dbPath) + require.NoError(t, err) + defer testDB.Close() + + rw := db.NewReadWriter(testLogger, testDB, interfaces.MakeNoOpDeamon(), 10, 10, + network.TestNetworkPassphrase) + + writeTx, err := rw.NewTx(ctx) + require.NoError(t, err) + + // Missing ledger 103 + ledgers := []xdr.LedgerCloseMeta{ + createLedger(100), + createLedger(101), + createLedger(102), + createLedger(104), + createLedger(105), + } + for _, ledger := range ledgers { + require.NoError(t, writeTx.LedgerWriter().InsertLedger(ledger)) + } + require.NoError(t, writeTx.Commit(ledgers[len(ledgers)-1], nil)) + backfill := &BackfillMeta{ + logger: testLogger, + dbInfo: databaseInfo{reader: db.NewLedgerReader(testDB)}, + } + _, _, err = backfill.verifyDbGapless(ctx, 5*time.Second) + require.Error(t, err) + require.ErrorContains(t, err, "gap detected in local DB") + + // Now insert the missing ledger and verify no gap is detected + writeTx, err = rw.NewTx(ctx) + require.NoError(t, err) + require.NoError(t, writeTx.LedgerWriter().InsertLedger(createLedger(103))) + require.NoError(t, writeTx.Commit(ledgers[len(ledgers)-1], nil)) + + _, _, err = backfill.verifyDbGapless(ctx, 5*time.Second) + require.NoError(t, err) +} + +func createLedger(ledgerSequence uint32) xdr.LedgerCloseMeta { + return xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Hash: xdr.Hash{}, + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(ledgerSequence), + }, + }, + TxSet: xdr.GeneralizedTransactionSet{ + V: 1, + V1TxSet: &xdr.TransactionSetV1{}, + }, + }, + } +} diff --git a/cmd/stellar-rpc/internal/ingest/service.go b/cmd/stellar-rpc/internal/ingest/service.go index 3963abfa..afdb9787 100644 --- a/cmd/stellar-rpc/internal/ingest/service.go +++ b/cmd/stellar-rpc/internal/ingest/service.go @@ -41,7 +41,6 @@ type Config struct { func NewService(cfg Config) *Service { service := newService(cfg) - startService(service, cfg) return service } @@ -91,20 +90,20 @@ func newService(cfg Config) *Service { return service } -func startService(service *Service, cfg Config) { +func (s *Service) Start(cfg Config) { ctx, done := context.WithCancel(context.Background()) - service.done = done - 
service.wg.Add(1) + s.done = done + s.wg.Add(1) panicGroup := util.UnrecoverablePanicGroup.Log(cfg.Logger) panicGroup.Go(func() { - defer service.wg.Done() + defer s.wg.Done() // Retry running ingestion every second for 5 seconds. constantBackoff := backoff.WithMaxRetries(backoff.NewConstantBackOff(1*time.Second), maxRetries) // Don't want to keep retrying if the context gets canceled. contextBackoff := backoff.WithContext(constantBackoff, ctx) err := backoff.RetryNotify( func() error { - err := service.run(ctx, cfg.Archive) + err := s.run(ctx, cfg.Archive) if errors.Is(err, errEmptyArchives) { // keep retrying until history archives are published constantBackoff.Reset() @@ -114,7 +113,7 @@ func startService(service *Service, cfg Config) { contextBackoff, cfg.OnIngestionRetry) if err != nil && !errors.Is(err, context.Canceled) { - service.logger.WithError(err).Fatal("could not run ingestion") + s.logger.WithError(err).Fatal("could not run ingestion") } }) } @@ -135,6 +134,7 @@ type Service struct { done context.CancelFunc wg sync.WaitGroup metrics Metrics + latestIngestedSeq uint32 } func (s *Service) Close() error { @@ -207,6 +207,15 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error { return err } + // Abstracted from ingestLedgerCloseMeta to allow fee window ingestion to be optional + startTime = time.Now() + if err := s.feeWindows.IngestFees(ledgerCloseMeta); err != nil { + return err + } + s.metrics.ingestionDurationMetric. + With(prometheus.Labels{"type": "fee-window"}). + Observe(time.Since(startTime).Seconds()) + durationMetrics := map[string]time.Duration{} if err := tx.Commit(ledgerCloseMeta, durationMetrics); err != nil { return err @@ -224,7 +233,60 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error { s.metrics.ingestionDurationMetric. With(prometheus.Labels{"type": "total"}). Observe(time.Since(startTime).Seconds()) - s.metrics.latestLedgerMetric.Set(float64(sequence)) + if sequence > s.latestIngestedSeq { + s.latestIngestedSeq = sequence + s.metrics.latestLedgerMetric.Set(float64(sequence)) + } + return nil +} + +// Ingests a range of ledgers from a provided ledgerBackend +func (s *Service) ingestRange(ctx context.Context, backend backends.LedgerBackend, seqRange backends.Range) error { + s.logger.Debugf("Ingesting ledgers [%d, %d]", seqRange.From(), seqRange.To()) + startTime := time.Now() + tx, err := s.db.NewTx(ctx) + if err != nil { + return err + } + + defer func() { + if err := tx.Rollback(); err != nil { + s.logger.WithError(err).Warn("could not rollback ingest write transactions") + } + }() + + var ledgerCloseMeta xdr.LedgerCloseMeta + for seq := seqRange.From(); seq <= seqRange.To(); seq++ { + ledgerCloseMeta, err = backend.GetLedger(ctx, seq) + if err != nil { + return err + } + if err := s.ingestLedgerCloseMeta(tx, ledgerCloseMeta); err != nil { + return err + } + } + + durationMetrics := map[string]time.Duration{} + if err := tx.Commit(ledgerCloseMeta, durationMetrics); err != nil { + return err + } + for key, duration := range durationMetrics { + s.metrics.ingestionDurationMetric. + With(prometheus.Labels{"type": key}). + Observe(duration.Seconds()) + } + + s.logger. + WithField("duration", time.Since(startTime).Seconds()). + Debugf("Ingested ledgers [%d, %d]", seqRange.From(), seqRange.To()) + + s.metrics.ingestionDurationMetric. + With(prometheus.Labels{"type": "total"}). 
+ Observe(time.Since(startTime).Seconds()) + if seqRange.To() > s.latestIngestedSeq { + s.latestIngestedSeq = seqRange.To() + s.metrics.latestLedgerMetric.Set(float64(seqRange.To())) + } return nil } @@ -253,13 +315,5 @@ func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.Ledge With(prometheus.Labels{"type": "events"}). Observe(time.Since(startTime).Seconds()) - startTime = time.Now() - if err := s.feeWindows.IngestFees(ledgerCloseMeta); err != nil { - return err - } - s.metrics.ingestionDurationMetric. - With(prometheus.Labels{"type": "fee-window"}). - Observe(time.Since(startTime).Seconds()) - return nil } diff --git a/cmd/stellar-rpc/internal/ingest/service_test.go b/cmd/stellar-rpc/internal/ingest/service_test.go index 4d0067d9..ed90eca3 100644 --- a/cmd/stellar-rpc/internal/ingest/service_test.go +++ b/cmd/stellar-rpc/internal/ingest/service_test.go @@ -54,6 +54,7 @@ func TestRetryRunningIngestion(t *testing.T) { Daemon: interfaces.MakeNoOpDeamon(), } service := NewService(config) + service.Start(config) retryWg.Wait() service.Close() assert.Equal(t, 1, numRetries) diff --git a/cmd/stellar-rpc/internal/integrationtest/backfill_test.go b/cmd/stellar-rpc/internal/integrationtest/backfill_test.go new file mode 100644 index 00000000..48974ab6 --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/backfill_test.go @@ -0,0 +1,237 @@ +package integrationtest + +import ( + "fmt" + "path" + "testing" + "time" + + "github.com/fsouza/fake-gcs-server/fakestorage" + "github.com/stretchr/testify/require" + + client "github.com/stellar/go-stellar-sdk/clients/rpcclient" + "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" + "github.com/stellar/go-stellar-sdk/network" + protocol "github.com/stellar/go-stellar-sdk/protocols/rpc" + "github.com/stellar/go-stellar-sdk/support/datastore" + supportlog "github.com/stellar/go-stellar-sdk/support/log" + "github.com/stellar/go-stellar-sdk/xdr" + + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/config" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/daemon/interfaces" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/db" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/integrationtest/infrastructure" +) + +func TestBackfillEmptyDB(t *testing.T) { + // GCS has ledgers from 2-192; history retention window is 128 + var localDbStart, localDbEnd uint32 = 0, 0 + testBackfillWithSeededDbLedgers(t, localDbStart, localDbEnd) +} + +// Backfill with some ledgers in middle of local DB (simulates quitting mid-backfill-backwards phase) +// This induces a backfill backwards from localStart-1 to (datastoreEnd - retentionWindow), +// then forwards from localEnd+1 to datastoreEnd +func TestBackfillLedgersInMiddleOfDB(t *testing.T) { + // GCS has ledgers from 2-38; history retention window is 24 + var localDbStart, localDbEnd uint32 = 24, 30 + testBackfillWithSeededDbLedgers(t, localDbStart, localDbEnd) +} + +// Backfill with some ledgers at start of DB (simulates pulling plug when backfilling forwards) +// This is a "only backfill forwards" scenario +func TestBackfillLedgersAtStartOfDB(t *testing.T) { + // GCS has ledgers from 2-38; history retention window is 24 + var localDbStart, localDbEnd uint32 = 2, 28 + testBackfillWithSeededDbLedgers(t, localDbStart, localDbEnd) +} + +func testBackfillWithSeededDbLedgers(t *testing.T, localDbStart, localDbEnd uint32) { + var ( + datastoreStart, datastoreEnd uint32 = 2, 38 // ledgers present in datastore + retentionWindow uint32 = 64 // 8 artificial checkpoints 
worth of ledgers + stopLedger = 66 // final ledger to ingest + ) + + gcsServer, makeDatastoreConfig := makeNewFakeGCSServer(t, datastoreStart, datastoreEnd, retentionWindow) + defer gcsServer.Stop() + + // Create temporary SQLite DB populated with dummy ledgers + dbPath := createDbWithLedgers(t, localDbStart, localDbEnd, retentionWindow) + + test := infrastructure.NewTest(t, &infrastructure.TestConfig{ + SQLitePath: dbPath, + DatastoreConfigFunc: makeDatastoreConfig, + NoParallel: true, // can't use parallel due to env vars + DelayDaemonForLedgerN: int(datastoreEnd), // stops daemon start until core has at least the datastore ledgers + IgnoreLedgerCloseTimes: true, // fake/seeded ledgers don't need correct close times relative to core's + }) + + testDb := test.GetDaemon().GetDB() + client := test.GetRPCLient() + + backfillComplete := waitUntilLedgerIngested(t, test, client, + func(l protocol.GetLatestLedgerResponse) bool { + return l.Sequence >= datastoreEnd + }, 60*time.Second, false) + t.Logf("Successfully backfilled, ledger %d fetched from DB", backfillComplete.Sequence) + + coreIngestionComplete := waitUntilLedgerIngested(t, test, client, + func(l protocol.GetLatestLedgerResponse) bool { + return l.Sequence >= uint32(stopLedger) + }, time.Duration(stopLedger)*time.Second, true) // stop core ingestion once we reach the target + t.Logf("Core ingestion complete, ledger %d fetched from captive core", coreIngestionComplete.Sequence) + time.Sleep(100 * time.Millisecond) // let final ledgers writes commit to DB before reading + + // Verify ledgers present in DB + // We cannot use GetLedgers as it will fall back to the datastore, which is cheating + reader := db.NewLedgerReader(testDb) + count, minSeq, maxSeq, err := reader.GetLedgerCountInRange(t.Context(), datastoreStart, uint32(stopLedger)) + require.NoError(t, err) + require.Equal(t, retentionWindow, count, "expected to have ingested %d ledgers, got %d", retentionWindow, count) + // Ensure at least one ledger from datastore and at least one from core ingestion + require.LessOrEqual(t, minSeq, datastoreEnd, "did not ingest ledgers from datastore: "+ + fmt.Sprintf("expected first ledger <= %d, got %d", datastoreEnd, minSeq)) + require.Greater(t, maxSeq, datastoreEnd, "did not ingest ledgers from core after backfill: "+ + fmt.Sprintf("expected last ledger > %d, got %d", datastoreEnd, maxSeq)) + // Verify they're contiguous + require.Equal(t, maxSeq-minSeq+1, count, + "gap detected: expected %d ledgers in [%d, %d], got %d", maxSeq-minSeq+1, minSeq, maxSeq, count) + + t.Logf("Verified ledgers %d-%d present in local DB", minSeq, maxSeq) +} + +func waitUntilLedgerIngested(t *testing.T, test *infrastructure.Test, rpcClient *client.Client, + cond func(l protocol.GetLatestLedgerResponse) bool, + timeout time.Duration, + cancelIngest bool, +) protocol.GetLatestLedgerResponse { + var last protocol.GetLatestLedgerResponse + require.Eventually(t, func() bool { + resp, err := rpcClient.GetLatestLedger(t.Context()) + require.NoError(t, err) + last = resp + if cancelIngest && cond(resp) { + // This prevents an unlikely race caused by further ingestion by core. Ask me how I know! 
+ test.StopCore() + } + return cond(resp) + }, timeout, 100*time.Millisecond, "last ledger backfilled: %+v", last.Sequence) + return last +} + +func makeNewFakeGCSServer(t *testing.T, + datastoreStart, + datastoreEnd, + retentionWindow uint32, +) (*fakestorage.Server, func(*config.Config)) { + opts := fakestorage.Options{ + Scheme: "http", + PublicHost: "127.0.0.1", + } + gcsServer, err := fakestorage.NewServerWithOptions(opts) + require.NoError(t, err, "failed to start fake GCS server") + bucketName := "test-bucket" + t.Setenv("STORAGE_EMULATOR_HOST", gcsServer.URL()) + + gcsServer.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: bucketName}) + objPrefix := "v1/ledgers/testnet" + bucketPath := bucketName + "/" + objPrefix + // datastore config + schema := datastore.DataStoreSchema{ + FilesPerPartition: 64000, + LedgersPerFile: 1, + } + // Configure with backfill enabled and retention window of 128 ledgers + makeDatastoreConfig := func(cfg *config.Config) { + cfg.ServeLedgersFromDatastore = true + cfg.Backfill = true + cfg.BufferedStorageBackendConfig = ledgerbackend.BufferedStorageBackendConfig{ + BufferSize: 15, + NumWorkers: 2, + } + cfg.DataStoreConfig = datastore.DataStoreConfig{ + Type: "GCS", + Params: map[string]string{"destination_bucket_path": bucketPath}, + Schema: schema, + } + cfg.HistoryRetentionWindow = retentionWindow + cfg.ClassicFeeStatsLedgerRetentionWindow = retentionWindow + cfg.SorobanFeeStatsLedgerRetentionWindow = retentionWindow + } + // Add ledger files to datastore + for seq := datastoreStart; seq <= datastoreEnd; seq++ { + gcsServer.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: bucketName, + Name: objPrefix + "/" + schema.GetObjectKeyFromSequenceNumber(seq), + }, + Content: createLCMBatchBuffer(seq, xdr.TimePoint(time.Now().Unix())), + }) + } + + return gcsServer, makeDatastoreConfig +} + +func createDbWithLedgers(t *testing.T, start, end, retentionWindow uint32) string { + tmp := t.TempDir() + dbPath := path.Join(tmp, "test.sqlite") + testDB, err := db.OpenSQLiteDB(dbPath) + require.NoError(t, err) + defer func() { + require.NoError(t, testDB.Close()) // will be reopened in NewTest + }() + + testLogger := supportlog.New() + rw := db.NewReadWriter(testLogger, testDB, interfaces.MakeNoOpDeamon(), + int(retentionWindow), retentionWindow, network.TestNetworkPassphrase) + + // Insert dummy ledgers into the DB + writeTx, err := rw.NewTx(t.Context()) + require.NoError(t, err) + + var lastLedger xdr.LedgerCloseMeta + if end != 0 { + for seq := start; seq <= end; seq++ { + ledger := createLedger(seq) + require.NoError(t, writeTx.LedgerWriter().InsertLedger(ledger)) + lastLedger = ledger + } + require.NoError(t, writeTx.Commit(lastLedger, nil)) + } + if end != 0 { + t.Logf("Created local DB, seeded with ledgers %d-%d", start, end) + } else { + t.Logf("Created empty local DB") + } + return dbPath +} + +func createLedger(ledgerSequence uint32) xdr.LedgerCloseMeta { + now := time.Now().Unix() + return xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Hash: xdr.Hash{}, + Header: makeLedgerHeader(ledgerSequence, 25, xdr.TimePoint(now)), + }, + TxSet: xdr.GeneralizedTransactionSet{ + V: 1, + V1TxSet: &xdr.TransactionSetV1{}, + }, + }, + } +} + +func makeLedgerHeader(ledgerSequence, protocolVersion uint32, closeTime xdr.TimePoint) xdr.LedgerHeader { + return xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(ledgerSequence), + LedgerVersion: xdr.Uint32(protocolVersion), + ScpValue: 
xdr.StellarValue{ + CloseTime: closeTime, + TxSetHash: xdr.Hash{}, + Upgrades: nil, + }, + } +} diff --git a/cmd/stellar-rpc/internal/integrationtest/get_ledgers_test.go b/cmd/stellar-rpc/internal/integrationtest/get_ledgers_test.go index 9258fab2..decb5eda 100644 --- a/cmd/stellar-rpc/internal/integrationtest/get_ledgers_test.go +++ b/cmd/stellar-rpc/internal/integrationtest/get_ledgers_test.go @@ -146,7 +146,7 @@ func TestGetLedgersFromDatastore(t *testing.T) { BucketName: bucketName, Name: schema.GetObjectKeyFromSequenceNumber(seq), }, - Content: createLCMBatchBuffer(seq), + Content: createLCMBatchBuffer(seq, xdr.TimePoint(0)), }) } @@ -250,7 +250,7 @@ func TestGetLedgersFromDatastore(t *testing.T) { }) } -func createLCMBatchBuffer(seq uint32) []byte { +func createLCMBatchBuffer(seq uint32, closeTime xdr.TimePoint) []byte { lcm := xdr.LedgerCloseMetaBatch{ StartSequence: xdr.Uint32(seq), EndSequence: xdr.Uint32(seq), @@ -259,9 +259,7 @@ func createLCMBatchBuffer(seq uint32) []byte { V: int32(0), V0: &xdr.LedgerCloseMetaV0{ LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(seq), - }, + Header: makeLedgerHeader(seq, 25, closeTime), }, }, }, diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/docker/docker-compose.yml b/cmd/stellar-rpc/internal/integrationtest/infrastructure/docker/docker-compose.yml index 2c6dd5b1..80221bfc 100644 --- a/cmd/stellar-rpc/internal/integrationtest/infrastructure/docker/docker-compose.yml +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/docker/docker-compose.yml @@ -13,7 +13,7 @@ services: # Note: Please keep the image pinned to an immutable tag matching the Captive Core version. # This avoids implicit updates which break compatibility between # the Core container and captive core. - image: ${CORE_IMAGE:-stellar/stellar-core:23.0.1-2670.050eacf11.focal} + image: ${CORE_IMAGE:-stellar/stellar-core:25.0.0-2911.e9748b05a.jammy} depends_on: - core-postgres environment: diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go index 90f0c64d..3a08998c 100644 --- a/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go @@ -1,3 +1,4 @@ +//nolint:funcorder // exported and unexported methods interleaved for readability package infrastructure import ( @@ -44,6 +45,7 @@ const ( FriendbotURL = "http://localhost:8000/friendbot" // Needed when Core is run with ARTIFICIALLY_ACCELERATE_TIME_FOR_TESTING=true checkpointFrequency = 8 + ledgerCloseTime = time.Second // seconds per ledger with accelerated time captiveCoreConfigFilename = "captive-core-integration-tests.cfg" captiveCoreConfigTemplateFilename = captiveCoreConfigFilename + ".tmpl" @@ -87,6 +89,9 @@ type TestConfig struct { // empty string to skip upgrading altogether. 
ApplyLimits *string + IgnoreLedgerCloseTimes bool // disregard close times when ingesting ledgers + DelayDaemonForLedgerN int // don't start daemon until ledger N reached by core + DatastoreConfigFunc func(*config.Config) } @@ -129,10 +134,11 @@ type Test struct { daemon *daemon.Daemon - masterAccount txnbuild.Account - shutdownOnce sync.Once - shutdown func() - onlyRPC bool + masterAccount txnbuild.Account + shutdownOnce sync.Once + shutdown func() + onlyRPC bool + ignoreLedgerCloseTimes bool datastoreConfigFunc func(*config.Config) } @@ -157,6 +163,7 @@ func NewTest(t testing.TB, cfg *TestConfig) *Test { i.captiveCoreStoragePath = cfg.CaptiveCoreStoragePath parallel = !cfg.NoParallel i.datastoreConfigFunc = cfg.DatastoreConfigFunc + i.ignoreLedgerCloseTimes = cfg.IgnoreLedgerCloseTimes if cfg.OnlyRPC != nil { i.onlyRPC = true @@ -209,6 +216,10 @@ func NewTest(t testing.TB, cfg *TestConfig) *Test { i.waitForCheckpoint() } if !i.runRPCInContainer() { + if cfg != nil && cfg.DelayDaemonForLedgerN != 0 { + i.t.Logf("Delaying daemon start until core reaches ledger %d", cfg.DelayDaemonForLedgerN) + i.waitForCoreAtLedger(cfg.DelayDaemonForLedgerN) + } i.spawnRPCDaemon() } @@ -252,6 +263,13 @@ func (i *Test) spawnContainers() { i.fillContainerPorts() } +func (i *Test) StopCore() { + if !i.onlyRPC && i.areThereContainers() { + i.runSuccessfulComposeCommand("stop", "core") + i.t.Log("Stopped Core container") + } +} + func (i *Test) stopContainers() { // There were containerized workloads we should bring down downCmd := []string{"down"} @@ -308,6 +326,18 @@ func (i *Test) waitForCheckpoint() { ) } +func (i *Test) waitForCoreAtLedger(ledger int) { + i.t.Logf("Waiting for ledger %d...", ledger) + require.Eventually(i.t, + func() bool { + info, err := i.getCoreInfo() + return err == nil && info.Info.Ledger.Num >= ledger + }, + time.Duration(ledger+5)*ledgerCloseTime, + time.Second, + ) +} + func (i *Test) getRPConfigForContainer() rpcConfig { return rpcConfig{ // The container needs to listen on all interfaces, not just localhost @@ -343,6 +373,7 @@ func (i *Test) getRPConfigForDaemon() rpcConfig { archiveURL: "http://" + i.testPorts.CoreArchiveHostPort, sqlitePath: i.sqlitePath, captiveCoreHTTPQueryPort: i.testPorts.captiveCoreHTTPQueryPort, + ignoreLedgerCloseTimes: i.ignoreLedgerCloseTimes, } } @@ -357,9 +388,15 @@ type rpcConfig struct { captiveCoreHTTPPort uint16 archiveURL string sqlitePath string + ignoreLedgerCloseTimes bool } func (vars rpcConfig) toMap() map[string]string { + maxHealthyLedgerLatency := "10s" + if vars.ignoreLedgerCloseTimes { + // If we're ignoring close times, permit absurdly high latencies + maxHealthyLedgerLatency = time.Duration(1<<63 - 1).String() + } return map[string]string{ "ENDPOINT": vars.endPoint, "ADMIN_ENDPOINT": vars.adminEndpoint, @@ -381,23 +418,24 @@ func (vars rpcConfig) toMap() map[string]string { "INGESTION_TIMEOUT": "10m", "HISTORY_RETENTION_WINDOW": strconv.Itoa(config.OneDayOfLedgers), "CHECKPOINT_FREQUENCY": strconv.Itoa(checkpointFrequency), - "MAX_HEALTHY_LEDGER_LATENCY": "10s", + "MAX_HEALTHY_LEDGER_LATENCY": maxHealthyLedgerLatency, "PREFLIGHT_ENABLE_DEBUG": "true", } } func (i *Test) waitForRPC() { i.t.Log("Waiting for RPC to be healthy...") - + var err error require.Eventually(i.t, func() bool { - result, err := i.GetRPCLient().GetHealth(context.Background()) - i.t.Logf("getHealth: %+v", result) + var result protocol.GetHealthResponse + result, err = i.GetRPCLient().GetHealth(context.Background()) + i.t.Logf("getHealth: %+v; err: %v", result, 
err)
 			return err == nil && result.Status == "healthy"
 		},
 		30*time.Second,
 		time.Second,
-		"RPC never got healthy",
+		fmt.Sprintf("RPC never got healthy: %+v", err),
 	)
 }
diff --git a/cmd/stellar-rpc/internal/ledgerbucketwindow/ledgerbucketwindow.go b/cmd/stellar-rpc/internal/ledgerbucketwindow/ledgerbucketwindow.go
index baf88014..4e0ca509 100644
--- a/cmd/stellar-rpc/internal/ledgerbucketwindow/ledgerbucketwindow.go
+++ b/cmd/stellar-rpc/internal/ledgerbucketwindow/ledgerbucketwindow.go
@@ -110,3 +110,9 @@ func (w *LedgerBucketWindow[T]) Get(i uint32) *LedgerBucket[T] {
 	index := (w.start + i) % length
 	return &w.buckets[index]
 }
+
+// Reset clears all buckets from the window
+func (w *LedgerBucketWindow[T]) Reset() {
+	w.buckets = w.buckets[:0]
+	w.start = 0
+}
diff --git a/cmd/stellar-rpc/internal/methods/get_latest_ledger_test.go b/cmd/stellar-rpc/internal/methods/get_latest_ledger_test.go
index 369c17c8..0e89dca6 100644
--- a/cmd/stellar-rpc/internal/methods/get_latest_ledger_test.go
+++ b/cmd/stellar-rpc/internal/methods/get_latest_ledger_test.go
@@ -33,6 +33,13 @@ func (ledgerReader *ConstantLedgerReader) GetLedgerRange(_ context.Context) (led
 	return ledgerbucketwindow.LedgerRange{}, nil
 }
 
+func (ledgerReader *ConstantLedgerReader) GetLedgerCountInRange(
+	_ context.Context,
+	_, _ uint32,
+) (uint32, uint32, uint32, error) {
+	return 0, 0, 0, nil
+}
+
 func (ledgerReader *ConstantLedgerReader) NewTx(_ context.Context) (db.LedgerReaderTx, error) {
 	return nil, errors.New("mock NewTx error")
 }
diff --git a/cmd/stellar-rpc/internal/methods/mocks.go b/cmd/stellar-rpc/internal/methods/mocks.go
index 39dd0e1d..fc1b4fdf 100644
--- a/cmd/stellar-rpc/internal/methods/mocks.go
+++ b/cmd/stellar-rpc/internal/methods/mocks.go
@@ -38,6 +38,15 @@ func (m *MockLedgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwind
 	return args.Get(0).(ledgerbucketwindow.LedgerRange), args.Error(1) //nolint:forcetypeassert
 }
 
+func (m *MockLedgerReader) GetLedgerCountInRange(
+	ctx context.Context,
+	start uint32,
+	end uint32,
+) (uint32, uint32, uint32, error) {
+	args := m.Called(ctx, start, end)
+	return args.Get(0).(uint32), args.Get(1).(uint32), args.Get(2).(uint32), args.Error(3) //nolint:forcetypeassert
+}
+
 func (m *MockLedgerReader) StreamLedgerRange(ctx context.Context, startLedger, endLedger uint32,
 	f db.StreamLedgerFn,
 ) error {