Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/btcindexer/db/migrations/0001_initial_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ CREATE TABLE IF NOT EXISTS indexer_state (
FOREIGN KEY (setup_id) REFERENCES setups(id)
) STRICT;

CREATE TABLE IF NOT EXISTS indexer_ika_state(
sui_network TEXT NOT NULL, -- for simplicity, we can reuse the presigns between the setups in the same network
coordinator_pkg_id TEXT NOT NULL PRIMARY KEY,
ika_cursor TEXT NOT NULL, -- last processed cursor state
updated_at INTEGER -- epoch time in ms
) STRICT;

CREATE TABLE IF NOT EXISTS presign_objects (
presign_id TEXT NOT NULL PRIMARY KEY,
sui_network TEXT NOT NULL, -- for simplicity, we can reuse the presigns between the setups in the same network
Expand Down
6 changes: 3 additions & 3 deletions packages/sui-indexer/src/graphql-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export interface EventsBatch {

export interface EventFetcher {
fetchEvents: (
packages: { id: string; cursor: string | null }[],
packages: { id: string; cursor: string | null; module?: string }[],
) => Promise<Record<string, EventsBatch>>;
}

Expand Down Expand Up @@ -77,15 +77,15 @@ export class SuiGraphQLClient implements EventFetcher {
}

async fetchEvents(
packages: { id: string; cursor: string | null }[],
packages: { id: string; cursor: string | null; module?: string }[],
): Promise<Record<string, EventsBatch>> {
if (packages.length === 0) return {};

const query = buildMultipleEventsQuery(packages.length);
const variables: Record<string, string | null> = {};

packages.forEach((pkg, i) => {
variables[`filter${i}`] = `${pkg.id}::nbtc`;
variables[`filter${i}`] = `${pkg.id}::${pkg.module ?? "nbtc"}`;
variables[`cursor${i}`] = pkg.cursor;
});

Expand Down
31 changes: 27 additions & 4 deletions packages/sui-indexer/src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import {
type RedeemRequestEventRaw,
type SolvedEventRaw,
type SignatureRecordedEventRaw,
type CompletedSignEventRaw,
type RejectedSignEventRaw,
type SuiEventNode,
UtxoStatus,
} from "./models";
Expand All @@ -13,9 +15,9 @@ import { fromBase64 } from "@mysten/sui/utils";

export class SuiEventHandler {
private storage: D1Storage;
private setupId: number;
private setupId?: number;

constructor(storage: D1Storage, setupId: number) {
constructor(storage: D1Storage, setupId?: number) {
this.storage = storage;
this.setupId = setupId;
}
Expand All @@ -34,10 +36,21 @@ export class SuiEventHandler {
await this.handleSolved(json as SolvedEventRaw);
} else if (e.type.includes("::nbtc::redeem_request::SignatureRecordedEvent")) {
await this.handleIkaSignatureRecorded(json as SignatureRecordedEventRaw);
} else if (e.type.includes("CompletedSignEvent")) {
await this.handleCompletedSign(e.json as CompletedSignEventRaw);
} else if (e.type.includes("RejectedSignEvent")) {
await this.handleRejectedSign(e.json as RejectedSignEventRaw);
}
}
}

private getSetupId(): number {
if (this.setupId == undefined) {
throw new Error("Setup ID is not set");
}
return this.setupId;
}

private async handleMint(txDigest: string, e: MintEventRaw) {
// NOTE: Sui contract gives us the txid in big-endian, but bitcoinjs-lib's tx.getId()
// returns it in little-endian (see https://github.com/bitcoinjs/bitcoinjs-lib/blob/dc8d9e26f2b9c7380aec7877155bde97594a9ade/ts_src/transaction.ts#L617)
Expand All @@ -51,7 +64,7 @@ export class SuiEventHandler {
vout: e.btc_vout,
amount: Number(e.btc_amount),
script_pubkey: fromBase64(e.btc_script_publickey),
setup_id: this.setupId,
setup_id: this.getSetupId(),
status: UtxoStatus.Available,
locked_until: null,
});
Expand All @@ -65,7 +78,7 @@ export class SuiEventHandler {
recipient_script: fromBase64(e.recipient_script),
amount: Number(e.amount),
created_at: Number(e.created_at),
setup_id: this.setupId,
setup_id: this.getSetupId(),
sui_tx: txDigest,
});
logger.info({ msg: "Indexed Redeem Request", id: e.redeem_id });
Expand Down Expand Up @@ -105,4 +118,14 @@ export class SuiEventHandler {
utxoId: e.utxo_id,
});
}

