Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions packages/sui-indexer/src/graphql-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export interface EventsBatch {

export interface EventFetcher {
fetchEvents: (
packages: { id: string; cursor: string | null }[],
packages: { id: string; module: string; cursor: string | null }[],
) => Promise<Record<string, EventsBatch>>;
}

Expand Down Expand Up @@ -77,15 +77,15 @@ export class SuiGraphQLClient implements EventFetcher {
}

async fetchEvents(
packages: { id: string; cursor: string | null }[],
packages: { id: string; module: string; cursor: string | null }[],
): Promise<Record<string, EventsBatch>> {
if (packages.length === 0) return {};

const query = buildMultipleEventsQuery(packages.length);
const variables: Record<string, string | null> = {};

packages.forEach((pkg, i) => {
variables[`filter${i}`] = `${pkg.id}::nbtc`;
variables[`filter${i}`] = `${pkg.id}::${pkg.module}`;
variables[`cursor${i}`] = pkg.cursor;
});

Expand All @@ -105,7 +105,9 @@ export class SuiGraphQLClient implements EventFetcher {
txDigest: node.transaction.digest,
}));

result[pkg.id] = {
// Use composite key: packageId::module for unique identification
const key = `${pkg.id}::${pkg.module}`;
result[key] = {
events,
endCursor: eventsData.pageInfo.endCursor,
hasNextPage: eventsData.pageInfo.hasNextPage,
Expand Down
61 changes: 60 additions & 1 deletion packages/sui-indexer/src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import {
type SolvedEventRaw,
type SignatureRecordedEventRaw,
type SuiEventNode,
type IkaCompletedSignEventRaw,
type IkaRejectedSignEventRaw,
UtxoStatus,
} from "./models";
import { logger } from "@gonative-cc/lib/logger";
import { logError, logger } from "@gonative-cc/lib/logger";
import { fromBase64 } from "@mysten/sui/utils";

export class SuiEventHandler {
Expand Down Expand Up @@ -105,4 +107,61 @@ export class SuiEventHandler {
utxoId: e.utxo_id,
});
}

public async handleIkaEvents(events: SuiEventNode[]) {
const results = await Promise.allSettled(
events.map((e) => {
if (e.type.includes("::coordinator_inner::CompletedSignEvent")) {
return this.handleCompletedSign(e);
} else if (e.type.includes("::coordinator_inner::RejectedSignEvent")) {
return this.handleRejectedSign(e);
}
return Promise.resolve();
}),
);

results.forEach((result, i) => {
if (result.status === "rejected") {
const e = events[i];
logError(
{
msg: "Failed to handle Ika event",
method: "handleIkaEvents",
eventType: e?.type,
txDigest: e?.txDigest,
},
result.reason,
);
}
});
}

private async handleCompletedSign(e: SuiEventNode) {
const data = e.json as IkaCompletedSignEventRaw;

logger.info({
msg: "Ika signature completed",
sign_id: data.sign_id,
is_future_sign: data.is_future_sign,
signature_length: data.signature.length,
txDigest: e.txDigest,
setupId: this.setupId,
});

// TODO: Call redeem_solver service binding to record signature
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets add it here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

private async handleRejectedSign(e: SuiEventNode) {
const data = e.json as IkaRejectedSignEventRaw;

logger.warn({
msg: "Ika signature rejected",
sign_id: data.sign_id,
is_future_sign: data.is_future_sign,
txDigest: e.txDigest,
setupId: this.setupId,
});

// TODO: Call redeem_solver service binding to mark signature as rejected
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}
2 changes: 1 addition & 1 deletion packages/sui-indexer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async function poolAndProcessEvents(netCfg: NetworkConfig, storage: D1Storage) {
packageCount: packages.length,
});
const p = new Processor(netCfg, storage, client);
await p.pollAllNbtcEvents(packages);
await p.pollEvents(packages);
}

