Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 58 additions & 35 deletions packages/sui-indexer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,52 +13,75 @@ import type { BtcIndexerRpc } from "@gonative-cc/btcindexer/rpc-interface";
import HttpRouter from "./redeem-router";
import type { SuiNet } from "@gonative-cc/lib/nsui";

const router = new HttpRouter();

// Export RPC entrypoints for service bindings
export { RPC } from "./rpc";
export { RPCMock } from "./rpc-mocks";

const router = new HttpRouter();
const CRON_LOCK_TTL_MS = 5 * 60_000; // 5 minutes

export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
return router.fetch(request, env, ctx);
},
async scheduled(_event: ScheduledController, env: Env, _ctx: ExecutionContext): Promise<void> {
const storage = new D1Storage(env.DB);
// Distributed lock to prevent overlapping cron executions from selecting
// same Sui coins. Since CF Workers doesn't guarantee sequential cron running,
// if a run exceeds the 1-min interval, the next trigger starts concurrently.
const lockToken = await storage.acquireLock("cron-sui-indexer", 5 * 60 * 1000);
if (lockToken === null) {
logger.warn({
msg: "Cron job already running, skipping this execution",
lockName: "cron-sui-indexer",
});
return;
scheduled(_event: ScheduledController, env: Env, _ctx: ExecutionContext): Promise<void> {
return startCronJobs(env);
},
} satisfies ExportedHandler<Env>;

type JobRunner = () => Promise<void>;

interface JobDefinition {
name: string;
run: JobRunner;
}

async function startCronJobs(env: Env): Promise<void> {
const storage = new D1Storage(env.DB);
const activeNetworks = await storage.getActiveNetworks();
const mnemonic = await getSecret(env.NBTC_MINTING_SIGNER_MNEMONIC);
const suiClients = await createSuiClients(activeNetworks, mnemonic);

const jobs: JobDefinition[] = [
{
name: "CronSuiIndexer",
run: () => runSuiIndexer(storage, activeNetworks, suiClients),
},
{
name: "CronRedeemSolver",
run: () => runRedeemSolver(storage, env, suiClients, activeNetworks),
},
];

const acquiredLocks: string[] = [];

try {
const lockNames = jobs.map((j) => j.name);
const lockTokens = await storage.acquireLocks(lockNames, CRON_LOCK_TTL_MS);
const tasks: Promise<void>[] = [];

for (let i = 0; i < lockTokens.length; ++i) {
if (lockTokens[i] === null) {
logger.warn({
msg: "Lock is busy, skipping",
lockName: lockNames[i],
});
continue;
}
acquiredLocks.push(lockNames[i]!);
tasks.push(jobs[i]!.run());
}

try {
const activeNetworks = await storage.getActiveNetworks();

const mnemonic = await getSecret(env.NBTC_MINTING_SIGNER_MNEMONIC);
const suiClients = await createSuiClients(activeNetworks, mnemonic);

// Run both indexer and redeem solver tasks in parallel
const results = await Promise.allSettled([
runSuiIndexer(storage, activeNetworks, suiClients),
runRedeemSolver(storage, env, suiClients, activeNetworks),
]);

// Check for any rejected promises and log errors
reportErrors(results, "scheduled", "Scheduled task error", [
"SuiIndexer",
"RedeemSolver",
]);
} finally {
await storage.releaseLock("cron-sui-indexer");
if (tasks.length === 0) return;

const results = await Promise.allSettled(tasks);
reportErrors(results, "scheduled", "Scheduled task error", acquiredLocks);
} finally {
if (acquiredLocks.length > 0) {
await storage.releaseLocks(acquiredLocks);
}
},
} satisfies ExportedHandler<Env>;
}
}