private async handleCompletedSign(e: CompletedSignEventRaw) {
logger.info({ msg: "IKA sign completed", signId: e.sign_id });
//TODO: will handle the sign in the redeem-service in next PR
}

private async handleRejectedSign(e: RejectedSignEventRaw) {
logger.warn({ msg: "IKA sign rejected", signId: e.sign_id });
//TODO: will handle the sign in the redeem-service in next PR
}
}
28 changes: 20 additions & 8 deletions packages/sui-indexer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,27 @@ async function runSuiIndexer(storage: D1Storage, env: Env, activeNetworks: SuiNe

async function poolAndProcessEvents(netCfg: NetworkConfig, storage: D1Storage) {
const client = new SuiGraphQLClient(netCfg.url);
const packages = await storage.getActiveNbtcPkgs(netCfg.name);
if (packages.length === 0) return;
logger.info({
msg: `Processing network`,
network: netCfg.name,
packageCount: packages.length,
});
const p = new Processor(netCfg, storage, client);
await p.pollAllNbtcEvents(packages);

const nbtcPkgs = await storage.getActiveNbtcPkgs(netCfg.name);
if (nbtcPkgs.length > 0) {
logger.info({
msg: `Processing nBTC events`,
network: netCfg.name,
packageCount: nbtcPkgs.length,
});
await p.pollAllNbtcEvents(nbtcPkgs);
}

const ikaPkgs = await storage.getActiveCoordinatorPkgs(netCfg.name);
if (ikaPkgs.length > 0) {
logger.info({
msg: `Processing IKA coordinator events`,
network: netCfg.name,
packageCount: ikaPkgs.length,
});
await p.pollIkaEvents(ikaPkgs);
}
}

async function runRedeemSolver(storage: D1Storage, env: Env, activeNetworks: SuiNet[]) {
Expand Down
12 changes: 12 additions & 0 deletions packages/sui-indexer/src/models.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { SuiNet } from "@gonative-cc/lib/nsui";
import { BitcoinTxStatus } from "@gonative-cc/lib/nbtc";
import { createInterface } from "node:readline/promises";

export enum UtxoStatus {
Available = "available",
Expand Down Expand Up @@ -147,3 +148,14 @@ export interface SolveRedeemCall {
nbtcPkg: string;
nbtcContract: string;
}

export interface CompletedSignEventRaw {
sign_id: string;
signature: number[];
is_future_sign: boolean;
}

export interface RejectedSignEventRaw {
sign_id: string;
is_future_sign: boolean;
}
72 changes: 72 additions & 0 deletions packages/sui-indexer/src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { D1Storage } from "./storage";
import { logError, logger } from "@gonative-cc/lib/logger";
import { SuiEventHandler } from "./handler";
import type { EventFetcher } from "./graphql-client";
import type { SuiNet } from "@gonative-cc/lib/nsui";

export class Processor {
netCfg: NetworkConfig;
Expand Down Expand Up @@ -78,4 +79,75 @@ export class Processor {
);
}
}

async pollIkaEvents(coordinatorPkgIds: string[]) {
try {
if (coordinatorPkgIds.length === 0) return;

const cursors = await this.storage.getIkaCursors(coordinatorPkgIds);

let hasAnyNextPage = true;
while (hasAnyNextPage) {
const packages = coordinatorPkgIds.map((pkgId) => ({
id: pkgId,
cursor: cursors[pkgId] || null,
module: "coordinator_inner",
}));

const results = await this.eventFetcher.fetchEvents(packages);

const cursorsToSave: {
coordinatorPkgId: string;
suiNetwork: SuiNet;
cursor: string;
}[] = [];
hasAnyNextPage = false;

for (const pkgId of coordinatorPkgIds) {
const result = results[pkgId];
if (!result) continue;

logger.debug({
msg: "Fetched IKA events",
network: this.netCfg.name,
coordinatorPkgId: pkgId,
eventsLength: result.events.length,
endCursor: result.endCursor,
});

if (result.events.length > 0) {
const handler = new SuiEventHandler(this.storage);
await handler.handleEvents(result.events);
}

if (result.endCursor && result.endCursor !== cursors[pkgId]) {
cursorsToSave.push({
coordinatorPkgId: pkgId,
suiNetwork: this.netCfg.name,
cursor: result.endCursor,
});
cursors[pkgId] = result.endCursor;
}

if (result.hasNextPage) {
hasAnyNextPage = true;
}
}

if (cursorsToSave.length > 0) {
await this.storage.saveIkaCursors(cursorsToSave);
}
}
} catch (e) {
logError(
{
msg: "Failed to index IKA coordinator events",
method: "pollIkaEvents",
network: this.netCfg,
coordinatorPkgIds,
},
e,
);
}
}
}
50 changes: 49 additions & 1 deletion packages/sui-indexer/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,54 @@ export class D1Storage {
return result;
}

async getIkaCursors(coordinatorPkgIds: string[]): Promise<Record<string, string | null>> {
if (coordinatorPkgIds.length === 0) return {};

const placeholders = coordinatorPkgIds.map(() => "?").join(",");
const res = await this.db
.prepare(
`SELECT coordinator_pkg_id, ika_cursor FROM indexer_ika_state WHERE coordinator_pkg_id IN (${placeholders})`,
)
.bind(...coordinatorPkgIds)
.all<{ coordinator_pkg_id: string; ika_cursor: string }>();

const result: Record<string, string | null> = {};
coordinatorPkgIds.forEach((id) => {
result[id] = null;
});
res.results.forEach((row) => {
result[row.coordinator_pkg_id] = row.ika_cursor || null;
});
return result;
}

async saveIkaCursors(
cursors: { coordinatorPkgId: string; suiNetwork: SuiNet; cursor: string }[],
): Promise<void> {
if (cursors.length === 0) return;

const stmt = this.db.prepare(
`INSERT INTO indexer_ika_state (coordinator_pkg_id, sui_network, ika_cursor, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(coordinator_pkg_id) DO UPDATE SET ika_cursor = excluded.ika_cursor, updated_at = excluded.updated_at`,
);

const now = Date.now();
const batch = cursors.map(({ coordinatorPkgId, suiNetwork, cursor }) =>
stmt.bind(coordinatorPkgId, suiNetwork, cursor, now),
);
await this.db.batch(batch);
}

async getActiveCoordinatorPkgs(suiNetwork: SuiNet): Promise<string[]> {
const { results } = await this.db
.prepare("SELECT coordinator_pkg_id FROM indexer_ika_state WHERE sui_network = ?")
.bind(suiNetwork)
.all<{ coordinator_pkg_id: string }>();

return results.map((r) => r.coordinator_pkg_id);
}

// Saves multiple cursor positions for querying Sui events.
async saveMultipleSuiGqlCursors(cursors: { setupId: number; cursor: string }[]): Promise<void> {
if (cursors.length === 0) return;
Expand Down Expand Up @@ -301,7 +349,7 @@ export class D1Storage {

const batch = utxoIds.map((utxoId, i) => {
// dwalletIds[i] is guaranteed to exist due to length check
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
return stmt.bind(redeemId, utxoId, i, dwalletIds[i]!, now);
});

Expand Down