Skip to content

Commit 59d2f68

Browse files
authored
simplify runAtTime and remove debounceMs (#9)
* simplify runAtTime and remove debounceMs * .
1 parent 2d8e0ef commit 59d2f68

File tree

5 files changed

+42
-73
lines changed

5 files changed

+42
-73
lines changed

example/convex/_generated/api.d.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ export declare const components: {
5959
fnType: "action" | "mutation";
6060
options: {
6161
actionTimeoutMs?: number;
62-
debounceMs?: number;
6362
fastHeartbeatMs?: number;
6463
logLevel?: "DEBUG" | "INFO" | "WARN" | "ERROR";
6564
maxParallelism: number;
@@ -105,7 +104,6 @@ export declare const components: {
105104
fnType: "action" | "mutation";
106105
options: {
107106
actionTimeoutMs?: number;
108-
debounceMs?: number;
109107
fastHeartbeatMs?: number;
110108
logLevel?: "DEBUG" | "INFO" | "WARN" | "ERROR";
111109
maxParallelism: number;

src/client/index.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ export class WorkPool {
2020
constructor(
2121
private component: UseApi<typeof api>,
2222
// TODO(emma) reduce the number of options. consider removing the timeout options.
23-
// consider removing the debounceMs option and the heartbeats.
23+
// consider removing the heartbeats.
2424
private options: {
2525
/** How many actions/mutations can be running at once within this pool.
2626
* Min 1, Max 300.
@@ -42,11 +42,6 @@ export class WorkPool {
4242
* Default 15 minutes.
4343
*/
4444
unknownTimeoutMs?: number;
45-
/** When there is something to do, wait this long between loop iterations,
46-
* to allow more work to accumulate.
47-
* Default 50ms.
48-
*/
49-
debounceMs?: number;
5045
/** When something is running, wait this long to check if anything has
5146
* been canceled or failed unexpectedly.
5247
* Default 10s.

src/component/_generated/api.d.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ export type Mounts = {
4444
fnType: "action" | "mutation";
4545
options: {
4646
actionTimeoutMs?: number;
47-
debounceMs?: number;
4847
fastHeartbeatMs?: number;
4948
logLevel?: "DEBUG" | "INFO" | "WARN" | "ERROR";
5049
maxParallelism: number;

src/component/lib.ts

Lines changed: 36 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ export const enqueue = mutation({
3030
maxParallelism: v.number(),
3131
actionTimeoutMs: v.optional(v.number()),
3232
mutationTimeoutMs: v.optional(v.number()),
33-
debounceMs: v.optional(v.number()),
3433
fastHeartbeatMs: v.optional(v.number()),
3534
slowHeartbeatMs: v.optional(v.number()),
3635
logLevel: v.optional(logLevel),
@@ -39,12 +38,10 @@ export const enqueue = mutation({
3938
},
4039
returns: v.id("pendingWork"),
4140
handler: async (ctx, { fnHandle, fnName, options, fnArgs, fnType }) => {
42-
const debounceMs = options.debounceMs ?? 50;
4341
await ensurePoolExists(ctx, {
4442
maxParallelism: options.maxParallelism,
4543
actionTimeoutMs: options.actionTimeoutMs ?? 15 * 60 * 1000,
4644
mutationTimeoutMs: options.mutationTimeoutMs ?? 30 * 1000,
47-
debounceMs,
4845
fastHeartbeatMs: options.fastHeartbeatMs ?? 10 * 1000,
4946
slowHeartbeatMs: options.slowHeartbeatMs ?? 2 * 60 * 60 * 1000,
5047
ttl: options.ttl ?? 24 * 60 * 60 * 1000,
@@ -56,7 +53,7 @@ export const enqueue = mutation({
5653
fnArgs,
5754
fnType,
5855
});
59-
await kickMainLoop(ctx, debounceMs, false);
56+
await kickMainLoop(ctx, 0, false);
6057
return workId;
6158
},
6259
});
@@ -90,37 +87,16 @@ const BATCH_SIZE = 10;
9087
// There should only ever be at most one of these scheduled or running.
9188
// The scheduled one is in the "mainLoop" table.
9289
export const mainLoop = internalMutation({
93-
args: {
94-
generation: v.number(),
95-
},
96-
handler: async (ctx, args) => {
90+
args: { },
91+
handler: async (ctx, _args) => {
9792
const console_ = await console(ctx);
9893

99-
// Make sure mainLoop is serialized.
100-
const loopDoc = await ctx.db.query("mainLoop").unique();
101-
const expectedGeneration = loopDoc?.generation ?? 0;
102-
if (expectedGeneration !== args.generation) {
103-
throw new Error(
104-
`mainLoop generation mismatch ${expectedGeneration} !== ${args.generation}`
105-
);
106-
}
107-
if (loopDoc) {
108-
await ctx.db.patch(loopDoc._id, { generation: args.generation + 1 });
109-
} else {
110-
await ctx.db.insert("mainLoop", {
111-
generation: args.generation + 1,
112-
// Don't know when it will next run. This will get patched later.
113-
runAtTime: Number.POSITIVE_INFINITY,
114-
});
115-
}
116-
11794
const options = await getOptions(ctx.db);
11895
if (!options) {
11996
console_.info("no pool, skipping mainLoop");
120-
await kickMainLoop(ctx, 60 * 60 * 1000, true);
12197
return;
12298
}
123-
const { maxParallelism, debounceMs, fastHeartbeatMs, slowHeartbeatMs } =
99+
const { maxParallelism, fastHeartbeatMs, slowHeartbeatMs } =
124100
options;
125101

126102
console_.time("inProgress count");
@@ -235,7 +211,7 @@ export const mainLoop = internalMutation({
235211
console_.time("kickMainLoop");
236212
if (didSomething) {
237213
// There might be more to do.
238-
await kickMainLoop(ctx, debounceMs, true);
214+
await kickMainLoop(ctx, 0, true);
239215
} else {
240216
// Decide when to wake up.
241217
const allInProgressWork = await ctx.db.query("inProgressWork").collect();
@@ -378,12 +354,11 @@ async function saveResultHandler(
378354
if (!options) {
379355
throw new Error("cannot save result with no pool");
380356
}
381-
const { debounceMs } = options;
382357
await ctx.db.insert("pendingCompletion", {
383358
completionStatus,
384359
workId,
385360
});
386-
await kickMainLoop(ctx, debounceMs, false);
361+
await kickMainLoop(ctx, 0, false);
387362
}
388363

389364
export const runMutationWrapper = internalMutation({
@@ -410,25 +385,21 @@ async function startMainLoopHandler(ctx: MutationCtx) {
410385
const console_ = await console(ctx);
411386
if (!mainLoop) {
412387
console_.debug("starting mainLoop");
413-
const fn = await ctx.scheduler.runAfter(0, internal.lib.mainLoop, {
414-
generation: 0,
415-
});
388+
await ctx.scheduler.runAfter(0, internal.lib.mainLoop, {});
416389
await ctx.db.insert("mainLoop", {
417-
fn,
418-
generation: 0,
419-
runAtTime: Date.now(),
390+
runAtTime: null,
391+
fn: null,
420392
});
421393
return;
422394
}
423-
const existingFn = mainLoop.fn ? await ctx.db.system.get(mainLoop.fn) : null;
424-
if (existingFn === null || existingFn.completedTime) {
425-
// mainLoop stopped, so we restart it.
426-
const fn = await ctx.scheduler.runAfter(0, internal.lib.mainLoop, {
427-
generation: mainLoop.generation,
428-
});
429-
await ctx.db.patch(mainLoop._id, { fn });
430-
console_.debug("mainLoop stopped, so we restarted it");
395+
if (mainLoop.fn === null) {
396+
console_.info("mainLoop should be actively running; if it's not, run `mainLoop` directly");
397+
return;
431398
}
399+
console_.debug("mainLoop is scheduled to run later, so run it now");
400+
await ctx.scheduler.cancel(mainLoop.fn);
401+
await ctx.db.patch(mainLoop._id, { fn: null, runAtTime: null });
402+
await ctx.scheduler.runAfter(0, internal.lib.mainLoop, {});
432403
}
433404

434405
export const startMainLoop = mutation({
@@ -464,10 +435,15 @@ async function kickMainLoop(
464435
delayMs: number,
465436
isCurrentlyExecuting: boolean
466437
): Promise<void> {
467-
const debounceMs = (await getOptions(ctx.db))?.debounceMs ?? 50;
468-
const delay = Math.max(delayMs, debounceMs);
469-
const runAtTime = Date.now() + delay;
470438
const console_ = await console(ctx);
439+
440+
if (delayMs <= 0 && isCurrentlyExecuting) {
441+
console_.debug("mainLoop is actively running and wants to keep running");
442+
await ctx.scheduler.runAfter(0, internal.lib.mainLoop, {});
443+
return;
444+
}
445+
446+
const runAtTime = Date.now() + delayMs;
471447
// Look for mainLoop documents that we want to reschedule.
472448
// If we're currently running mainLoop, we definitely want to reschedule.
473449
// Otherwise, only reschedule if the new runAtTime is earlier than the existing one.
@@ -493,13 +469,18 @@ async function kickMainLoop(
493469
if (!isCurrentlyExecuting && mainLoop.fn) {
494470
await ctx.scheduler.cancel(mainLoop.fn);
495471
}
496-
const fn = await ctx.scheduler.runAt(runAtTime, internal.lib.mainLoop, {
497-
generation: mainLoop.generation,
498-
});
499-
await ctx.db.patch(mainLoop._id, { fn, runAtTime });
500-
console_.debug(
501-
"mainLoop was scheduled later, so reschedule it to run sooner"
502-
);
472+
const fn = await ctx.scheduler.runAt(runAtTime, internal.lib.mainLoop, {});
473+
if (delayMs <= 0) {
474+
console_.debug(
475+
"mainLoop was scheduled later, so reschedule it to run now"
476+
);
477+
await ctx.db.patch(mainLoop._id, { fn: null, runAtTime: null });
478+
} else {
479+
console_.debug(
480+
"mainLoop was scheduled later, so reschedule it to run sooner"
481+
);
482+
await ctx.db.patch(mainLoop._id, { fn, runAtTime });
483+
}
503484
}
504485

505486
export const status = query({
@@ -567,9 +548,6 @@ async function ensurePoolExists(
567548
if (opts.maxParallelism < 1) {
568549
throw new Error("maxParallelism must be >= 1");
569550
}
570-
if (opts.debounceMs < 10) {
571-
throw new Error("debounceMs must be >= 10 to prevent OCCs");
572-
}
573551
const pool = await ctx.db.query("pools").unique();
574552
if (pool) {
575553
let update = false;

src/component/schema.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,28 +43,27 @@ To avoid OCCs, we restrict which mutations can read and write from each table:
4343
*/
4444

4545
export default defineSchema({
46-
// Statically configured.
46+
// Statically configured, singleton.
4747
pools: defineTable({
4848
maxParallelism: v.number(),
4949
actionTimeoutMs: v.number(),
5050
mutationTimeoutMs: v.number(),
51-
debounceMs: v.number(),
5251
fastHeartbeatMs: v.number(),
5352
slowHeartbeatMs: v.number(),
5453
ttl: v.number(),
5554
logLevel,
5655
}),
5756

58-
// State across all pools.
5957
// TODO(emma) change this to use a boolean or enum of statuses, instead of using runAtTime.
6058
// Status like "running", "waitingForJobCompletion", "idle".
6159
// Currently there's a problem if enqueue is called from a mutation that takes longer than
6260
// debounceMs to complete, and a mainLoop finishes and restarts in that time window. Then the enqueue will OCC with the mainLoop.
6361
// But if we have fixed statuses, we don't need to write it so frequently so it won't OCC. Chat with @ian about details.
6462
mainLoop: defineTable({
65-
fn: v.optional(v.id("_scheduled_functions")),
66-
generation: v.number(),
67-
runAtTime: v.number(),
63+
// null means it's actively running.
64+
runAtTime: v.union(v.number(), v.null()),
65+
// Only set if it's not actively running -- so it's scheduled to run in the future, at runAtTime.
66+
fn: v.union(v.id("_scheduled_functions"), v.null()),
6867
}).index("runAtTime", ["runAtTime"]),
6968

7069
pendingWork: defineTable({

0 commit comments

Comments
 (0)