Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 48 additions & 33 deletions packages/sui-indexer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type { WorkerEntrypoint } from "cloudflare:workers";
import type { BtcIndexerRpc } from "@gonative-cc/btcindexer/rpc-interface";
import HttpRouter from "./redeem-router";
import type { SuiNet } from "@gonative-cc/lib/nsui";
import type { PipelinePromise } from "stream";

const router = new HttpRouter();

Expand All @@ -23,42 +24,56 @@ export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
return router.fetch(request, env, ctx);
},
async scheduled(_event: ScheduledController, env: Env, _ctx: ExecutionContext): Promise<void> {
const storage = new D1Storage(env.DB);
// Distributed lock to prevent overlapping cron executions from selecting
// same Sui coins. Since CF Workers doesn't guarantee sequential cron running,
// if a run exceeds the 1-min interval, the next trigger starts concurrently.
const lockToken = await storage.acquireLock("cron-sui-indexer", 5 * 60 * 1000);
if (lockToken === null) {
logger.warn({
msg: "Cron job already running, skipping this execution",
lockName: "cron-sui-indexer",
});
return;
scheduled(_event: ScheduledController, env: Env, _ctx: ExecutionContext): Promise<void> {
return startCronJobs(env);
},
} satisfies ExportedHandler<Env>;

async function startCronJobs(env: Env): Promise<void> {
const storage = new D1Storage(env.DB);
const activeNetworks = await storage.getActiveNetworks();
const mnemonic = await getSecret(env.NBTC_MINTING_SIGNER_MNEMONIC);
const suiClients = await createSuiClients(activeNetworks, mnemonic);
const lockNames = ["CronSuiIndexer", "CronRedeemSolver"];
const minute = 60_000;

const acquiredLocks: string[] = [];

try {
const lockTokens = await storage.acquireLocks(lockNames, 5 * minute);
const jobs: Promise<void>[] = [];

for (let i = 0; i < lockTokens.length; ++i) {
const name = lockNames[i]!;
if (lockTokens[i] === null) {
logger.warn({
msg: name + " lock is busy, skipping",
});
continue;
}
acquiredLocks.push(name);
switch (i) {
case 0:
jobs.push(runSuiIndexer(storage, activeNetworks, suiClients));
break;
case 1:
jobs.push(runRedeemSolver(storage, env, suiClients, activeNetworks));
break;
default:
logger.error({ msg: "unhandled job index: " + i });
}
}

try {
const activeNetworks = await storage.getActiveNetworks();

const mnemonic = await getSecret(env.NBTC_MINTING_SIGNER_MNEMONIC);
const suiClients = await createSuiClients(activeNetworks, mnemonic);

// Run both indexer and redeem solver tasks in parallel
const results = await Promise.allSettled([
runSuiIndexer(storage, activeNetworks, suiClients),
runRedeemSolver(storage, env, suiClients, activeNetworks),
]);

// Check for any rejected promises and log errors
reportErrors(results, "scheduled", "Scheduled task error", [
"SuiIndexer",
"RedeemSolver",
]);
} finally {
await storage.releaseLock("cron-sui-indexer");
if (jobs.length === 0) return;

const results = await Promise.allSettled(jobs);
reportErrors(results, "scheduled", "Scheduled task error", acquiredLocks);
} finally {
if (acquiredLocks.length > 0) {
await storage.releaseLocks(acquiredLocks);
}
},
} satisfies ExportedHandler<Env>;
}
}

