Skip to content

Commit 29981ca

Browse files
authored
PoC: Race condition prevention in worker (#352)
* PoC: Race condition in worker prevention for cron execution to ensure it runs one at a time across all worker instances Signed-off-by: Rayane Charif <rayane.charif@gonative.cc> * Resolved comments Signed-off-by: Rayane Charif <rayane.charif@gonative.cc> * Resolved comments Signed-off-by: Rayane Charif <rayane.charif@gonative.cc> * Resolved prettier fix Signed-off-by: Rayane Charif <rayane.charif@gonative.cc> * Resolved comments Signed-off-by: Rayane Charif <rayane.charif@gonative.cc> * Taking agent.md file from master Signed-off-by: Rayane Charif <rayane.charif@gonative.cc> * Fixed agent.md lint and prettier error Signed-off-by: Rayane Charif <rayane.charif@gonative.cc> * Resolved merged conflicts Signed-off-by: Rayane Charif <rayane.charif@gonative.cc> * Resolved comments Signed-off-by: Rayane Charif <rayane.charif@gonative.cc> * Resolved comments Signed-off-by: Rayane Charif <rayane.charif@gonative.cc> --------- Signed-off-by: Rayane Charif <rayane.charif@gonative.cc>
1 parent d54fd72 commit 29981ca

File tree

5 files changed

+120
-12
lines changed

5 files changed

+120
-12
lines changed

packages/btcindexer/db/migrations/0001_initial_schema.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,9 @@ CREATE TABLE IF NOT EXISTS presign_objects (
127127
) STRICT;
128128

129129
CREATE INDEX IF NOT EXISTS presign_objects_sui_network_created_at ON presign_objects(sui_network, created_at);
130+
131+
CREATE TABLE IF NOT EXISTS cron_locks (
132+
lock_name TEXT NOT NULL PRIMARY KEY,
133+
acquired_at INTEGER NOT NULL,
134+
expires_at INTEGER NOT NULL
135+
) STRICT;

packages/lib/src/test-helpers/init_db.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ export const tables = [
4242
"btc_blocks",
4343
"indexer_state",
4444
"presign_objects",
45+
"cron_locks",
4546
"setups",
4647
];
4748

packages/sui-indexer/src/index.ts

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,38 @@ export default {
2525
},
2626
async scheduled(_event: ScheduledController, env: Env, _ctx: ExecutionContext): Promise<void> {
2727
const storage = new D1Storage(env.DB);
28-
const activeNetworks = await storage.getActiveNetworks();
29-
30-
const mnemonic = await getSecret(env.NBTC_MINTING_SIGNER_MNEMONIC);
31-
const suiClients = await createSuiClients(activeNetworks, mnemonic);
32-
33-
// Run both indexer and redeem solver tasks in parallel
34-
const results = await Promise.allSettled([
35-
runSuiIndexer(storage, activeNetworks, suiClients),
36-
runRedeemSolver(storage, env, suiClients, activeNetworks),
37-
]);
28+
// Distributed lock to prevent overlapping cron executions from selecting
29+
// same Sui coins. Since CF Workers doesn't guarantee sequential cron running,
30+
// if a run exceeds the 1-min interval, the next trigger starts concurrently.
31+
const lockToken = await storage.acquireLock("cron-sui-indexer", 5 * 60 * 1000);
32+
if (lockToken === null) {
33+
logger.warn({
34+
msg: "Cron job already running, skipping this execution",
35+
lockName: "cron-sui-indexer",
36+
});
37+
return;
38+
}
3839

39-
// Check for any rejected promises and log errors
40-
reportErrors(results, "scheduled", "Scheduled task error", ["SuiIndexer", "RedeemSolver"]);
40+
try {
41+
const activeNetworks = await storage.getActiveNetworks();
42+
43+
const mnemonic = await getSecret(env.NBTC_MINTING_SIGNER_MNEMONIC);
44+
const suiClients = await createSuiClients(activeNetworks, mnemonic);
45+
46+
// Run both indexer and redeem solver tasks in parallel
47+
const results = await Promise.allSettled([
48+
runSuiIndexer(storage, activeNetworks, suiClients),
49+
runRedeemSolver(storage, env, suiClients, activeNetworks),
50+
]);
51+
52+
// Check for any rejected promises and log errors
53+
reportErrors(results, "scheduled", "Scheduled task error", [
54+
"SuiIndexer",
55+
"RedeemSolver",
56+
]);
57+
} finally {
58+
await storage.releaseLock("cron-sui-indexer");
59+
}
4160
},
4261
} satisfies ExportedHandler<Env>;
4362

