Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/btcindexer/db/migrations/0001_initial_schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,9 @@ CREATE TABLE IF NOT EXISTS presign_objects (
) STRICT;

CREATE INDEX IF NOT EXISTS presign_objects_sui_network_created_at ON presign_objects(sui_network, created_at);

CREATE TABLE IF NOT EXISTS cron_locks (
lock_name TEXT NOT NULL PRIMARY KEY,
acquired_at INTEGER NOT NULL,
expires_at INTEGER NOT NULL
) STRICT;
1 change: 1 addition & 0 deletions packages/lib/src/test-helpers/init_db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export const tables = [
"btc_blocks",
"indexer_state",
"presign_objects",
"cron_locks",
"setups",
];

Expand Down
34 changes: 25 additions & 9 deletions packages/sui-indexer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,32 @@ export default {
},
async scheduled(_event: ScheduledController, env: Env, _ctx: ExecutionContext): Promise<void> {
const storage = new D1Storage(env.DB);
const activeNetworks = await storage.getActiveNetworks();

// Run both indexer and redeem solver tasks in parallel
const results = await Promise.allSettled([
runSuiIndexer(storage, env, activeNetworks),
runRedeemSolver(storage, env, activeNetworks),
]);
const lockAcquired = await storage.acquireLock("sui-indexer-cron", 5 * 60 * 1000); // 5 minutes
if (!lockAcquired) {
logger.warn({
msg: "Cron job already running, skipping this execution",
lockName: "sui-indexer-cron",
});
return;
}

// Check for any rejected promises and log errors
reportErrors(results, "scheduled", "Scheduled task error", ["SuiIndexer", "RedeemSolver"]);
try {
const activeNetworks = await storage.getActiveNetworks();

// Run both indexer and redeem solver tasks in parallel
const results = await Promise.allSettled([
runSuiIndexer(storage, env, activeNetworks),
runRedeemSolver(storage, env, activeNetworks),
]);

// Check for any rejected promises and log errors
reportErrors(results, "scheduled", "Scheduled task error", [
"SuiIndexer",
"RedeemSolver",
]);
} finally {
await storage.releaseLock("sui-indexer-cron");
}
},
} satisfies ExportedHandler<Env>;

Expand Down
79 changes: 79 additions & 0 deletions packages/sui-indexer/src/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -597,4 +597,83 @@ describe("IndexerStorage", () => {
expect(inputs[0]!.input_index).toBe(0);
expect(inputs[1]!.input_index).toBe(1);
});

describe("Distributed Lock", () => {
it("should acquire lock when none exists", async () => {
const acquired = await storage.acquireLock("test-lock", 60000);
expect(acquired).toBe(true);
const lock = await db
.prepare("SELECT * FROM cron_locks WHERE lock_name = ?")
.bind("test-lock")
.first<{ lock_name: string; acquired_at: number; expires_at: number }>();
expect(lock).not.toBeNull();
expect(lock!.lock_name).toBe("test-lock");
});

it("should fail to acquire lock when already held (not expired)", async () => {
const first = await storage.acquireLock("test-lock", 60000);
expect(first).toBe(true);
const second = await storage.acquireLock("test-lock", 60000);
expect(second).toBe(false);
});

it("should acquire lock when existing lock is expired", async () => {
const expiredTime = Date.now() - 10000;
await db
.prepare(
"INSERT INTO cron_locks (lock_name, acquired_at, expires_at) VALUES (?, ?, ?)",
)
.bind("test-lock", expiredTime - 60000, expiredTime)
.run();

const acquired = await storage.acquireLock("test-lock", 60000);
expect(acquired).toBe(true);

const lock = await db
.prepare("SELECT * FROM cron_locks WHERE lock_name = ?")
.bind("test-lock")
.first<{ expires_at: number }>();
expect(lock!.expires_at).toBeGreaterThan(Date.now());
});

it("should release lock", async () => {
await storage.acquireLock("test-lock", 60000);

let lock = await db
.prepare("SELECT * FROM cron_locks WHERE lock_name = ?")
.bind("test-lock")
.first();
expect(lock).not.toBeNull();

await storage.releaseLock("test-lock");

lock = await db
.prepare("SELECT * FROM cron_locks WHERE lock_name = ?")
.bind("test-lock")
.first();
expect(lock).toBeNull();
});

it("should allow reacquiring lock after release", async () => {
const first = await storage.acquireLock("test-lock", 60000);
expect(first).toBe(true);
await storage.releaseLock("test-lock");
const second = await storage.acquireLock("test-lock", 60000);
expect(second).toBe(true);
});

it("should handle multiple different locks independently", async () => {
const lock1 = await storage.acquireLock("lock-1", 60000);
const lock2 = await storage.acquireLock("lock-2", 60000);

expect(lock1).toBe(true);
expect(lock2).toBe(true);
await storage.releaseLock("lock-1");
const lock1Again = await storage.acquireLock("lock-1", 60000);
const lock2Again = await storage.acquireLock("lock-2", 60000);

expect(lock1Again).toBe(true);
expect(lock2Again).toBe(false);
});
});
});
24 changes: 24 additions & 0 deletions packages/sui-indexer/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,30 @@ export class D1Storage {
sui_network: toSuiNet(result.sui_network),
};
}

async acquireLock(lockName: string, ttlMs: number): Promise<boolean> {
const now = Date.now();
try {
const result = await this.db
.prepare(
`INSERT INTO cron_locks (lock_name, acquired_at, expires_at)
VALUES (?, ?, ?)
ON CONFLICT(lock_name) DO UPDATE
SET acquired_at = excluded.acquired_at, expires_at = excluded.expires_at
WHERE cron_locks.expires_at < excluded.acquired_at
RETURNING 1`,
)
.bind(lockName, now, now + ttlMs)
.first();
return !!result;
} catch {
return false;
}
}

async releaseLock(lockName: string): Promise<void> {
await this.db.prepare(`DELETE FROM cron_locks WHERE lock_name = ?`).bind(lockName).run();
}
}

export async function insertRedeemRequest(
Expand Down
Loading