Skip to content

Commit 58c150d

Browse files
feat: don't prevent sui-indexer running on locked RedeemSolver (#362)
* feat: don't prevent sui-indexer running on locked RedeemSolver Signed-off-by: Robert Zaremba <robert@zaremba.ch> * optimise cron Signed-off-by: Robert Zaremba <robert@zaremba.ch> * lint Signed-off-by: Robert Zaremba <robert@zaremba.ch> * updates Signed-off-by: Robert Zaremba <robert@zaremba.ch> --------- Signed-off-by: Robert Zaremba <robert@zaremba.ch>
1 parent 29981ca commit 58c150d

File tree

3 files changed

+128
-63
lines changed

3 files changed

+128
-63
lines changed

packages/sui-indexer/src/index.ts

Lines changed: 58 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,52 +13,75 @@ import type { BtcIndexerRpc } from "@gonative-cc/btcindexer/rpc-interface";
1313
import HttpRouter from "./redeem-router";
1414
import type { SuiNet } from "@gonative-cc/lib/nsui";
1515

16-
const router = new HttpRouter();
17-
1816
// Export RPC entrypoints for service bindings
1917
export { RPC } from "./rpc";
2018
export { RPCMock } from "./rpc-mocks";
2119

20+
const router = new HttpRouter();
21+
const CRON_LOCK_TTL_MS = 5 * 60_000; // 5 minutes
22+
2223
export default {
2324
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
2425
return router.fetch(request, env, ctx);
2526
},
26-
async scheduled(_event: ScheduledController, env: Env, _ctx: ExecutionContext): Promise<void> {
27-
const storage = new D1Storage(env.DB);
28-
// Distributed lock to prevent overlapping cron executions from selecting
29-
// same Sui coins. Since CF Workers doesn't guarantee sequential cron running,
30-
// if a run exceeds the 1-min interval, the next trigger starts concurrently.
31-
const lockToken = await storage.acquireLock("cron-sui-indexer", 5 * 60 * 1000);
32-
if (lockToken === null) {
33-
logger.warn({
34-
msg: "Cron job already running, skipping this execution",
35-
lockName: "cron-sui-indexer",
36-
});
37-
return;
27+
scheduled(_event: ScheduledController, env: Env, _ctx: ExecutionContext): Promise<void> {
28+
return startCronJobs(env);
29+
},
30+
} satisfies ExportedHandler<Env>;
31+
32+
type JobRunner = () => Promise<void>;
33+
34+
interface JobDefinition {
35+
name: string;
36+
run: JobRunner;
37+
}
38+
39+
async function startCronJobs(env: Env): Promise<void> {
40+
const storage = new D1Storage(env.DB);
41+
const activeNetworks = await storage.getActiveNetworks();
42+
const mnemonic = await getSecret(env.NBTC_MINTING_SIGNER_MNEMONIC);
43+
const suiClients = await createSuiClients(activeNetworks, mnemonic);
44+
45+
const jobs: JobDefinition[] = [
46+
{
47+
name: "CronSuiIndexer",
48+
run: () => runSuiIndexer(storage, activeNetworks, suiClients),
49+
},
50+
{
51+
name: "CronRedeemSolver",
52+
run: () => runRedeemSolver(storage, env, suiClients, activeNetworks),
53+
},
54+
];
55+
56+
const acquiredLocks: string[] = [];
57+
58+
try {
59+
const lockNames = jobs.map((j) => j.name);
60+
const lockTokens = await storage.acquireLocks(lockNames, CRON_LOCK_TTL_MS);
61+
const tasks: Promise<void>[] = [];
62+
63+
for (let i = 0; i < lockTokens.length; ++i) {
64+
if (lockTokens[i] === null) {
65+
logger.warn({
66+
msg: "Lock is busy, skipping",
67+
lockName: lockNames[i],
68+
});
69+
continue;
70+
}
71+
acquiredLocks.push(lockNames[i]!);
72+
tasks.push(jobs[i]!.run());
3873
}
3974

40-
try {
41-
const activeNetworks = await storage.getActiveNetworks();
42-
43-
const mnemonic = await getSecret(env.NBTC_MINTING_SIGNER_MNEMONIC);
44-
const suiClients = await createSuiClients(activeNetworks, mnemonic);
45-
46-
// Run both indexer and redeem solver tasks in parallel
47-
const results = await Promise.allSettled([
48-
runSuiIndexer(storage, activeNetworks, suiClients),
49-
runRedeemSolver(storage, env, suiClients, activeNetworks),
50-
]);
51-
52-
// Check for any rejected promises and log errors
53-
reportErrors(results, "scheduled", "Scheduled task error", [
54-
"SuiIndexer",
55-
"RedeemSolver",
56-
]);
57-
} finally {
58-
await storage.releaseLock("cron-sui-indexer");
75+
if (tasks.length === 0) return;
76+
77+
const results = await Promise.allSettled(tasks);
78+
reportErrors(results, "scheduled", "Scheduled task error", acquiredLocks);
79+
} finally {
80+
if (acquiredLocks.length > 0) {
81+
await storage.releaseLocks(acquiredLocks);
5982
}
60-
},
61-
} satisfies ExportedHandler<Env>;
83+
}
84+
}
6285

