Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/block-ingestor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export default {

try {
const blocks = PutBlocksReq.decode(await request.arrayBuffer());
await handleIngestBlocks(blocks, env.BlockStore, env.BlockQueue);
await handleIngestBlocks(blocks, env.BtcBlocks, env.BlockQueue);
return new Response("Blocks ingested successfully", { status: 200 });
} catch (e) {
console.error("Failed to ingest blocks", e);
Expand Down
44 changes: 20 additions & 24 deletions packages/block-ingestor/src/ingest.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,34 @@
import { type PutBlock } from "./api/put-blocks";
import { type BlockQueueRecord } from "@gonative-cc/lib/nbtc";
import { type BlockQueueRecord, kvBlocksKey } from "@gonative-cc/lib/nbtc";

/// Enequeue new blocks to the indexer processing queue.
export async function handleIngestBlocks(
blocks: PutBlock[],
blockStore: KVNamespace,
btcBlocksStore: KVNamespace,
blockQueue: Queue,
): Promise<void> {
if (blocks.length === 0) {
throw new Error("Empty block batch");
}

const blockMetas = blocks.map((block) => {
const blockHash = block.block.getId();
const kvKey = `b:${block.network}:${blockHash}`;
return { block, blockHash, kvKey };
});

// Batch KV puts
const timestamp_ms = Date.now();
const batch: MessageSendRequest<BlockQueueRecord>[] = [];
await Promise.all(
blockMetas.map((meta) => blockStore.put(meta.kvKey, meta.block.block.toBuffer())),
blocks.map((b) => {
const hash = b.block.getId();
batch.push({
body: {
hash,
timestamp_ms,
height: b.height,
network: b.network,
},
});
const kvKey = kvBlocksKey(b.network, b.block.getId());
return btcBlocksStore.put(kvKey, b.block.toBuffer());
}),
);

const messages: BlockQueueRecord[] = [];
for (const meta of blockMetas) {
messages.push({
hash: meta.blockHash,
height: meta.block.height,
network: meta.block.network,
kv_key: meta.kvKey,
});
}

// Enqueue parsing requests
if (messages.length > 0) {
await blockQueue.sendBatch(messages.map((body: BlockQueueRecord) => ({ body })));
if (batch.length > 0) {
await blockQueue.sendBatch(batch);
}
}
Loading
Loading