Kafka consumer for Bitquery Solana Kafka Stream with reorg detection and rollback. Uses block hash as identity so chain reversals are handled correctly.
| File | Role |
|---|---|
| `consumer.py` | Kafka consumer, message parsing, and orchestration. Owns in-memory chain state (`_chain`, `_tip_hash`) and calls buffer + computations. |
| `buffer.py` | Reorg buffer: collect blocks, sort by slot, yield batches. `ReorgBuffer.add()` and `ReorgBuffer.flush()`. |
| `computations.py` | Chain/reorg logic: `hash_bytes`, `is_reorg`, `find_fork_point`, `get_chain_length`, `get_orphaned_hashes`, `apply_block_to_chain`, `rollback_orphaned`. |
| `config.py` | Credentials (e.g. `solana_username`, `solana_password`). |
- Buffer (`buffer.py`): single purpose: hold the fork tree at the head (~30 slots) so we can determine which branch is longer. Messages are appended via `ReorgBuffer.add(block_hash, parent_hash, slot, tx_block)`. When the buffer reaches `REORG_BUFFER_SIZE`, a batch is returned sorted by slot and the buffer is cleared. On shutdown, `flush()` returns any remaining blocks. Parent hash is always set (no special handling for missing parents).
- Process batch (`consumer.py`): for each batch (from `add()` or `flush()`), the consumer runs reorg logic in slot order via `apply_block_to_chain` from `computations.py`, then logs rollbacks and increments `processed_count`.
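The buffering behavior above can be sketched as follows. This is a minimal illustration; the real `ReorgBuffer` in `buffer.py` may differ in internal field names and batch tuple layout:

```python
REORG_BUFFER_SIZE = 30  # assumed value: the text mentions ~30 slots at the head


class ReorgBuffer:
    """Collect blocks, sort by slot, yield batches (illustrative sketch)."""

    def __init__(self):
        self._blocks = []

    def add(self, block_hash, parent_hash, slot, tx_block):
        # Append the message; once the buffer is full, return a batch sorted by slot.
        self._blocks.append((slot, block_hash, parent_hash, tx_block))
        if len(self._blocks) >= REORG_BUFFER_SIZE:
            return self.flush()
        return None

    def flush(self):
        # Sort by slot, clear the buffer, and hand the batch back to the caller.
        batch = sorted(self._blocks, key=lambda b: b[0])
        self._blocks = []
        return batch
```

Sorting only at batch boundaries keeps `add()` cheap; the consumer then sees each batch in slot order regardless of Kafka delivery order within the window.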
Blocks are reordered by slot before chain/reorg updates. A single consumer (`NUM_CONSUMERS = 1`) is used so one process sees one ordered sequence; multiple consumers would see interleaved messages and could trigger false reorgs.
On Solana, the same slot can be produced by different blocks (forks). Block identity is the block hash. Confirmed blocks can be reverted; the only stable identifier is Hash, never Slot. The consumer tracks a chain keyed by hash and detects reorgs by comparing parent hashes to the current tip.
- `chain: dict[bytes, BlockInfo]`: map from block hash → `BlockInfo(slot, parent_hash, depth)`.
- `tip_hash`: hash of the current chain tip (head), or `None` before the first block.
Each block is stored once by its `Header.Hash`. `BlockInfo` holds slot, parent_hash, and depth (chain length from the start). Depth is used to compare branch lengths on a fork.
Reorg happens only when the incoming branch length is greater than the current head length. Depth is tracked per block: first block has depth 1, then depth(block) = 1 + depth(parent). On a fork we compare depth(tip) vs depth(incoming_block); we only roll back and switch tip when depth(incoming) > depth(tip). If the incoming branch is not longer, we still add the block to the chain but keep the current tip.
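As a concrete illustration of the depth rule (the hash values `b"A"`, `b"B"`, `b"C"` are placeholders, and this `BlockInfo` is a sketch of the structure described above):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BlockInfo:
    slot: int
    parent_hash: Optional[bytes]
    depth: int  # chain length from the start: 1 for the first block


chain = {}

# First block: depth 1; each child: depth = 1 + depth(parent).
chain[b"A"] = BlockInfo(slot=100, parent_hash=None, depth=1)
chain[b"B"] = BlockInfo(slot=101, parent_hash=b"A", depth=1 + chain[b"A"].depth)
tip_hash = b"B"

# A competing block at equal depth is stored but does not win the tip.
chain[b"C"] = BlockInfo(slot=102, parent_hash=b"A", depth=1 + chain[b"A"].depth)
assert not chain[b"C"].depth > chain[tip_hash].depth  # equal depth: keep tip B
```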
A reorg is when the next block's parent is not our current tip:

`is_reorg(new_parent_hash, tip_hash) → (tip_hash is not None) and (new_parent_hash != tip_hash)`
- First block (`tip_hash is None`): not a reorg; we just extend.
- Next block's parent equals our tip: normal extend; not a reorg.
- Next block's parent ≠ our tip: we're on a different fork → reorg.
Comparing only slots would miss these cases.
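The predicate and the three cases above can be expressed directly. A one-line sketch matching the definition given earlier:

```python
def is_reorg(new_parent_hash, tip_hash):
    # First block (tip_hash is None) is never a reorg; otherwise a reorg is
    # any block whose parent is not our current tip.
    return tip_hash is not None and new_parent_hash != tip_hash


assert not is_reorg(b"whatever", None)  # first block: just extend
assert not is_reorg(b"tip", b"tip")     # parent == tip: normal extend
assert is_reorg(b"other", b"tip")       # parent != tip: different fork -> reorg
```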
genesis
│
└──► [A] slot 100
│
├──► [B] slot 101 ← canonical tip (depth 2)
│ ✕ orphaned when D arrives
│
└──► [C] slot 102 (depth 2, fork — no reorg yet, equal depth)
│
└──► [D] slot 103 ← new tip after reorg (depth 3 > 2)
A reorg only fires when the incoming fork branch becomes strictly longer than the current canonical tip. A competing block at equal depth is stored but does not trigger a rollback.
New block arrives
│
▼
tip_hash == None?
├── yes ──► add as first block, done
└── no
│
▼
parent_hash == tip_hash? (is_reorg check)
├── yes (no fork) ──► extend canonical chain, done
└── no (fork detected!)
│
▼
find_fork_point()
├── walk local chain backwards → build visited set
│ tip → parent → parent → ...
└── walk fork chain backwards → find first hash in visited
incoming_parent → its parent → ...
│
▼
common ancestor found?
├── no ──► add block silently, done (gap/unknown parent)
└── yes
│
▼
incoming branch length > current head length?
├── no ──► keep current tip, store fork block, done
└── yes
│
▼
🔁 REORG
get_orphaned_hashes() → walk local chain: tip → fork point
pop orphaned blocks from chain
add new block as tip
return (new_tip, orphaned_list)
- Detect fork: the incoming block's `parent_hash` ≠ our `tip_hash` → we have a fork.
- Find fork point (common ancestor): two-pass walk: first build a visited set by walking the local chain backwards from the tip, then walk backwards from `incoming_parent_hash` until a hash in the visited set is found. Returns that common ancestor hash, or `None` if no ancestor is found (gap/unknown parent), in which case the block is added silently.
- Compare branch lengths, then maybe roll back: current head length = `depth(tip)`; incoming branch length = `1 + depth(parent)`. Only if the incoming length is strictly greater do we orphan and roll back (remove blocks from tip down to the fork point), add the new block, and set it as tip. Otherwise we add the block to the chain but keep the current tip. In this repo, `rollback_orphaned()` only logs the orphaned hashes; in production you would delete DB rows by those hashes.
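A minimal sketch of the two-pass fork-point walk, assuming chain entries expose a `parent_hash` attribute (the `blk` helper and `SimpleNamespace` entries stand in for the repo's `BlockInfo`):

```python
from types import SimpleNamespace


def find_fork_point(chain, tip_hash, incoming_parent_hash):
    # Pass 1: build a visited set by walking the local chain backwards from the tip.
    visited = set()
    h = tip_hash
    while h is not None and h in chain:
        visited.add(h)
        h = chain[h].parent_hash
    # Pass 2: walk backwards from the incoming parent until a visited hash appears.
    h = incoming_parent_hash
    while h is not None:
        if h in visited:
            return h
        h = chain[h].parent_hash if h in chain else None
    return None  # gap / unknown parent: caller adds the block silently


# Fork at A: canonical tip is B, the incoming block's parent chain is C -> A.
blk = lambda parent: SimpleNamespace(parent_hash=parent)
chain = {b"A": blk(None), b"B": blk(b"A"), b"C": blk(b"A")}
assert find_fork_point(chain, b"B", b"C") == b"A"
assert find_fork_point(chain, b"B", b"unknown") is None
```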
- `apply_block_to_chain(block_hash, parent_hash, slot, chain, tip_hash)` (in `computations.py`): mutates `chain` in place and returns `(new_tip_hash, orphaned_or_None)`.
  - No tip yet → add block (depth = 1), return new tip, no reorg.
  - `parent_hash == tip_hash` → extend chain (depth = 1 + parent depth), return new tip, no reorg.
  - Fork → find the fork point and compare the current head length with the incoming branch length; only if the incoming length is greater → roll back orphaned blocks, add the new block as tip, return the new tip and the orphaned list; else add the block to the chain but keep the current tip, returning that tip and `None`.
- `rollback_orphaned(orphaned_hashes)` (in `computations.py`): called when `apply_block_to_chain` returns a non-empty orphaned list. Here it only logs; you can replace or extend it to perform DB deletes by hash.
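Putting the pieces together, here is a self-contained sketch of this decision logic, replayed over the fork diagram above. Helper names and exact return shapes are assumptions; see `computations.py` for the real signatures:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BlockInfo:
    slot: int
    parent_hash: Optional[bytes]
    depth: int


def find_fork_point(chain, tip_hash, incoming_parent_hash):
    # Pass 1: visited set of the canonical branch; pass 2: walk the fork branch.
    visited, h = set(), tip_hash
    while h is not None and h in chain:
        visited.add(h)
        h = chain[h].parent_hash
    h = incoming_parent_hash
    while h is not None:
        if h in visited:
            return h
        h = chain[h].parent_hash if h in chain else None
    return None


def orphaned_hashes(chain, tip_hash, fork_point):
    # Walk the local chain from the tip down to (but excluding) the fork point.
    out, h = [], tip_hash
    while h is not None and h != fork_point:
        out.append(h)
        h = chain[h].parent_hash
    return out


def apply_block_to_chain(block_hash, parent_hash, slot, chain, tip_hash):
    if tip_hash is None:                        # first block: becomes the tip
        chain[block_hash] = BlockInfo(slot, parent_hash, depth=1)
        return block_hash, None
    if parent_hash == tip_hash:                 # normal extend: no reorg
        chain[block_hash] = BlockInfo(slot, parent_hash, 1 + chain[tip_hash].depth)
        return block_hash, None
    # Fork: store the block, then decide whether the incoming branch is longer.
    incoming_depth = 1 + chain[parent_hash].depth if parent_hash in chain else 1
    chain[block_hash] = BlockInfo(slot, parent_hash, incoming_depth)
    fork = find_fork_point(chain, tip_hash, parent_hash)
    if fork is None:                            # gap/unknown parent: add silently
        return tip_hash, None
    if incoming_depth > chain[tip_hash].depth:  # strictly longer: reorg
        orphaned = orphaned_hashes(chain, tip_hash, fork)
        for h in orphaned:
            chain.pop(h, None)
        return block_hash, orphaned
    return tip_hash, None                       # equal/shorter: keep current tip


# Replay the fork diagram: A <- B (tip), then A <- C <- D (winning branch).
chain, tip = {}, None
tip, _ = apply_block_to_chain(b"A", None, 100, chain, tip)
tip, _ = apply_block_to_chain(b"B", b"A", 101, chain, tip)
tip, orph = apply_block_to_chain(b"C", b"A", 102, chain, tip)
assert (tip, orph) == (b"B", None)             # equal depth: no reorg yet
tip, orph = apply_block_to_chain(b"D", b"C", 103, chain, tip)
assert tip == b"D" and orph == [b"B"]          # depth 3 > 2: B is orphaned
```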
```
WALK_BACK(start_hash, chain):
    h := start_hash
    LOOP:
        use h                           (e.g. add to a visited set, or append to an orphaned list)
        parent := chain[h].parent_hash
        IF parent is empty OR parent not in chain: STOP
        h := parent
```

Start from a hash (usually the tip) and follow `parent_hash` links until the parent is empty or missing; the walk is linear in branch length.
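The same walk in Python; the `use` callback is a hypothetical parameter standing in for "add to visited" or "append to orphaned", and the `blk` chain entries are placeholders for `BlockInfo`:

```python
from types import SimpleNamespace


def walk_back(start_hash, chain, use):
    """Follow parent_hash links from start_hash until the parent is empty or unknown."""
    h = start_hash
    while True:
        use(h)  # e.g. visited.add(h) or orphaned.append(h)
        parent = chain[h].parent_hash
        if not parent or parent not in chain:
            break
        h = parent


# Walk a 3-block chain from the tip back to the first block.
blk = lambda parent: SimpleNamespace(parent_hash=parent)
chain = {b"A": blk(None), b"B": blk(b"A"), b"C": blk(b"B")}
seen = []
walk_back(b"C", chain, seen.append)
assert seen == [b"C", b"B", b"A"]
```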
- Install dependencies:

  ```
  pip install -r requirements.txt
  ```

- Provide credentials via a local `config` module (e.g. `config.py` in `protobuf/` with `solana_username` and `solana_password`).
- Run the consumer:

  ```
  python consumer.py
  ```
Schema: `ParsedIdlBlockMessage`. Python pb2 package: `bitquery-pb2-kafka-package`.