Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/block-ingestor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export default {

try {
const blocks = PutBlocksReq.decode(await request.arrayBuffer());
await handleIngestBlocks(blocks, env.BlockStore, env.BlockQueue);
await handleIngestBlocks(blocks, env.BtcBlocks, env.BlockQueue);
return new Response("Blocks ingested successfully", { status: 200 });
} catch (e) {
console.error("Failed to ingest blocks", e);
Expand Down
44 changes: 20 additions & 24 deletions packages/block-ingestor/src/ingest.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,34 @@
import { type PutBlock } from "./api/put-blocks";
import { type BlockQueueRecord } from "@gonative-cc/lib/nbtc";
import { type BlockQueueRecord, kvBlocksKey } from "@gonative-cc/lib/nbtc";

/// Enequeue new blocks to the indexer processing queue.
export async function handleIngestBlocks(
blocks: PutBlock[],
blockStore: KVNamespace,
btcBlocksStore: KVNamespace,
blockQueue: Queue,
): Promise<void> {
if (blocks.length === 0) {
throw new Error("Empty block batch");
}

const blockMetas = blocks.map((block) => {
const blockHash = block.block.getId();
const kvKey = `b:${block.network}:${blockHash}`;
return { block, blockHash, kvKey };
});

// Batch KV puts
const timestamp_ms = Date.now();
const batch: MessageSendRequest<BlockQueueRecord>[] = [];
await Promise.all(
blockMetas.map((meta) => blockStore.put(meta.kvKey, meta.block.block.toBuffer())),
blocks.map((b) => {
const hash = b.block.getId();
batch.push({
body: {
hash,
timestamp_ms,
height: b.height,
network: b.network,
},
});
const kvKey = kvBlocksKey(b.network, b.block.getId());
return btcBlocksStore.put(kvKey, b.block.toBuffer());
}),
);

const messages: BlockQueueRecord[] = [];
for (const meta of blockMetas) {
messages.push({
hash: meta.blockHash,
height: meta.block.height,
network: meta.block.network,
kv_key: meta.kvKey,
});
}

// Enqueue parsing requests
if (messages.length > 0) {
await blockQueue.sendBatch(messages.map((body: BlockQueueRecord) => ({ body })));
if (batch.length > 0) {
await blockQueue.sendBatch(batch);
}
}
Loading