packages/sui-indexer/src/storage.test.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,4 +597,53 @@ describe("IndexerStorage", () => {
597597
expect(inputs[0]!.input_index).toBe(0);
598598
expect(inputs[1]!.input_index).toBe(1);
599599
});
600+
601+
describe("Distributed Lock", () => {
602+
async function getLock(lockName: string) {
603+
return db
604+
.prepare("SELECT * FROM cron_locks WHERE lock_name = ?")
605+
.bind(lockName)
606+
.first<{ lock_name: string; acquired_at: number; expires_at: number }>();
607+
}
608+
609+
it("should acquire lock and reject duplicate", async () => {
610+
const token = await storage.acquireLock("test-lock", 60000);
611+
expect(token).not.toBeNull();
612+
613+
const lock = await getLock("test-lock");
614+
expect(lock).not.toBeNull();
615+
expect(lock!.lock_name).toBe("test-lock");
616+
617+
// second acquire should fail
618+
const second = await storage.acquireLock("test-lock", 60000);
619+
expect(second).toBeNull();
620+
});
621+
622+
it("should acquire lock when existing lock is expired", async () => {
623+
const expiredTime = Date.now() - 10000;
624+
await db
625+
.prepare(
626+
"INSERT INTO cron_locks (lock_name, acquired_at, expires_at) VALUES (?, ?, ?)",
627+
)
628+
.bind("test-lock", expiredTime - 60000, expiredTime)
629+
.run();
630+
631+
const token = await storage.acquireLock("test-lock", 60000);
632+
expect(token).not.toBeNull();
633+
634+
const lock = await getLock("test-lock");
635+
expect(lock!.expires_at).toBeGreaterThan(Date.now());
636+
});
637+
638+
it("should release lock and allow reacquiring", async () => {
639+
const token = await storage.acquireLock("test-lock", 60000);
640+
expect(token).not.toBeNull();
641+
642+
await storage.releaseLock("test-lock");
643+
expect(await getLock("test-lock")).toBeNull();
644+
645+
const second = await storage.acquireLock("test-lock", 60000);
646+
expect(second).not.toBeNull();
647+
});
648+
});
600649
});

packages/sui-indexer/src/storage.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -854,6 +854,39 @@ export class D1Storage {
854854
sui_network: toSuiNet(result.sui_network),
855855
};
856856
}
857+
858+
async acquireLock(lockName: string, ttlMs: number): Promise<number | null> {
859+
const now = Date.now();
860+
try {
861+
const result = await this.db
862+
.prepare(
863+
`INSERT INTO cron_locks (lock_name, acquired_at, expires_at)
864+
VALUES (?, ?, ?)
865+
ON CONFLICT(lock_name) DO UPDATE
866+
SET acquired_at = excluded.acquired_at, expires_at = excluded.expires_at
867+
WHERE cron_locks.expires_at <= excluded.acquired_at
868+
RETURNING acquired_at`,
869+
)
870+
.bind(lockName, now, now + ttlMs)
871+
.first<number>("acquired_at");
872+
return result ?? null;
873+
} catch (error) {
874+
logError({ msg: "Failed to acquire lock", method: "acquireLock", lockName }, error);
875+
throw error;
876+
}
877+
}
878+
879+
async releaseLock(lockName: string): Promise<void> {
880+
try {
881+
await this.db
882+
.prepare(`DELETE FROM cron_locks WHERE lock_name = ?`)
883+
.bind(lockName)
884+
.run();
885+
} catch (error) {
886+
logError({ msg: "Failed to release lock", method: "releaseLock", lockName }, error);
887+
throw error;
888+
}
889+
}
857890
}
858891

859892
export async function insertRedeemRequest(

0 commit comments

Comments
 (0)