Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/btcindexer/db/migrations/0001_initial_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,9 @@ CREATE TABLE IF NOT EXISTS presign_objects (
) STRICT;

CREATE INDEX IF NOT EXISTS presign_objects_sui_network_created_at ON presign_objects(sui_network, created_at);

CREATE TABLE IF NOT EXISTS cron_locks (
lock_name TEXT NOT NULL PRIMARY KEY,
acquired_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL
) STRICT;
1 change: 1 addition & 0 deletions packages/lib/src/test-helpers/init_db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export const tables = [
"btc_blocks",
"indexer_state",
"presign_objects",
"cron_locks",
"setups",
];

Expand Down
40 changes: 28 additions & 12 deletions packages/sui-indexer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,35 @@ export default {
},
async scheduled(_event: ScheduledController, env: Env, _ctx: ExecutionContext): Promise<void> {
const storage = new D1Storage(env.DB);
const activeNetworks = await storage.getActiveNetworks();

const mnemonic = await getSecret(env.NBTC_MINTING_SIGNER_MNEMONIC);
const suiClients = await createSuiClients(activeNetworks, mnemonic);

// Run both indexer and redeem solver tasks in parallel
const results = await Promise.allSettled([
runSuiIndexer(storage, activeNetworks, suiClients),
runRedeemSolver(storage, env, suiClients, activeNetworks),
]);
const lockToken = await storage.acquireLock("sui-indexer-cron", 5 * 60 * 1000); // 5 minutes
if (lockToken === null) {
logger.warn({
msg: "Cron job already running, skipping this execution",
lockName: "sui-indexer-cron",
});
return;
}

// Check for any rejected promises and log errors
reportErrors(results, "scheduled", "Scheduled task error", ["SuiIndexer", "RedeemSolver"]);
try {
const activeNetworks = await storage.getActiveNetworks();

const mnemonic = await getSecret(env.NBTC_MINTING_SIGNER_MNEMONIC);
const suiClients = await createSuiClients(activeNetworks, mnemonic);

// Run both indexer and redeem solver tasks in parallel
const results = await Promise.allSettled([
runSuiIndexer(storage, activeNetworks, suiClients),
runRedeemSolver(storage, env, suiClients, activeNetworks),
]);

// Check for any rejected promises and log errors
reportErrors(results, "scheduled", "Scheduled task error", [
"SuiIndexer",
"RedeemSolver",
]);
} finally {
await storage.releaseLock("sui-indexer-cron", lockToken);
}
},
} satisfies ExportedHandler<Env>;

Expand Down
81 changes: 81 additions & 0 deletions packages/sui-indexer/src/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -597,4 +597,85 @@ describe("IndexerStorage", () => {
expect(inputs[0]!.input_index).toBe(0);
expect(inputs[1]!.input_index).toBe(1);
});

describe("Distributed Lock", () => {
it("should acquire lock when none exists", async () => {
const token = await storage.acquireLock("test-lock", 60000);
expect(token).not.toBeNull();

const lock = await db
.prepare("SELECT * FROM cron_locks WHERE lock_name = ?")
.bind("test-lock")
.first<{ lock_name: string }>();
expect(lock).not.toBeNull();
expect(lock!.lock_name).toBe("test-lock");
});

it("should fail to acquire lock when already held (not expired)", async () => {
const first = await storage.acquireLock("test-lock", 60000);
expect(first).not.toBeNull();

const second = await storage.acquireLock("test-lock", 60000);
expect(second).toBeNull();
});

it("should acquire lock when existing lock is expired", async () => {
const expiredTime = Date.now() - 10000;
await db
.prepare(
"INSERT INTO cron_locks (lock_name, acquired_at, expires_at) VALUES (?, ?, ?)",
)
.bind("test-lock", expiredTime - 60000, expiredTime)
.run();

const token = await storage.acquireLock("test-lock", 60000);
expect(token).not.toBeNull();

const lock = await db
.prepare("SELECT * FROM cron_locks WHERE lock_name = ?")
.bind("test-lock")
.first<{ expires_at: number }>();
expect(lock!.expires_at).toBeGreaterThan(Date.now());
});

it("should release lock with matching token", async () => {
const token = await storage.acquireLock("test-lock", 60000);
expect(token).not.toBeNull();

await storage.releaseLock("test-lock", token!);

const lock = await db
.prepare("SELECT * FROM cron_locks WHERE lock_name = ?")
.bind("test-lock")
.first();
expect(lock).toBeNull();
});

it("should allow reacquiring lock after release", async () => {
const first = await storage.acquireLock("test-lock", 60000);
expect(first).not.toBeNull();

await storage.releaseLock("test-lock", first!);

const second = await storage.acquireLock("test-lock", 60000);
expect(second).not.toBeNull();
});

it("should not release another instance's lock after expiry", async () => {
const tokenA = await storage.acquireLock("test-lock", 10);
expect(tokenA).not.toBeNull();
await new Promise((resolve) => setTimeout(resolve, 20));
const tokenB = await storage.acquireLock("test-lock", 60000);
expect(tokenB).not.toBeNull();

await storage.releaseLock("test-lock", tokenA!);

const lock = await db
.prepare("SELECT * FROM cron_locks WHERE lock_name = ?")
.bind("test-lock")
.first<{ acquired_at: number }>();
expect(lock).not.toBeNull();
expect(lock!.acquired_at).toBe(tokenB!);
});
});
});
36 changes: 36 additions & 0 deletions packages/sui-indexer/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,42 @@ export class D1Storage {
sui_network: toSuiNet(result.sui_network),
};
}

async acquireLock(lockName: string, ttlMs: number): Promise<number | null> {
const now = Date.now();
try {
const result = await this.db
.prepare(
`INSERT INTO cron_locks (lock_name, acquired_at, expires_at)
VALUES (?, ?, ?)
ON CONFLICT(lock_name) DO UPDATE
SET acquired_at = excluded.acquired_at, expires_at = excluded.expires_at
WHERE cron_locks.expires_at <= excluded.acquired_at
RETURNING acquired_at`,
)
.bind(lockName, now, now + ttlMs)
.first<number>("acquired_at");
return result ?? null;
} catch (error) {
logError({ msg: "Failed to acquire lock", method: "acquireLock", lockName }, error);
throw error;
}
}

async releaseLock(lockName: string, acquiredAt: number): Promise<void> {
try {
await this.db
.prepare(`DELETE FROM cron_locks WHERE lock_name = ? AND acquired_at = ?`)
.bind(lockName, acquiredAt)
.run();
} catch (error) {
logError(
{ msg: "Failed to release lock", method: "releaseLock", lockName, acquiredAt },
error,
);
throw error;
}
}
}

export async function insertRedeemRequest(
Expand Down