6386
async function runSuiIndexer(
6487
storage: D1Storage,

packages/sui-indexer/src/storage.test.ts

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -606,17 +606,28 @@ describe("IndexerStorage", () => {
606606
.first<{ lock_name: string; acquired_at: number; expires_at: number }>();
607607
}
608608

609+
it("should handle empty lock names array", async () => {
610+
const tokens = await storage.acquireLocks([], 60000);
611+
expect(tokens).toEqual([]);
612+
613+
await storage.releaseLocks([]);
614+
});
615+
609616
it("should acquire lock and reject duplicate", async () => {
610-
const token = await storage.acquireLock("test-lock", 60000);
611-
expect(token).not.toBeNull();
617+
const tokens = await storage.acquireLocks(["test-lock"], 60000);
618+
expect(tokens[0]).not.toBeNull();
612619

613620
const lock = await getLock("test-lock");
614621
expect(lock).not.toBeNull();
615622
expect(lock!.lock_name).toBe("test-lock");
616623

617-
// second acquire should fail
618-
const second = await storage.acquireLock("test-lock", 60000);
619-
expect(second).toBeNull();
624+
let second = await storage.acquireLocks(["test-lock"], 60000);
625+
expect(second[0]).toBeNull();
626+
627+
await storage.releaseLocks(["test-lock"]);
628+
expect(await getLock("test-lock")).toBeNull();
629+
second = await storage.acquireLocks(["test-lock"], 60000);
630+
expect(second[0]).not.toBeNull();
620631
});
621632

622633
it("should acquire lock when existing lock is expired", async () => {
@@ -628,22 +639,41 @@ describe("IndexerStorage", () => {
628639
.bind("test-lock", expiredTime - 60000, expiredTime)
629640
.run();
630641

631-
const token = await storage.acquireLock("test-lock", 60000);
632-
expect(token).not.toBeNull();
642+
const tokens = await storage.acquireLocks(["test-lock"], 60000);
643+
expect(tokens[0]).not.toBeNull();
633644

634645
const lock = await getLock("test-lock");
635646
expect(lock!.expires_at).toBeGreaterThan(Date.now());
636647
});
637648

638-
it("should release lock and allow reacquiring", async () => {
639-
const token = await storage.acquireLock("test-lock", 60000);
640-
expect(token).not.toBeNull();
649+
it("should acquire multiple locks with partial success", async () => {
650+
await storage.acquireLocks(["lock-x"], 60000);
641651

642-
await storage.releaseLock("test-lock");
643-
expect(await getLock("test-lock")).toBeNull();
652+
const tokens = await storage.acquireLocks(["lock-y", "lock-x", "lock-z"], 60000);
653+
expect(tokens).toHaveLength(3);
654+
expect(tokens[0]).not.toBeNull();
655+
expect(tokens[1]).toBeNull();
656+
expect(tokens[2]).not.toBeNull();
657+
658+
const lockX = await getLock("lock-x");
659+
expect(lockX).not.toBeNull();
660+
});
661+
662+
it("should acquire all locks when existing locks are expired", async () => {
663+
const now = Date.now();
664+
const expiredTime = now - 10000;
665+
await db
666+
.prepare(
667+
"INSERT INTO cron_locks (lock_name, acquired_at, expires_at) VALUES (?, ?, ?)",
668+
)
669+
.bind("lock-x", expiredTime - 60000, expiredTime)
670+
.run();
644671

645-
const second = await storage.acquireLock("test-lock", 60000);
646-
expect(second).not.toBeNull();
672+
const tokens = await storage.acquireLocks(["lock-y", "lock-x", "lock-z"], 60000);
673+
expect(tokens).toHaveLength(3);
674+
expect(tokens[0]).toBeGreaterThan(now);
675+
expect(tokens[1]).toBeGreaterThan(now);
676+
expect(tokens[2]).toBeGreaterThan(now);
647677
});
648678
});
649679
});

packages/sui-indexer/src/storage.ts

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -855,35 +855,47 @@ export class D1Storage {
855855
};
856856
}
857857

858-
async acquireLock(lockName: string, ttlMs: number): Promise<number | null> {
858+
async acquireLocks(lockNames: string[], ttlMs: number): Promise<(number | null)[]> {
859+
if (lockNames.length === 0) return [];
860+
859861
const now = Date.now();
862+
const expiresAt = now + ttlMs;
863+
const valueRows = lockNames.map(() => "(?, ?, ?)").join(", ");
864+
const params = lockNames.flatMap((name) => [name, now, expiresAt]);
865+
860866
try {
861-
const result = await this.db
867+
const { results } = await this.db
862868
.prepare(
863869
`INSERT INTO cron_locks (lock_name, acquired_at, expires_at)
864-
VALUES (?, ?, ?)
870+
VALUES ${valueRows}
865871
ON CONFLICT(lock_name) DO UPDATE
866-
SET acquired_at = excluded.acquired_at, expires_at = excluded.expires_at
867-
WHERE cron_locks.expires_at <= excluded.acquired_at
868-
RETURNING acquired_at`,
872+
SET acquired_at = excluded.acquired_at, expires_at = excluded.expires_at
873+
WHERE cron_locks.expires_at <= excluded.acquired_at
874+
RETURNING lock_name, acquired_at`,
869875
)
870-
.bind(lockName, now, now + ttlMs)
871-
.first<number>("acquired_at");
872-
return result ?? null;
876+
.bind(...params)
877+
.all<{ lock_name: string; acquired_at: number }>();
878+
879+
const acquiredMap = new Map(results.map((r) => [r.lock_name, r.acquired_at]));
880+
return lockNames.map((name) => acquiredMap.get(name) ?? null);
873881
} catch (error) {
874-
logError({ msg: "Failed to acquire lock", method: "acquireLock", lockName }, error);
882+
logError({ msg: "Failed to acquire locks", method: "acquireLocks" }, error);
875883
throw error;
876884
}
877885
}
878886

879-
async releaseLock(lockName: string): Promise<void> {
887+
async releaseLocks(lockNames: string[]): Promise<void> {
888+
if (lockNames.length === 0) return;
889+
890+
const placeholders = lockNames.map(() => "?").join(", ");
891+
880892
try {
881893
await this.db
882-
.prepare(`DELETE FROM cron_locks WHERE lock_name = ?`)
883-
.bind(lockName)
894+
.prepare(`DELETE FROM cron_locks WHERE lock_name IN (${placeholders})`)
895+
.bind(...lockNames)
884896
.run();
885897
} catch (error) {
886-
logError({ msg: "Failed to release lock", method: "releaseLock", lockName }, error);
898+
logError({ msg: "Failed to release locks", method: "releaseLocks" }, error);
887899
throw error;
888900
}
889901
}

0 commit comments

Comments
 (0)