async function runSuiIndexer(
storage: D1Storage,
Expand Down
58 changes: 44 additions & 14 deletions packages/sui-indexer/src/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -606,17 +606,28 @@ describe("IndexerStorage", () => {
.first<{ lock_name: string; acquired_at: number; expires_at: number }>();
}

it("should handle empty lock names array", async () => {
const tokens = await storage.acquireLocks([], 60000);
expect(tokens).toEqual([]);

await storage.releaseLocks([]);
});

it("should acquire lock and reject duplicate", async () => {
const token = await storage.acquireLock("test-lock", 60000);
expect(token).not.toBeNull();
const tokens = await storage.acquireLocks(["test-lock"], 60000);
expect(tokens[0]).not.toBeNull();

const lock = await getLock("test-lock");
expect(lock).not.toBeNull();
expect(lock!.lock_name).toBe("test-lock");

// second acquire should fail
const second = await storage.acquireLock("test-lock", 60000);
expect(second).toBeNull();
let second = await storage.acquireLocks(["test-lock"], 60000);
expect(second[0]).toBeNull();

await storage.releaseLocks(["test-lock"]);
expect(await getLock("test-lock")).toBeNull();
second = await storage.acquireLocks(["test-lock"], 60000);
expect(second[0]).not.toBeNull();
});

it("should acquire lock when existing lock is expired", async () => {
Expand All @@ -628,22 +639,41 @@ describe("IndexerStorage", () => {
.bind("test-lock", expiredTime - 60000, expiredTime)
.run();

const token = await storage.acquireLock("test-lock", 60000);
expect(token).not.toBeNull();
const tokens = await storage.acquireLocks(["test-lock"], 60000);
expect(tokens[0]).not.toBeNull();

const lock = await getLock("test-lock");
expect(lock!.expires_at).toBeGreaterThan(Date.now());
});

it("should release lock and allow reacquiring", async () => {
const token = await storage.acquireLock("test-lock", 60000);
expect(token).not.toBeNull();
it("should acquire multiple locks with partial success", async () => {
await storage.acquireLocks(["lock-x"], 60000);

await storage.releaseLock("test-lock");
expect(await getLock("test-lock")).toBeNull();
const tokens = await storage.acquireLocks(["lock-y", "lock-x", "lock-z"], 60000);
expect(tokens).toHaveLength(3);
expect(tokens[0]).not.toBeNull();
expect(tokens[1]).toBeNull();
expect(tokens[2]).not.toBeNull();

const lockX = await getLock("lock-x");
expect(lockX).not.toBeNull();
});

it("should acquire all locks when existing locks are expired", async () => {
const now = Date.now();
const expiredTime = now - 10000;
await db
.prepare(
"INSERT INTO cron_locks (lock_name, acquired_at, expires_at) VALUES (?, ?, ?)",
)
.bind("lock-x", expiredTime - 60000, expiredTime)
.run();

const second = await storage.acquireLock("test-lock", 60000);
expect(second).not.toBeNull();
const tokens = await storage.acquireLocks(["lock-y", "lock-x", "lock-z"], 60000);
expect(tokens).toHaveLength(3);
expect(tokens[0]).toBeGreaterThan(now);
expect(tokens[1]).toBeGreaterThan(now);
expect(tokens[2]).toBeGreaterThan(now);
});
});
});
40 changes: 26 additions & 14 deletions packages/sui-indexer/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -855,35 +855,47 @@ export class D1Storage {
};
}

async acquireLock(lockName: string, ttlMs: number): Promise<number | null> {
async acquireLocks(lockNames: string[], ttlMs: number): Promise<(number | null)[]> {
if (lockNames.length === 0) return [];

const now = Date.now();
const expiresAt = now + ttlMs;
const valueRows = lockNames.map(() => "(?, ?, ?)").join(", ");
const params = lockNames.flatMap((name) => [name, now, expiresAt]);

try {
const result = await this.db
const { results } = await this.db
.prepare(
`INSERT INTO cron_locks (lock_name, acquired_at, expires_at)
VALUES (?, ?, ?)
VALUES ${valueRows}
ON CONFLICT(lock_name) DO UPDATE
SET acquired_at = excluded.acquired_at, expires_at = excluded.expires_at
WHERE cron_locks.expires_at <= excluded.acquired_at
RETURNING acquired_at`,
SET acquired_at = excluded.acquired_at, expires_at = excluded.expires_at
WHERE cron_locks.expires_at <= excluded.acquired_at
RETURNING lock_name, acquired_at`,
)
.bind(lockName, now, now + ttlMs)
.first<number>("acquired_at");
return result ?? null;
.bind(...params)
.all<{ lock_name: string; acquired_at: number }>();

const acquiredMap = new Map(results.map((r) => [r.lock_name, r.acquired_at]));
return lockNames.map((name) => acquiredMap.get(name) ?? null);
} catch (error) {
logError({ msg: "Failed to acquire lock", method: "acquireLock", lockName }, error);
logError({ msg: "Failed to acquire locks", method: "acquireLocks" }, error);
throw error;
}
}

async releaseLock(lockName: string): Promise<void> {
async releaseLocks(lockNames: string[]): Promise<void> {
if (lockNames.length === 0) return;

const placeholders = lockNames.map(() => "?").join(", ");

try {
await this.db
.prepare(`DELETE FROM cron_locks WHERE lock_name = ?`)
.bind(lockName)
.prepare(`DELETE FROM cron_locks WHERE lock_name IN (${placeholders})`)
.bind(...lockNames)
.run();
} catch (error) {
logError({ msg: "Failed to release lock", method: "releaseLock", lockName }, error);
logError({ msg: "Failed to release locks", method: "releaseLocks" }, error);
throw error;
}
}
Expand Down