async function runSuiIndexer(
storage: D1Storage,
Expand Down
51 changes: 37 additions & 14 deletions packages/sui-indexer/src/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -607,16 +607,20 @@ describe("IndexerStorage", () => {
}

it("should acquire lock and reject duplicate", async () => {
const token = await storage.acquireLock("test-lock", 60000);
expect(token).not.toBeNull();
const tokens = await storage.acquireLocks(["test-lock"], 60000);
expect(tokens[0]).not.toBeNull();

const lock = await getLock("test-lock");
expect(lock).not.toBeNull();
expect(lock!.lock_name).toBe("test-lock");

// second acquire should fail
const second = await storage.acquireLock("test-lock", 60000);
expect(second).toBeNull();
let second = await storage.acquireLocks(["test-lock"], 60000);
expect(second[0]).toBeNull();

await storage.releaseLocks(["test-lock"]);
expect(await getLock("test-lock")).toBeNull();
second = await storage.acquireLocks(["test-lock"], 60000);
expect(second[0]).not.toBeNull();
});

it("should acquire lock when existing lock is expired", async () => {
Expand All @@ -628,22 +632,41 @@ describe("IndexerStorage", () => {
.bind("test-lock", expiredTime - 60000, expiredTime)
.run();

const token = await storage.acquireLock("test-lock", 60000);
expect(token).not.toBeNull();
const tokens = await storage.acquireLocks(["test-lock"], 60000);
expect(tokens[0]).not.toBeNull();

const lock = await getLock("test-lock");
expect(lock!.expires_at).toBeGreaterThan(Date.now());
});

it("should release lock and allow reacquiring", async () => {
const token = await storage.acquireLock("test-lock", 60000);
expect(token).not.toBeNull();
it("should acquire multiple locks with partial success", async () => {
await storage.acquireLocks(["lock-x"], 60000);

await storage.releaseLock("test-lock");
expect(await getLock("test-lock")).toBeNull();
const tokens = await storage.acquireLocks(["lock-y", "lock-x", "lock-z"], 60000);
expect(tokens).toHaveLength(3);
expect(tokens[0]).not.toBeNull();
expect(tokens[1]).toBeNull();
expect(tokens[2]).not.toBeNull();

const lockX = await getLock("lock-x");
expect(lockX).not.toBeNull();
});

it("should acquire all locks when existing locks are expired", async () => {
const now = Date.now();
const expiredTime = now - 10000;
await db
.prepare(
"INSERT INTO cron_locks (lock_name, acquired_at, expires_at) VALUES (?, ?, ?)",
)
.bind("lock-x", expiredTime - 60000, expiredTime)
.run();

const second = await storage.acquireLock("test-lock", 60000);
expect(second).not.toBeNull();
const tokens = await storage.acquireLocks(["lock-y", "lock-x", "lock-z"], 60000);
expect(tokens).toHaveLength(3);
expect(tokens[0]).toBeGreaterThan(now);
expect(tokens[1]).toBeGreaterThan(now);
expect(tokens[2]).toBeGreaterThan(now);
});
});
});
50 changes: 32 additions & 18 deletions packages/sui-indexer/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -855,35 +855,49 @@ export class D1Storage {
};
}

async acquireLock(lockName: string, ttlMs: number): Promise<number | null> {
async acquireLocks(lockNames: string[], ttlMs: number): Promise<(number | null)[]> {
if (lockNames.length === 0) return [];

const now = Date.now();
const expiresAt = now + ttlMs;

const stmt = this.db.prepare(
`INSERT INTO cron_locks (lock_name, acquired_at, expires_at)
VALUES (?, ?, ?)
ON CONFLICT(lock_name) DO UPDATE
SET acquired_at = excluded.acquired_at, expires_at = excluded.expires_at
WHERE cron_locks.expires_at <= excluded.acquired_at
RETURNING acquired_at`,
);

const batch = lockNames.map((lockName) => stmt.bind(lockName, now, expiresAt));

try {
const result = await this.db
.prepare(
`INSERT INTO cron_locks (lock_name, acquired_at, expires_at)
VALUES (?, ?, ?)
ON CONFLICT(lock_name) DO UPDATE
SET acquired_at = excluded.acquired_at, expires_at = excluded.expires_at
WHERE cron_locks.expires_at <= excluded.acquired_at
RETURNING acquired_at`,
)
.bind(lockName, now, now + ttlMs)
.first<number>("acquired_at");
return result ?? null;
const results = await this.db.batch(batch);
return results.map((r) => {
if (!r.success || !r.results || r.results.length === 0) {
return null;
}
return (r.results[0] as unknown as { acquired_at: number }).acquired_at;
});
} catch (error) {
logError({ msg: "Failed to acquire lock", method: "acquireLock", lockName }, error);
logError({ msg: "Failed to acquire locks", method: "acquireLocks" }, error);
throw error;
}
}

async releaseLock(lockName: string): Promise<void> {
async releaseLocks(lockNames: string[]): Promise<void> {
if (lockNames.length === 0) return;

const placeholders = lockNames.map(() => "?").join(", ");

try {
await this.db
.prepare(`DELETE FROM cron_locks WHERE lock_name = ?`)
.bind(lockName)
.prepare(`DELETE FROM cron_locks WHERE lock_name IN (${placeholders})`)
.bind(...lockNames)
.run();
} catch (error) {
logError({ msg: "Failed to release lock", method: "releaseLock", lockName }, error);
logError({ msg: "Failed to release locks", method: "releaseLocks" }, error);
throw error;
}
}
Expand Down