diff --git a/packages/btcindexer/db/migrations/0001_initial_schema.sql b/packages/btcindexer/db/migrations/0001_initial_schema.sql index 560bc03d..018f9f2f 100644 --- a/packages/btcindexer/db/migrations/0001_initial_schema.sql +++ b/packages/btcindexer/db/migrations/0001_initial_schema.sql @@ -127,3 +127,9 @@ CREATE TABLE IF NOT EXISTS presign_objects ( ) STRICT; CREATE INDEX IF NOT EXISTS presign_objects_sui_network_created_at ON presign_objects(sui_network, created_at); + +CREATE TABLE IF NOT EXISTS cron_locks ( + lock_name TEXT NOT NULL PRIMARY KEY, + acquired_at INTEGER NOT NULL, + expires_at INTEGER NOT NULL +) STRICT; \ No newline at end of file diff --git a/packages/lib/src/test-helpers/init_db.ts b/packages/lib/src/test-helpers/init_db.ts index 1db3649e..8e9e9029 100644 --- a/packages/lib/src/test-helpers/init_db.ts +++ b/packages/lib/src/test-helpers/init_db.ts @@ -42,6 +42,7 @@ export const tables = [ "btc_blocks", "indexer_state", "presign_objects", + "cron_locks", "setups", ]; diff --git a/packages/sui-indexer/src/index.ts b/packages/sui-indexer/src/index.ts index fd3ccbf3..087f534e 100644 --- a/packages/sui-indexer/src/index.ts +++ b/packages/sui-indexer/src/index.ts @@ -25,19 +25,38 @@ export default { }, async scheduled(_event: ScheduledController, env: Env, _ctx: ExecutionContext): Promise { const storage = new D1Storage(env.DB); - const activeNetworks = await storage.getActiveNetworks(); - - const mnemonic = await getSecret(env.NBTC_MINTING_SIGNER_MNEMONIC); - const suiClients = await createSuiClients(activeNetworks, mnemonic); - - // Run both indexer and redeem solver tasks in parallel - const results = await Promise.allSettled([ - runSuiIndexer(storage, activeNetworks, suiClients), - runRedeemSolver(storage, env, suiClients, activeNetworks), - ]); + // Distributed lock to prevent overlapping cron executions from selecting + // same Sui coins. Since CF Workers doesn't guarantee sequential cron running, + // if a run exceeds the 1-min interval, the next trigger starts concurrently. + const lockToken = await storage.acquireLock("cron-sui-indexer", 5 * 60 * 1000); + if (lockToken === null) { + logger.warn({ + msg: "Cron job already running, skipping this execution", + lockName: "cron-sui-indexer", + }); + return; + } - // Check for any rejected promises and log errors - reportErrors(results, "scheduled", "Scheduled task error", ["SuiIndexer", "RedeemSolver"]); + try { + const activeNetworks = await storage.getActiveNetworks(); + + const mnemonic = await getSecret(env.NBTC_MINTING_SIGNER_MNEMONIC); + const suiClients = await createSuiClients(activeNetworks, mnemonic); + + // Run both indexer and redeem solver tasks in parallel + const results = await Promise.allSettled([ + runSuiIndexer(storage, activeNetworks, suiClients), + runRedeemSolver(storage, env, suiClients, activeNetworks), + ]); + + // Check for any rejected promises and log errors + reportErrors(results, "scheduled", "Scheduled task error", [ + "SuiIndexer", + "RedeemSolver", + ]); + } finally { + await storage.releaseLock("cron-sui-indexer"); + } }, } satisfies ExportedHandler; diff --git a/packages/sui-indexer/src/storage.test.ts b/packages/sui-indexer/src/storage.test.ts index b12874f9..f1101ade 100644 --- a/packages/sui-indexer/src/storage.test.ts +++ b/packages/sui-indexer/src/storage.test.ts @@ -597,4 +597,53 @@ describe("IndexerStorage", () => { expect(inputs[0]!.input_index).toBe(0); expect(inputs[1]!.input_index).toBe(1); }); + + describe("Distributed Lock", () => { + async function getLock(lockName: string) { + return db + .prepare("SELECT * FROM cron_locks WHERE lock_name = ?") + .bind(lockName) + .first<{ lock_name: string; acquired_at: number; expires_at: number }>(); + } + + it("should acquire lock and reject duplicate", async () => { + const token = await storage.acquireLock("test-lock", 60000); + expect(token).not.toBeNull(); + + const lock = await getLock("test-lock"); + expect(lock).not.toBeNull(); + expect(lock!.lock_name).toBe("test-lock"); + + // second acquire should fail + const second = await storage.acquireLock("test-lock", 60000); + expect(second).toBeNull(); + }); + + it("should acquire lock when existing lock is expired", async () => { + const expiredTime = Date.now() - 10000; + await db + .prepare( + "INSERT INTO cron_locks (lock_name, acquired_at, expires_at) VALUES (?, ?, ?)", + ) + .bind("test-lock", expiredTime - 60000, expiredTime) + .run(); + + const token = await storage.acquireLock("test-lock", 60000); + expect(token).not.toBeNull(); + + const lock = await getLock("test-lock"); + expect(lock!.expires_at).toBeGreaterThan(Date.now()); + }); + + it("should release lock and allow reacquiring", async () => { + const token = await storage.acquireLock("test-lock", 60000); + expect(token).not.toBeNull(); + + await storage.releaseLock("test-lock"); + expect(await getLock("test-lock")).toBeNull(); + + const second = await storage.acquireLock("test-lock", 60000); + expect(second).not.toBeNull(); + }); + }); }); diff --git a/packages/sui-indexer/src/storage.ts b/packages/sui-indexer/src/storage.ts index ab24e4b6..a54f7a27 100644 --- a/packages/sui-indexer/src/storage.ts +++ b/packages/sui-indexer/src/storage.ts @@ -854,6 +854,39 @@ export class D1Storage { sui_network: toSuiNet(result.sui_network), }; } + + async acquireLock(lockName: string, ttlMs: number): Promise { + const now = Date.now(); + try { + const result = await this.db + .prepare( + `INSERT INTO cron_locks (lock_name, acquired_at, expires_at) + VALUES (?, ?, ?) + ON CONFLICT(lock_name) DO UPDATE + SET acquired_at = excluded.acquired_at, expires_at = excluded.expires_at + WHERE cron_locks.expires_at <= excluded.acquired_at + RETURNING acquired_at`, + ) + .bind(lockName, now, now + ttlMs) + .first("acquired_at"); + return result ?? null; + } catch (error) { + logError({ msg: "Failed to acquire lock", method: "acquireLock", lockName }, error); + throw error; + } + } + + async releaseLock(lockName: string): Promise { + try { + await this.db + .prepare(`DELETE FROM cron_locks WHERE lock_name = ?`) + .bind(lockName) + .run(); + } catch (error) { + logError({ msg: "Failed to release lock", method: "releaseLock", lockName }, error); + throw error; + } + } } export async function insertRedeemRequest(