
Implement RocksDB-backed LedgerStore "Reader" for Full History Ledger Retrieval #584

@karthikiyer56

Description

This ticket covers the implementation of a RocksDB-backed ledger store reader that enables getLedgerBySequence functionality for full history access in the Stellar RPC service.

This is the read path only - a separate ticket will cover the generation/ingestion of RocksDB stores.

Background

Currently, the RPC service has limited historical ledger access (only 2 weeks).
By leveraging pre-built RocksDB stores (one per year, containing ledgerSeq → compressed LedgerCloseMeta mappings), we can provide efficient full history ledger retrieval spanning 10+ years of Stellar data.


Data Store Characteristics

  • Storage Format: RocksDB key-value stores
  • Key: 4-byte big-endian encoded ledgerSeq (uint32)
  • Value: zstd-compressed XDR-encoded LedgerCloseMeta
  • Organization: One RocksDB store per year (e.g., year-2020, year-2021, etc.)
  • Ledger Ranges: Sequential, non-overlapping across stores
  • Typical Size: ~5-6 million ledgers per store; on-disk size varies by year (earlier years smaller, recent years larger)
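
For illustration, a minimal sketch of the key codec implied by the format above, using only encoding/binary from the Go standard library (ledger 50,000,000, for example, encodes to the bytes 02 fa f0 80):

// ledgerKey encodes a ledger sequence as the store's 4-byte big-endian key.
func ledgerKey(ledgerSeq uint32) []byte {
    key := make([]byte, 4)
    binary.BigEndian.PutUint32(key, ledgerSeq)
    return key
}

// ledgerSeqFromKey decodes a store key back to its ledger sequence.
func ledgerSeqFromKey(key []byte) uint32 {
    return binary.BigEndian.Uint32(key)
}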

Configuration Design

TOML Configuration Schema

# Top-level RocksDB configuration
[rocksdb]
# Master switch - if false, all RocksDB functionality is disabled
enabled = false

# Full history ledger retrieval configuration
[rocksdb.fullHistory]
# Enable getLedgerBySequence via RocksDB stores
getLedgers = false

# Bloom filter bits per key (set to 0 to disable)
# Default matches how the stores were built
bloomFilterBits = 10

# Block cache size in MB shared across all RocksDB stores
# Recommended: 8192-16384 MB depending on available RAM
blockCacheSizeMB = 8192

# Option 1: Top-level directory paths (auto-discovers year-* subdirectories)
# Cannot be used together with 'stores'
# Actual RocksDB stores/directories must be nested under the storePaths, e.g.:
#      '/mnt/nvme1/rocksdb-stores/year-2020'
#      '/mnt/nvme2/rocksdb-stores/year-2022'
# This option is mainly useful when the RocksDB stores are spread across multiple disks/paths on the filesystem
storePaths = [
  "/mnt/nvme1/rocksdb-stores",
  "/mnt/nvme2/rocksdb-stores"
]

# Option 2: Explicitly list individual store paths
# Cannot be used together with 'storePaths'
# stores = [
#   "/mnt/nvme1/rocksdb-stores/year-2020",
#   "/mnt/nvme1/rocksdb-stores/year-2021",
#   "/mnt/nvme2/rocksdb-stores/year-2022",
#   "/mnt/nvme2/rocksdb-stores/year-2023"
# ]

Configuration Validation Rules

  1. If rocksdb.enabled = false, skip all RocksDB initialization
  2. If rocksdb.enabled = true but rocksdb.fullHistory.getLedgers = false, skip ledger store initialization
  3. If rocksdb.fullHistory.getLedgers = true:
    • Exactly one of storePaths or stores must be specified (not both, not neither)
    • All specified paths must exist and be accessible
    • For storePaths: auto-discover subdirectories matching pattern year-*
    • For stores: each path must be a valid RocksDB database
  4. blockCacheSizeMB must be > 0
  5. bloomFilterBits must be >= 0 (0 disables bloom filters)

Directory Structure Examples

Option 1: Top-level paths with auto-discovery

/mnt/nvme1/rocksdb-stores/
├── year-2015/
│   ├── CURRENT
│   ├── MANIFEST-*
│   ├── OPTIONS-*
│   └── *.sst
├── year-2016/
├── year-2017/
└── ...

/mnt/nvme2/rocksdb-stores/
├── year-2022/
├── year-2023/
└── year-2024/

Option 2: Explicit store listing

stores = [
  "/mnt/nvme1/rocksdb-stores/year-2015",
  "/mnt/nvme1/rocksdb-stores/year-2016",
  "/mnt/nvme2/rocksdb-stores/year-2022",
  "/custom/path/stellar-2023-ledgers"  # Custom naming supported
]

Interface Design

RocksDBLedgerStoreReader Interface

package ledgerstore

import (
    "context"
    "errors"

    "github.com/stellar/go/xdr"
)

// LedgerRange represents the range of ledgers in a store
type LedgerRange struct {
    MinLedger uint32
    MaxLedger uint32
}

// StoreInfo contains metadata about a single RocksDB store
type StoreInfo struct {
    Path        string
    Name        string      // Derived from directory name (e.g., "year-2024")
    LedgerRange LedgerRange
    SizeBytes   int64       // Total size of SST files
}

// RocksDBLedgerStoreReader defines the interface for reading ledgers from RocksDB stores
type RocksDBLedgerStoreReader interface {
    // Initialize opens all configured RocksDB stores and builds the ledger range index
    // Must be called before any other methods
    // Returns error if any store fails to open or has invalid data
    Initialize(ctx context.Context) error

    // Close cleanly shuts down all RocksDB stores and releases resources
    // Implements io.Closer
    Close() error

    // GetLedger retrieves a single ledger by sequence number
    // Returns the decompressed, unmarshaled LedgerCloseMeta
    // Returns ErrLedgerNotFound if ledger doesn't exist in any store
    GetLedger(ctx context.Context, ledgerSeq uint32) (xdr.LedgerCloseMeta, error)

    // GetLedgerRange retrieves multiple consecutive ledgers efficiently
    // Returns ledgers in ascending order by sequence
    // Stops at first missing ledger or endSeq (inclusive)
    GetLedgerRange(ctx context.Context, startSeq, endSeq uint32) ([]xdr.LedgerCloseMeta, error)

    // GetRawLedger retrieves compressed ledger data without decompression
    // Useful for proxying/streaming without processing overhead
    GetRawLedger(ctx context.Context, ledgerSeq uint32) ([]byte, error)

    // HasLedger checks if a ledger exists without retrieving it
    // Uses the pre-built range index for O(1) lookup
    HasLedger(ledgerSeq uint32) bool

    // GetStoreInfo returns metadata about all loaded stores
    GetStoreInfo() []StoreInfo

    // GetTotalLedgerRange returns the min and max ledger across all stores
    GetTotalLedgerRange() LedgerRange

    // AddStore dynamically adds a new RocksDB store at runtime
    // Useful for hot-loading new stores without restart
    // Returns error if store overlaps with existing ranges
    AddStore(ctx context.Context, path string) error

    // RemoveStore dynamically removes a store at runtime
    // Identified by path or name
    // Returns error if store is not found
    RemoveStore(path string) error

    // Health returns the health status of all stores
    // Can be used for health check endpoints
    Health(ctx context.Context) HealthStatus
}

// HealthStatus represents the health of the ledger store system
type HealthStatus struct {
    Healthy     bool
    TotalStores int
    StoreHealth []StoreHealthInfo
}

type StoreHealthInfo struct {
    Name    string
    Path    string
    Healthy bool
    Error   string // Empty if healthy
}

// Errors
var (
    ErrLedgerNotFound       = errors.New("ledger not found in any store")
    ErrStoreNotFound        = errors.New("store not found")
    ErrOverlappingRange     = errors.New("store ledger range overlaps with existing store")
    ErrNotInitialized       = errors.New("ledger store reader not initialized")
    ErrAlreadyInitialized   = errors.New("ledger store reader already initialized")
    ErrInvalidConfiguration = errors.New("invalid configuration")
)

Configuration Struct

package ledgerstore

import "fmt"

// RocksDBConfig holds the complete RocksDB configuration
type RocksDBConfig struct {
    Enabled     bool                    `toml:"enabled"`
    FullHistory RocksDBFullHistoryConfig `toml:"fullHistory"`
}

// RocksDBFullHistoryConfig holds configuration for full history ledger retrieval
type RocksDBFullHistoryConfig struct {
    GetLedgers       bool     `toml:"getLedgers"`
    BlockCacheSizeMB int      `toml:"blockCacheSizeMB"`
    BloomFilterBits  int      `toml:"bloomFilterBits"`
    StorePaths       []string `toml:"storePaths"`  // Top-level directories
    Stores           []string `toml:"stores"`      // Individual store paths
}

// Validate checks the configuration for errors
func (c *RocksDBConfig) Validate() error {
    if !c.Enabled {
        return nil // Nothing to validate if disabled
    }
    
    if !c.FullHistory.GetLedgers {
        return nil // Nothing to validate if getLedgers is disabled
    }
    
    // Validate mutual exclusivity
    hasStorePaths := len(c.FullHistory.StorePaths) > 0
    hasStores := len(c.FullHistory.Stores) > 0
    
    if hasStorePaths && hasStores {
        return fmt.Errorf("%w: cannot specify both 'storePaths' and 'stores'", ErrInvalidConfiguration)
    }
    
    if !hasStorePaths && !hasStores {
        return fmt.Errorf("%w: must specify either 'storePaths' or 'stores'", ErrInvalidConfiguration)
    }
    
    if c.FullHistory.BlockCacheSizeMB <= 0 {
        return fmt.Errorf("%w: blockCacheSizeMB must be > 0", ErrInvalidConfiguration)
    }
    
    if c.FullHistory.BloomFilterBits < 0 {
        return fmt.Errorf("%w: bloomFilterBits must be >= 0", ErrInvalidConfiguration)
    }
    
    return nil
}

Implementation Tasks

Task 1: Configuration Parsing and Validation

File: internal/config/rocksdb.go

  • Define RocksDBConfig and RocksDBFullHistoryConfig structs with TOML tags
  • Implement Validate() method with all validation rules
  • Implement DiscoverStores() method (sketched after this list) that:
    • For storePaths: scans each directory for year-* subdirectories
    • For stores: validates each path exists
    • Returns deduplicated list of store paths
  • Add RocksDBConfig to main configuration struct
  • Add configuration documentation to example TOML file
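
A minimal sketch of the DiscoverStores() method described above, assuming the RocksDBFullHistoryConfig struct from the interface design (imports os, fmt, path/filepath, strings; the deduplication and error wrapping are illustrative choices):

// DiscoverStores returns the list of store paths to open: either the
// explicit 'stores' list, or the year-* subdirectories of 'storePaths'.
func (c *RocksDBFullHistoryConfig) DiscoverStores() ([]string, error) {
    // Explicit store list: verify each path exists.
    if len(c.Stores) > 0 {
        for _, p := range c.Stores {
            if _, err := os.Stat(p); err != nil {
                return nil, fmt.Errorf("%w: store path %q: %v", ErrInvalidConfiguration, p, err)
            }
        }
        return c.Stores, nil
    }

    // Top-level paths: auto-discover year-* subdirectories, deduplicated.
    seen := make(map[string]struct{})
    var stores []string
    for _, root := range c.StorePaths {
        entries, err := os.ReadDir(root)
        if err != nil {
            return nil, fmt.Errorf("%w: scanning %q: %v", ErrInvalidConfiguration, root, err)
        }
        for _, e := range entries {
            if e.IsDir() && strings.HasPrefix(e.Name(), "year-") {
                p := filepath.Join(root, e.Name())
                if _, ok := seen[p]; !ok {
                    seen[p] = struct{}{}
                    stores = append(stores, p)
                }
            }
        }
    }
    return stores, nil
}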

Acceptance Criteria:

  • Configuration loads correctly from TOML
  • Validation catches all error cases (both options specified, neither specified, invalid paths)
  • Auto-discovery finds all year-* directories
  • Clear error messages for configuration problems

Task 2: Core RocksDBLedgerStoreReader Implementation

File: internal/ledgerstore/rocksdb_reader.go

  • Implement rocksDBLedgerStoreReader struct with:

    • Slice of *RocksDBStore (individual store wrappers)
    • Shared *grocksdb.Cache for block cache
    • Shared *grocksdb.ReadOptions
    • Shared *grocksdb.Options
    • sync.RWMutex for thread-safe store management
    • Pre-built ledger range index for O(1) store lookup
    • Initialization state flag
  • Implement RocksDBStore struct (per-store wrapper):

  type RocksDBStore struct {
      DB        *grocksdb.DB
      Path      string
      Name      string
      MinLedger uint32
      MaxLedger uint32
  }
  • Implement Initialize(ctx context.Context) error:

    • Create shared LRU block cache with configured size
    • Create shared BlockBasedTableOptions with bloom filter
    • Create shared Options with table options
    • Create shared ReadOptions
    • Open each store in read-only mode
    • For each store, call SeekToFirst() and SeekToLast() to determine ledger range
    • Validate no overlapping ranges
    • Sort stores by MinLedger for efficient lookup
    • Log initialization summary (store count, total ledger range, cache size)
  • Implement Close() error:

    • Close all RocksDB database handles
    • Destroy ReadOptions, Options, BlockBasedTableOptions
    • Release block cache
    • Log shutdown
  • Implement findStoreForLedger(ledgerSeq uint32) *RocksDBStore:

    • O(n) scan through the sorted stores checking each range (effectively constant with one store per year)
    • Could optimize to O(log n) with binary search if the store count grows
    • Returns nil if not found
  • Implement helper decompressAndUnmarshal(data []byte) (xdr.LedgerCloseMeta, error):

    • Create zstd decoder (consider pooling for performance)
    • Decompress data
    • Unmarshal XDR to LedgerCloseMeta
    • Return result
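
A condensed sketch of the Initialize internals described above, assuming github.com/linxGnu/grocksdb (error paths and iterator slice lifetimes are simplified; in the real implementation the options and cache are created once and shared across all stores):

// newSharedOptions builds the shared block cache, bloom filter, and
// table options used to open every store.
func newSharedOptions(cacheSizeMB, bloomBits int) *grocksdb.Options {
    bbto := grocksdb.NewDefaultBlockBasedTableOptions()
    bbto.SetBlockCache(grocksdb.NewLRUCache(uint64(cacheSizeMB) << 20))
    if bloomBits > 0 {
        bbto.SetFilterPolicy(grocksdb.NewBloomFilter(float64(bloomBits)))
    }
    opts := grocksdb.NewDefaultOptions()
    opts.SetBlockBasedTableFactory(bbto)
    return opts
}

// openStore opens one store read-only and derives its ledger range
// from the first and last keys via SeekToFirst/SeekToLast.
func openStore(opts *grocksdb.Options, path string) (*RocksDBStore, error) {
    db, err := grocksdb.OpenDbForReadOnly(opts, path, false)
    if err != nil {
        return nil, fmt.Errorf("opening %q: %w", path, err)
    }
    ro := grocksdb.NewDefaultReadOptions()
    defer ro.Destroy()
    it := db.NewIterator(ro)
    defer it.Close()

    it.SeekToFirst()
    if !it.Valid() {
        db.Close()
        return nil, fmt.Errorf("store %q is empty", path)
    }
    minLedger := binary.BigEndian.Uint32(it.Key().Data())
    it.SeekToLast()
    maxLedger := binary.BigEndian.Uint32(it.Key().Data())

    return &RocksDBStore{
        DB:        db,
        Path:      path,
        Name:      filepath.Base(path),
        MinLedger: minLedger,
        MaxLedger: maxLedger,
    }, nil
}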

Acceptance Criteria:

  • All stores open successfully with shared cache
  • Ledger ranges correctly determined at startup
  • Thread-safe initialization (only once)
  • Clean shutdown releases all resources

Task 3: Ledger Retrieval Methods

File: internal/ledgerstore/rocksdb_reader.go (continued)

  • Implement GetLedger(ctx context.Context, ledgerSeq uint32) (xdr.LedgerCloseMeta, error):

    • Check initialization state
    • Find store for ledger using range index
    • If no store found, return ErrLedgerNotFound
    • Encode ledgerSeq as 4-byte big-endian key
    • Query RocksDB
    • If key not found, return ErrLedgerNotFound
    • Decompress and unmarshal
    • Return LedgerCloseMeta
  • Implement GetLedgerRange(ctx context.Context, startSeq, endSeq uint32) ([]xdr.LedgerCloseMeta, error):

    • Validate startSeq <= endSeq
    • Pre-allocate result slice
    • Iterate from startSeq to endSeq
    • For each ledger, call GetLedger (or optimize with iterator if same store)
    • Stop on first error/missing ledger
    • Return collected ledgers
  • Implement GetRawLedger(ctx context.Context, ledgerSeq uint32) ([]byte, error):

    • Same as GetLedger but skip decompression/unmarshal
    • Return raw compressed bytes
    • Useful for streaming/proxying
  • Implement HasLedger(ledgerSeq uint32) bool:

    • Check if ledgerSeq falls within any store's range
    • O(1) using pre-built range index
    • Does NOT query RocksDB (range check only)
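
A sketch of the GetLedger read path and store lookup described above, assuming github.com/linxGnu/grocksdb; the reader fields (mu, initialized, stores, readOpts) are the ones listed in Task 2, and decompressAndUnmarshal is the Task 2 helper:

func (r *rocksDBLedgerStoreReader) GetLedger(ctx context.Context, ledgerSeq uint32) (xdr.LedgerCloseMeta, error) {
    r.mu.RLock()
    defer r.mu.RUnlock()

    if !r.initialized {
        return xdr.LedgerCloseMeta{}, ErrNotInitialized
    }
    store := r.findStoreForLedger(ledgerSeq)
    if store == nil {
        return xdr.LedgerCloseMeta{}, ErrLedgerNotFound
    }

    // 4-byte big-endian key, matching how the stores were built.
    var key [4]byte
    binary.BigEndian.PutUint32(key[:], ledgerSeq)

    slice, err := store.DB.Get(r.readOpts, key[:])
    if err != nil {
        return xdr.LedgerCloseMeta{}, err
    }
    defer slice.Free()
    if !slice.Exists() {
        return xdr.LedgerCloseMeta{}, ErrLedgerNotFound
    }
    return decompressAndUnmarshal(slice.Data())
}

// findStoreForLedger binary-searches the stores (sorted by MinLedger,
// non-overlapping) for the one whose range contains ledgerSeq.
func (r *rocksDBLedgerStoreReader) findStoreForLedger(ledgerSeq uint32) *RocksDBStore {
    i := sort.Search(len(r.stores), func(i int) bool {
        return r.stores[i].MaxLedger >= ledgerSeq
    })
    if i < len(r.stores) && r.stores[i].MinLedger <= ledgerSeq {
        return r.stores[i]
    }
    return nil
}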

Acceptance Criteria:

  • GetLedger returns correct data for existing ledgers
  • GetLedger returns ErrLedgerNotFound for non-existent ledgers
  • GetLedgerRange handles cross-store ranges correctly
  • HasLedger is fast (no disk I/O)
  • Context cancellation is respected

Task 4: Store Management Methods

File: internal/ledgerstore/rocksdb_reader.go (continued)

  • Implement GetStoreInfo() []StoreInfo:

    • Return metadata for all loaded stores
    • Include path, name, ledger range, size
  • Implement GetTotalLedgerRange() LedgerRange:

    • Return min of all MinLedgers, max of all MaxLedgers
  • Implement AddStore(ctx context.Context, path string) error:

    • Acquire write lock
    • Open new store in read-only mode
    • Determine ledger range
    • Check for overlap with existing stores
    • If overlap, close new store and return error
    • Add to store list
    • Re-sort stores by MinLedger
    • Release lock
  • Implement RemoveStore(path string) error:

    • Acquire write lock
    • Find store by path or name
    • If not found, return error
    • Close the store
    • Remove from store list
    • Release lock
  • Implement Health(ctx context.Context) HealthStatus:

    • Check each store is accessible (simple key lookup)
    • Return aggregate health status
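
A minimal sketch of the overlap validation at the heart of AddStore (assumes the RocksDBStore struct from Task 2; candidate is the newly opened store, and the caller holds the write lock):

// overlaps reports whether two stores' ledger ranges intersect:
// they are disjoint only if one ends strictly before the other begins.
func overlaps(a, b *RocksDBStore) bool {
    return a.MinLedger <= b.MaxLedger && b.MinLedger <= a.MaxLedger
}

// Inside AddStore, after opening the candidate store:
//     for _, existing := range r.stores {
//         if overlaps(existing, candidate) {
//             candidate.DB.Close()
//             return fmt.Errorf("%w: %s", ErrOverlappingRange, path)
//         }
//     }
//     r.stores = append(r.stores, candidate)
//     sort.Slice(r.stores, func(i, j int) bool {
//         return r.stores[i].MinLedger < r.stores[j].MinLedger
//     })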

Acceptance Criteria:

  • AddStore correctly validates no overlap
  • RemoveStore correctly releases resources
  • Operations are thread-safe
  • Health check is fast and accurate

Task 5: Factory Function and Dependency Injection

File: internal/ledgerstore/factory.go

  • Implement factory function:
  func NewRocksDBLedgerStoreReader(config RocksDBFullHistoryConfig) (RocksDBLedgerStoreReader, error) {
      // Validate config
      // Discover stores from storePaths or use explicit stores
      // Create reader instance
      // Return (reader, nil) - caller must call Initialize()
  }
  • Implement conditional initialization in RPC service startup:
  func initializeLedgerStore(cfg *config.Config) (RocksDBLedgerStoreReader, error) {
      if !cfg.RocksDB.Enabled {
          return nil, nil // RocksDB disabled
      }
      
      if !cfg.RocksDB.FullHistory.GetLedgers {
          return nil, nil // getLedgers disabled
      }
      
      reader, err := NewRocksDBLedgerStoreReader(cfg.RocksDB.FullHistory)
      if err != nil {
          return nil, fmt.Errorf("failed to create ledger store reader: %w", err)
      }
      
      if err := reader.Initialize(context.Background()); err != nil {
          return nil, fmt.Errorf("failed to initialize ledger store reader: %w", err)
      }
      
      return reader, nil
  }

Acceptance Criteria:

  • Factory correctly handles all configuration scenarios
  • Disabled configurations return nil without error
  • Initialization errors are properly propagated
  • Clean integration with RPC service startup

Task 6: Integration with RPC Service

File: cmd/soroban-rpc/internal/methods/get_ledger.go (or similar)

  • Add RocksDBLedgerStoreReader to RPC handler dependencies

  • Modify getLedger handler to:

    • First check if ledger is in primary store (existing behavior)
    • If not found AND RocksDB reader is available, query RocksDB
    • Return appropriate response
  • Add graceful shutdown:

    • Call reader.Close() on service shutdown
  • Add health check endpoint integration:

    • Include RocksDB store health in /health endpoint
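
A hypothetical sketch of the fallback flow (the handler shape and the primaryStore/rocksDBReader field names are illustrative; the actual soroban-rpc handler differs):

func (h *getLedgerHandler) getLedger(ctx context.Context, seq uint32) (xdr.LedgerCloseMeta, error) {
    // Existing behavior: try the primary (recent-history) store first.
    lcm, err := h.primaryStore.GetLedger(ctx, seq)
    if err == nil {
        return lcm, nil
    }

    // Fall back to RocksDB full history when configured; HasLedger is a
    // cheap range check, so misses cost no disk I/O.
    if h.rocksDBReader != nil && h.rocksDBReader.HasLedger(seq) {
        return h.rocksDBReader.GetLedger(ctx, seq)
    }
    return xdr.LedgerCloseMeta{}, err
}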

Acceptance Criteria:

  • getLedger seamlessly falls back to RocksDB for historical ledgers
  • No change in behavior when RocksDB is disabled
  • Health endpoint reports RocksDB status
  • Graceful shutdown closes all stores

Task 7: Logging and Metrics

File: Various

  • Add structured logging for:

    • Store initialization (path, ledger range, size)
    • Store addition/removal
    • Lookup failures (ledger not found)
    • Errors (decompression, unmarshal, RocksDB errors)
  • Add metrics for:

    • rocksdb_ledger_lookups_total (counter, labels: store, status)
    • rocksdb_ledger_lookup_duration_seconds (histogram, labels: store)
    • rocksdb_stores_total (gauge)
    • rocksdb_block_cache_hit_ratio (gauge, from RocksDB stats)
    • rocksdb_total_ledger_range (gauge, min and max labels)
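
A sketch of the metric definitions, assuming github.com/prometheus/client_golang/prometheus (names match the list above; registration and the cache-stats gauge are omitted):

var (
    ledgerLookups = prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "rocksdb_ledger_lookups_total",
        Help: "Total ledger lookups against RocksDB stores.",
    }, []string{"store", "status"})

    lookupDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Name:    "rocksdb_ledger_lookup_duration_seconds",
        Help:    "Latency of RocksDB ledger lookups.",
        Buckets: prometheus.DefBuckets,
    }, []string{"store"})

    storesTotal = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "rocksdb_stores_total",
        Help: "Number of currently loaded RocksDB stores.",
    })
)

// Example instrumentation around a lookup:
//     timer := prometheus.NewTimer(lookupDuration.WithLabelValues(store.Name))
//     defer timer.ObserveDuration()
//     ledgerLookups.WithLabelValues(store.Name, "found").Inc()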

Acceptance Criteria:

  • All significant operations are logged
  • Metrics are exposed in Prometheus format
  • Log levels are appropriate (info for startup, debug for lookups, error for failures)

Task 8: Testing

File: internal/ledgerstore/rocksdb_reader_test.go

  • Unit tests for configuration validation
  • Unit tests for store discovery (mock filesystem)
  • Integration tests with real RocksDB stores:
    • Initialize with single store
    • Initialize with multiple stores
    • GetLedger for existing ledger
    • GetLedger for non-existent ledger
    • GetLedgerRange within single store
    • GetLedgerRange spanning multiple stores
    • HasLedger for various cases
    • AddStore with valid store
    • AddStore with overlapping range (should fail)
    • RemoveStore
    • Concurrent access (multiple goroutines calling GetLedger)
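
A starting-point sketch for the configuration tests, table-driven in the usual Go style (uses the structs and Validate() defined earlier):

func TestRocksDBConfigValidate(t *testing.T) {
    cases := []struct {
        name    string
        cfg     RocksDBConfig
        wantErr bool
    }{
        {"disabled skips validation", RocksDBConfig{Enabled: false}, false},
        {"both storePaths and stores", RocksDBConfig{
            Enabled: true,
            FullHistory: RocksDBFullHistoryConfig{
                GetLedgers:       true,
                BlockCacheSizeMB: 8192,
                StorePaths:       []string{"/mnt/a"},
                Stores:           []string{"/mnt/a/year-2020"},
            },
        }, true},
        {"neither storePaths nor stores", RocksDBConfig{
            Enabled: true,
            FullHistory: RocksDBFullHistoryConfig{
                GetLedgers:       true,
                BlockCacheSizeMB: 8192,
            },
        }, true},
        {"zero block cache", RocksDBConfig{
            Enabled: true,
            FullHistory: RocksDBFullHistoryConfig{
                GetLedgers: true,
                Stores:     []string{"/mnt/a/year-2020"},
            },
        }, true},
    }
    for _, tc := range cases {
        t.Run(tc.name, func(t *testing.T) {
            if err := tc.cfg.Validate(); (err != nil) != tc.wantErr {
                t.Fatalf("Validate() error = %v, wantErr %v", err, tc.wantErr)
            }
        })
    }
}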

Dependencies

  • github.com/linxGnu/grocksdb - RocksDB Go bindings
  • A zstd decompression library (e.g., github.com/klauspost/compress/zstd) for the compressed LedgerCloseMeta values

Performance Considerations

  1. Shared Block Cache: All stores share a single LRU cache to maximize hit rate
  2. Read-Only Mode: Stores opened in read-only mode (no write locks, better concurrency)
  3. Zstd Decoder Pooling: Consider pooling decoders to reduce allocation overhead
  4. Range Index: Pre-built at startup for O(1) store selection
  5. Bloom Filters: 10-bit bloom filters reduce disk reads for non-existent keys
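
For consideration 3, a minimal sketch of the Task 2 decompressAndUnmarshal helper, assuming github.com/klauspost/compress/zstd: a Decoder created with a nil reader supports concurrent DecodeAll calls, which sidesteps an explicit sync.Pool (unmarshaling via UnmarshalBinary vs. xdr.SafeUnmarshal is an implementation detail):

// zstdDecoder is shared; DecodeAll on a nil-reader Decoder is
// safe for concurrent use.
var zstdDecoder, _ = zstd.NewReader(nil)

func decompressAndUnmarshal(data []byte) (xdr.LedgerCloseMeta, error) {
    var lcm xdr.LedgerCloseMeta
    raw, err := zstdDecoder.DecodeAll(data, nil)
    if err != nil {
        return lcm, fmt.Errorf("decompressing ledger: %w", err)
    }
    if err := lcm.UnmarshalBinary(raw); err != nil {
        return lcm, fmt.Errorf("unmarshaling LedgerCloseMeta: %w", err)
    }
    return lcm, nil
}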

Out of Scope

  • RocksDB store generation/ingestion (separate ticket)
  • Store compaction/maintenance
  • Replication/backup

Final Checklist

  • Task 1: Configuration Parsing and Validation
  • Task 2: Core RocksDBLedgerStoreReader Implementation
  • Task 3: Ledger Retrieval Methods
  • Task 4: Store Management Methods
  • Task 5: Factory Function and Dependency Injection
  • Task 6: Integration with RPC Service
  • Task 7: Logging and Metrics
  • Task 8: Testing
  • Documentation updated
  • Example configuration added
  • Code review completed