async function runRedeemSolver(storage: D1Storage, env: Env, activeNetworks: SuiNet[]) {
Expand Down
11 changes: 11 additions & 0 deletions packages/sui-indexer/src/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,17 @@ export interface PkgCfg {
nbtc_pkg: string;
}

export interface IkaCompletedSignEventRaw {
sign_id: string;
signature: number[]; // vector<u8>
is_future_sign: boolean;
}

export interface IkaRejectedSignEventRaw {
sign_id: string;
is_future_sign: boolean;
}

// Arguments for the contract call
export interface ProposeRedeemCall {
redeemId: number;
Expand Down
100 changes: 80 additions & 20 deletions packages/sui-indexer/src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { D1Storage } from "./storage";
import { logError, logger } from "@gonative-cc/lib/logger";
import { SuiEventHandler } from "./handler";
import type { EventFetcher } from "./graphql-client";
import { getNetworkConfig } from "@ika.xyz/sdk";

export class Processor {
netCfg: NetworkConfig;
Expand All @@ -15,46 +16,106 @@ export class Processor {
this.eventFetcher = eventFetcher;
}

// poll Nbtc events by multiple package ids
async pollAllNbtcEvents(nbtcPkgs: PkgCfg[]) {
const setupIds = nbtcPkgs.map((pkg) => pkg.id);
// Polls events (nBTC + Ika coordinator) from multiple packages
async pollEvents(nbtcPkgs: PkgCfg[]) {
try {
if (nbtcPkgs.length === 0) return;

const cursors = await this.storage.getMultipleSuiGqlCursors(setupIds);
// Get coordinator package from Ika SDK
const network = this.netCfg.name;
const coordinatorPkg =
network === "mainnet" || network === "testnet"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets add a note why we only handle two networks here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

? getNetworkConfig(network).packages.ikaSystemPackage
: null;

const packages: {
cursorId: number;
setupId: number;
pkg: string;
module: string;
isCoordinator: boolean;
}[] = [];

for (const pkg of nbtcPkgs) {
packages.push({
cursorId: pkg.id,
setupId: pkg.id,
pkg: pkg.nbtc_pkg,
module: "nbtc",
isCoordinator: false,
});
}

if (coordinatorPkg) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets extract the ika events to a separate function and just call it from here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

packages.push({
cursorId: -1,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you explain the -1? i am a little bit lost

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use -1 to avoid collisions with the nBTC cursor IDs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you give an example of how it works

setupId: -1,
pkg: coordinatorPkg,
module: "coordinator_inner",
isCoordinator: true,
});
}

const cursors = await this.storage.getMultipleSuiGqlCursors(
packages.map((p) => p.cursorId),
);

let hasAnyNextPage = true;
while (hasAnyNextPage) {
const packages = nbtcPkgs.map((pkg) => ({
id: pkg.nbtc_pkg,
cursor: cursors[pkg.id] || null,
const fetchRequests = packages.map((p) => ({
id: p.pkg,
module: p.module,
cursor: cursors[p.cursorId] || null,
}));

const results = await this.eventFetcher.fetchEvents(packages);

const results = await this.eventFetcher.fetchEvents(fetchRequests);
const cursorsToSave: { setupId: number; cursor: string }[] = [];
hasAnyNextPage = false;

for (const pkg of nbtcPkgs) {
const result = results[pkg.nbtc_pkg];
for (const p of packages) {
const key = `${p.pkg}::${p.module}`;
const result = results[key];
if (!result) continue;

logger.debug({
msg: `Fetched events`,
network: this.netCfg.name,
setupIds,
module: p.module,
setupId: p.setupId,
eventsLength: result.events.length,
endCursor: result.endCursor,
});

let processingSucceeded = true;
if (result.events.length > 0) {
const handler = new SuiEventHandler(this.storage, pkg.id);
await handler.handleEvents(result.events);
try {
const handler = new SuiEventHandler(this.storage, p.setupId);
if (p.isCoordinator) {
await handler.handleIkaEvents(result.events);
} else {
await handler.handleEvents(result.events);
}
} catch (error) {
processingSucceeded = false;
logError(
{
msg: "Failed to process events",
method: "pollEvents",
setupId: p.setupId,
module: p.module,
},
error,
);
}
}

if (result.endCursor && result.endCursor !== cursors[pkg.id]) {
cursorsToSave.push({ setupId: pkg.id, cursor: result.endCursor });
cursors[pkg.id] = result.endCursor;
// Only advance cursor if processing succeeded or there were no events to process
if (
processingSucceeded &&
result.endCursor &&
result.endCursor !== cursors[p.cursorId]
) {
cursorsToSave.push({ setupId: p.cursorId, cursor: result.endCursor });
cursors[p.cursorId] = result.endCursor;
}

if (result.hasNextPage) {
Expand All @@ -70,9 +131,8 @@ export class Processor {
logError(
{
msg: "Failed to index packages",
method: "queryNewEvents",
method: "pollEvents",
network: this.netCfg,
setupIds,
},
e,
);
Expand Down