godex implements a high-performance producer-consumer pipeline with concurrent fetching, ordered processing, and fault-tolerant storage designed for production blockchain indexing.
```mermaid
flowchart LR
subgraph SDKCore["SDK Core"]
subgraph Processor["Processor (per chain)"]
F["Fetchers\n- concurrent RPC\n- batch requests\n- rate limiting"]
A["Arbiter\n- ordered processing\n- reorg detection\n- LRU hash cache"]
DEC["Decoder\n- ABI-based\n- event transformation"]
RPC["RPC Client\n(HTTP + retry)"]
F -->|"GetLogs/GetBlocks"| RPC
end
SINK_IF["Sink interface\n(Store, Rollback, LoadCursor)"]
end
subgraph Adapters["Adapters / Implementations"]
PSINK["PostgresSink\n- atomic storage\n- tx rollback\n- cursor persistence"]
METRICS["Metrics (Prometheus)\n- counters/gauges/histograms"]
end
subgraph UserApp["User Application"]
APP["Application\n- configures chains\n- provides decoder\n- handles events"]
end
%% Data flow
APP -->|"NewProcessor(metrics, sink)"| Processor
APP -->|"AddChain(chain, opts, decoder)"| Processor
F -->|"raw logs + timestamps"| A
A -->|"decoded events"| DEC
DEC -->|"structured events"| SINK_IF
SINK_IF --> PSINK
%% Metrics flow
F -->|"ObservedBlockFetchDuration"| METRICS
A -->|"IncReorgs"| METRICS
PSINK -->|"IncSinkWrites/Errors\nObservedSinkWriteDuration\nSetIndexedHeight"| METRICS
%% External
RPC ---|"JSON-RPC calls"| CHAIN["Blockchain node(s)"]
```
| Component | Responsibility | Implementation |
|---|---|---|
| Processor | Orchestrates multi-chain indexing lifecycle with error isolation | Concurrent per-chain processing with shared resources |
| Fetchers | Concurrent RPC workers fetching logs and timestamps | Rate-limited batch requests with individual timeouts and retry logic |
| Arbiter | Maintains block order and coordinates processing pipeline | LRU cache for reorg detection, bounded buffering with context cancellation |
| Decoder Router | Intelligent event routing to appropriate decoders | Match-based routing with support for multiple contract types |
| Sink | Persistent storage with atomic rollback support | Transactional writes with reorg recovery and cursor management |
- Initialization: Load the cursor from the sink, or fall back to the `StartBlock` configuration
- Head Determination: Fetch the latest block height via `RPC.Head()` with retry logic
- Range Planning: Divide work into windows of `RangeSize` blocks
- Concurrent Fetching: `FetcherConcurrency` workers process ranges in parallel
- Ordered Processing: The Arbiter processes fetch results sequentially for reorg safety
- Event Decoding: The router routes logs to the appropriate decoders
- Storage: Decoded events are stored atomically via the sink, with transaction rollback support
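The range-planning step can be sketched as a small helper; `planRanges` is an illustrative name, not the SDK's actual function:

```go
// planRanges splits the inclusive block span [start, head] into fetch
// windows of at most rangeSize blocks each, matching the Range Planning
// step described above.
func planRanges(start, head, rangeSize uint64) [][2]uint64 {
	var windows [][2]uint64
	for from := start; from <= head; from += rangeSize {
		to := from + rangeSize - 1
		if to > head {
			to = head // final window may be shorter
		}
		windows = append(windows, [2]uint64{from, to})
	}
	return windows
}
```

Each window is then handed to a fetch worker, so `RangeSize` directly bounds the size of every RPC batch.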
Persistent storage abstraction with transactional rollback support.
```go
type Sink interface {
	Store(ctx context.Context, events []types.Event) error
	Rollback(ctx context.Context, chainId string, toBlock uint64, blockHash string) error
	LoadCursor(ctx context.Context, chainId string) (blockNum uint64, blockHash string, err error)
	UpdateCursor(ctx context.Context, chainId string, newBlock uint64, blockHash string) error
}
```

Event transformation with pluggable ABI support.
```go
type Decoder interface {
	Decode(name string, chainId string, log types.Log) (*types.Event, error)
	DecodeBatch(logs []types.Log) (*[]types.Event, error)
}
```

Intelligent routing of logs to appropriate decoders based on configurable conditions.
```go
type DecoderRouter struct {
	routes []DecoderRoute
}

func NewDecoderRouter() *DecoderRouter
func (r *DecoderRouter) Register(match MatchFunc, abiName string, dec Decoder) *DecoderRouter
func (r *DecoderRouter) Decode(chainId string, log types.Log) (*types.Event, error)
```

Blockchain node communication with batching and rate limiting.
```go
type RPC interface {
	Head(ctx context.Context) (string, error)
	GetBlock(ctx context.Context, blockNumber string) (types.Block, error)
	GetBlocks(ctx context.Context, blockNumbers []string) (map[string]types.Block, error)
	GetLogs(ctx context.Context, filter types.Filter) ([]types.Log, error)
	GetBlockReceipts(ctx context.Context, blockNumber string) ([]types.Receipt, error)
}
```

Observability and monitoring interface.
```go
type Metrics interface {
	IncBlocksProcessed(chainId string, n uint64)
	ObservedBlockLag(chainId string, lag uint64)
	ObservedBlockFetchDuration(chainId string, d time.Duration, success bool)
	SetIndexedHeight(chainId string, height uint64)
	IncSinkWrites(chainId string, n uint64)
	IncSinkErrors(chainId string)
	ObservedSinkWriteDuration(chainId string, d time.Duration, success bool)
	SetProcessorConcurrency(chainId string, n uint64)
	IncReorgs(chainId string)
}
```

Producer Layer (Fetchers):
- Concurrent workers fetch logs and timestamps using batch RPC requests
- Rate-limited and retry-enabled communication with blockchain nodes
- Individual context timeouts prevent indefinite blocking
- Bounded buffering provides natural backpressure
Consumer Layer (Arbiter):
- Single-threaded coordinator ensures in-order processing for reorg safety
- LRU cache maintains block hash history for efficient reorg detection
- Direct integration with decoder router and sink for atomic event processing
- Context-aware processing with graceful cancellation support
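The Arbiter's in-order coordination can be illustrated with a small reorder buffer; the types and names here are hypothetical, not the SDK's internals:

```go
// indexedResult tags a fetch result with its window index so order can be
// restored after concurrent fetching.
type indexedResult struct {
	idx     int
	payload string
}

// reorder consumes results that may arrive out of order and forwards them in
// strict window order, buffering gaps until the missing window arrives. This
// mirrors how a single-threaded coordinator preserves reorg safety.
func reorder(in <-chan indexedResult) []string {
	pending := make(map[int]string)
	next := 0
	var ordered []string
	for item := range in {
		pending[item.idx] = item.payload
		// Drain every consecutive window that is now available.
		for {
			v, ok := pending[next]
			if !ok {
				break
			}
			ordered = append(ordered, v)
			delete(pending, next)
			next++
		}
	}
	return ordered
}
```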
Chain Isolation:
- Each chain maintains independent state and cursor tracking
- Individual chain failures do not affect other chains
- Shared resources (decoder, sink) accessed safely via appropriate synchronization
Configuration Flexibility:
- Different options per chain (range size, concurrency, confirmation depth)
- Chain-specific retry configurations and rate limits
- Independent progress tracking and metrics collection
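These per-chain tunables might be grouped into an options struct along these lines; `ChainOptions` and its field names are illustrative, not the SDK's actual configuration type:

```go
// ChainOptions sketches the per-chain knobs described above. Each chain can
// carry its own copy, so tuning one chain never affects another.
type ChainOptions struct {
	StartBlock         uint64 // where to begin if no cursor exists
	RangeSize          uint64 // blocks per fetch window
	FetcherConcurrency int    // parallel fetch workers
	ConfirmationDepth  uint64 // blocks to wait before treating data as final
	MaxRetries         int    // chain-specific retry budget
}
```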
Blockchain reorganizations are automatically detected and resolved with minimal data loss and efficient recovery.
Detection Mechanism:
- Maintains an LRU cache of processed block hashes (`BlockHashCache`)
- Verifies parent-hash continuity during sequential window processing
- Detects divergence when `block.ParentHash != cachedHash[block.Number-1]`
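The continuity check itself is simple; in this sketch a plain map stands in for the LRU `BlockHashCache`:

```go
// checkContinuity reports whether a new block extends the cached chain tip.
// cache maps block number -> block hash (a stand-in for the LRU cache).
func checkContinuity(cache map[uint64]string, number uint64, parentHash string) bool {
	prev, ok := cache[number-1]
	if !ok {
		return true // parent not cached: nothing to compare against
	}
	return prev == parentHash // mismatch means a reorg occurred
}
```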
Recovery Process:
- Ancestor Search: Binary search backward through cached hashes to find the common ancestor
- Sink Rollback: Call `sink.Rollback(chainId, ancestor, blockHash)` to remove orphaned events atomically
- State Reset: Update the cursor to the ancestor block and clear hash-cache entries above it
- Resume Processing: Restart from ancestor + 1 with fresh batch processing
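The ancestor search can be sketched as a binary search over the cached hashes; `findAncestor` and `canonicalHash` are illustrative names, and this assumes the chains agree at `lo` and diverge somewhere above:

```go
// findAncestor returns the highest block number in [lo, hi] at which the
// locally cached hash still matches the canonical chain, i.e. the common
// ancestor to roll back to.
func findAncestor(lo, hi uint64, cached map[uint64]string, canonicalHash func(uint64) string) uint64 {
	for lo < hi {
		mid := (lo + hi + 1) / 2
		if cached[mid] == canonicalHash(mid) {
			lo = mid // still on the canonical chain; search higher
		} else {
			hi = mid - 1 // diverged; search lower
		}
	}
	return lo
}
```

Because each probe needs only one header lookup, recovery costs O(log n) RPC calls over the cached window rather than a linear rescan.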
Performance Characteristics:
- O(1) hash lookups via LRU cache
- Bounded memory usage with configurable cache size
- Minimal RPC overhead (only fetches headers during reorg detection)
- Transient Errors (network timeouts, RPC rate limits): automatic retry with exponential backoff
- Permanent Errors (invalid configuration, authentication failures): immediate chain termination
- Reorg Errors (blockchain reorganizations): graceful rollback and recovery from the common ancestor
- Non-Recoverable Errors (corrupted data, schema mismatches): chain isolation and logging
- Chain Independence: Individual chain failures do not affect concurrent chain processing
- Resource Cleanup: Context cancellation ensures prompt cleanup of goroutines and connections
- Graceful Degradation: Failed chains log errors while others continue processing
- Atomic Operations: Sink operations use transactions with automatic rollback on failures
Events are processed in batches rather than individually:
- Reduces per-transaction overhead
- Enables bulk operations (e.g. COPY mode in PostgreSQL)
- Improves throughput by amortizing round trips across many events
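The batching step can be sketched as a generic chunking helper (Go 1.18+); `chunkEvents` is an illustrative name, with each batch intended for a single transactional write:

```go
// chunkEvents splits a slice of events into batches of at most batchSize,
// each suitable for one bulk write (e.g. a single COPY in PostgreSQL).
func chunkEvents[T any](events []T, batchSize int) [][]T {
	var batches [][]T
	for len(events) > batchSize {
		batches = append(batches, events[:batchSize])
		events = events[batchSize:]
	}
	if len(events) > 0 {
		batches = append(batches, events) // final, possibly short, batch
	}
	return batches
}
```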
The PostgreSQL adapter uses pgxpool for connection management:
- Connection reuse reduces overhead
- Automatic connection lifecycle management
- Configurable pool size based on workload
- Bounded Caching: LRU block hash cache prevents unbounded memory growth
- Window Processing: Natural backpressure limits concurrent memory usage
- Channel Buffering: Sized channels provide backpressure without excessive queuing
- Resource Cleanup: Context cancellation ensures prompt resource release
- Atomicity: All operations are transactional
- Pluggability: Interface-based design allows custom implementations
- Performance: Adaptive strategies optimize for different workloads
- Consistency: Cursors and stored events remain mutually consistent at all times
- Extensibility: Handler pattern and migration system enable customization
For detailed documentation on specific components, see: