Skip to content

Commit a68c282

Browse files
committed
Merge branch 'complete'
2 parents a7e8764 + 572a7bd commit a68c282

File tree

24 files changed

+3877
-486
lines changed

24 files changed

+3877
-486
lines changed

.mdc/convex_rules.mdc

Lines changed: 676 additions & 0 deletions
Large diffs are not rendered by default.

README.md

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ await pool.enqueueAction(ctx, internal.email.send, args, {
9595
export const emailSent = internalMutation({
9696
args: {
9797
workId: workIdValidator,
98+
result: resultValidator,
9899
context: v.object({ emailType: v.string(), userId: v.id("users") }),
99-
result: runResultValidator,
100100
},
101101
handler: async (ctx, args) => {
102102
if (args.result.kind === "canceled") return;
@@ -303,25 +303,16 @@ You can read the status of a function by calling `pool.status(id)`.
303303

304304
The status will be one of:
305305

306-
- `{ kind: "pending" }`: The function has not started yet.
307-
- `{ kind: "inProgress" }`: The function is currently running.
308-
- `{ kind: "completed"; completionStatus: CompletionStatus }`: The function has
309-
finished.
310-
311-
The `CompletionStatus` type is one of:
312-
313-
- `"success"`: The function completed successfully.
314-
- `"error"`: The function threw an error.
315-
- `"canceled"`: The function was canceled.
316-
- `"timeout"`: The function timed out.
306+
- `{ kind: "pending"; previousAttempts: number }`: The function has not started yet.
307+
- `{ kind: "running"; previousAttempts: number }`: The function is currently running.
308+
- `{ kind: "finished" }`: The function has succeeded, failed, or been canceled.
317309

318310
## Canceling work
319311

320-
You can cancel work by calling `pool.cancel(id)`.
312+
You can cancel work by calling `pool.cancel(id)` or all of them with
313+
`pool.cancelAll()`.
321314

322-
This will remove the work from the queue and mark it as canceled.
323-
If the work has already started, it will wait for it to finish, but not allow
324-
retries.
315+
This will avoid starting or retrying, but will not stop in-progress work.
325316

326317
<!-- END: Include on https://convex.dev/components -->
327318

example/convex/_generated/api.d.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,14 @@ export declare const components: {
8080
"query",
8181
"internal",
8282
{ id: string },
83-
| { attempt: number; state: "pending" }
84-
| { attempt: number; state: "running" }
83+
| { previousAttempts: number; state: "pending" }
84+
| { previousAttempts: number; state: "running" }
8585
| { state: "finished" }
8686
>;
8787
};
88+
stats: {
89+
queueLength: FunctionReference<"query", "internal", {}, number>;
90+
};
8891
};
8992
bigPool: {
9093
lib: {
@@ -126,11 +129,14 @@ export declare const components: {
126129
"query",
127130
"internal",
128131
{ id: string },
129-
| { attempt: number; state: "pending" }
130-
| { attempt: number; state: "running" }
132+
| { previousAttempts: number; state: "pending" }
133+
| { previousAttempts: number; state: "running" }
131134
| { state: "finished" }
132135
>;
133136
};
137+
stats: {
138+
queueLength: FunctionReference<"query", "internal", {}, number>;
139+
};
134140
};
135141
serializedPool: {
136142
lib: {
@@ -172,10 +178,13 @@ export declare const components: {
172178
"query",
173179
"internal",
174180
{ id: string },
175-
| { attempt: number; state: "pending" }
176-
| { attempt: number; state: "running" }
181+
| { previousAttempts: number; state: "pending" }
182+
| { previousAttempts: number; state: "running" }
177183
| { state: "finished" }
178184
>;
179185
};
186+
stats: {
187+
queueLength: FunctionReference<"query", "internal", {}, number>;
188+
};
180189
};
181190
};

example/convex/example.test.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ describe("workpool", () => {
3333
const id = await t.mutation(api.example.enqueueOneMutation, { data: 1 });
3434
expect(await t.query(api.example.status, { id })).toEqual({
3535
state: "pending",
36-
attempt: 0,
36+
previousAttempts: 0,
3737
});
3838
expect(await t.query(api.example.queryData, {})).toEqual([]);
3939
await t.finishAllScheduledFunctions(vi.runAllTimers);
@@ -49,9 +49,11 @@ describe("workpool", () => {
4949
await t.mutation(api.example.enqueueOneMutation, { data: i });
5050
}
5151
await t.finishAllScheduledFunctions(vi.runAllTimers);
52-
expect(await t.query(api.example.queryData, {})).toEqual([
53-
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
54-
]);
52+
expect(await t.query(api.example.queryData, {})).toEqual(
53+
expect.arrayContaining([
54+
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
55+
])
56+
);
5557
});
5658

5759
test("enqueueMany with high parallelism", async () => {

example/convex/example.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
WorkId,
1111
workIdValidator,
1212
Workpool,
13-
runResultValidator,
13+
resultValidator,
1414
} from "@convex-dev/workpool";
1515
import { v } from "convex/values";
1616
import { createLogger } from "../../src/component/logging";
@@ -24,19 +24,19 @@ const bigPool = new Workpool(components.bigPool, {
2424
base: 2,
2525
},
2626
retryActionsByDefault: true,
27-
logLevel: "DEBUG",
27+
logLevel: "WARN",
2828
});
2929
const smallPool = new Workpool(components.smallPool, {
3030
maxParallelism: 3,
3131
retryActionsByDefault: true,
32-
logLevel: "INFO",
32+
logLevel: "WARN",
3333
});
3434
const serializedPool = new Workpool(components.serializedPool, {
3535
maxParallelism: 1,
3636
retryActionsByDefault: true,
37-
logLevel: "INFO",
37+
logLevel: "WARN",
3838
});
39-
const console = createLogger("WARN");
39+
const console = createLogger("DEBUG");
4040

4141
export const addMutation = mutation({
4242
args: { data: v.optional(v.number()) },
@@ -235,10 +235,10 @@ export const onComplete = internalMutation({
235235
args: {
236236
workId: workIdValidator,
237237
context: v.number(),
238-
result: runResultValidator,
238+
result: resultValidator,
239239
},
240240
handler: async (ctx, args) => {
241-
console.warn("total", (Date.now() - args.context) / 1000);
241+
console.info("total", (Date.now() - args.context) / 1000);
242242
},
243243
});
244244

@@ -274,7 +274,7 @@ export const runPaced = internalAction({
274274

275275
export const cancel = internalAction({
276276
args: {
277-
id: workIdValidator,
277+
id: v.optional(workIdValidator),
278278
},
279279
handler: async (ctx, args) => {
280280
console.debug("Canceling", args.id);

package.json

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
"email": "support@convex.dev",
88
"url": "https://github.com/get-convex/workpool/issues"
99
},
10-
"version": "0.2.0-beta.0",
10+
"version": "0.2.0",
1111
"license": "Apache-2.0",
1212
"keywords": [
1313
"convex",
@@ -24,14 +24,15 @@
2424
"build:cjs": "tsc --project ./commonjs.json && echo '{\\n \"type\": \"commonjs\"\\n}' > dist/commonjs/package.json",
2525
"dev": "cd example; npm run dev",
2626
"typecheck": "tsc --noEmit",
27-
"prepare": "npm run build",
28-
"prepack": "node node10stubs.mjs",
29-
"postpack": "node node10stubs.mjs --cleanup",
30-
"test": "vitest",
3127
"lint": "tsc --noEmit && eslint . && prettier --check .",
3228
"format": "prettier --write .",
29+
"test": "vitest run",
30+
"test:watch": "vitest",
3331
"test:debug": "vitest --inspect-brk --no-file-parallelism",
34-
"test:coverage": "vitest run --coverage --coverage.reporter=text"
32+
"test:coverage": "vitest run --coverage --coverage.reporter=text",
33+
"prepare": "npm run build",
34+
"prepack": "node node10stubs.mjs",
35+
"postpack": "node node10stubs.mjs --cleanup"
3536
},
3637
"files": [
3738
"dist",

src/client/index.ts

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,22 @@ import {
88
import { v, VString } from "convex/values";
99
import { Mounts } from "../component/_generated/api.js";
1010
import {
11+
DEFAULT_LOG_LEVEL,
12+
type LogLevel,
13+
logLevel,
14+
} from "../component/logging.js";
15+
import {
16+
Config,
17+
DEFAULT_MAX_PARALLELISM,
1118
OnComplete,
12-
runResult as runResultValidator,
13-
RunResult,
19+
runResult as resultValidator,
1420
type RetryBehavior,
21+
RunResult,
1522
OnCompleteArgs as SharedOnCompleteArgs,
1623
Status,
17-
Config,
1824
} from "../component/shared.js";
19-
import { type LogLevel, logLevel } from "../component/logging.js";
2025
import { RunMutationCtx, RunQueryCtx, UseApi } from "./utils.js";
21-
import { DEFAULT_LOG_LEVEL } from "../component/logging.js";
22-
import { DEFAULT_MAX_PARALLELISM } from "../component/kick.js";
23-
export { runResultValidator, type RunResult };
26+
export { resultValidator, type RunResult };
2427

2528
// Attempts will run with delay [0, 250, 500, 1000, 2000] (ms)
2629
export const DEFAULT_RETRY_BEHAVIOR: RetryBehavior = {
@@ -133,11 +136,18 @@ export class Workpool {
133136
fnArgs: Args,
134137
options?: CallbackOptions & SchedulerOptions
135138
): Promise<WorkId> {
139+
const onComplete: OnComplete | undefined = options?.onComplete
140+
? {
141+
fnHandle: await createFunctionHandle(options.onComplete),
142+
context: options.context,
143+
}
144+
: undefined;
136145
const id = await ctx.runMutation(this.component.lib.enqueue, {
137146
...(await defaultEnqueueArgs(fn, this.options)),
138147
fnArgs,
139148
fnType: "mutation",
140149
runAt: getRunAt(options),
150+
onComplete,
141151
});
142152
return id as WorkId;
143153
}
@@ -215,7 +225,7 @@ export type CallbackOptions = {
215225
* args: {
216226
* workId: workIdValidator,
217227
* context: v.any(),
218-
* result: runResult,
228+
* result: resultValidator,
219229
* },
220230
* handler: async (ctx, args) => {
221231
* console.log(args.result, "Got Context back -> ", args.context, Date.now() - args.context);

src/component/README.md

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,19 @@ Concepts:
2525
flowchart LR
2626
Client -->|enqueue| pendingStart
2727
Client -->|cancel| pendingCancelation
28-
Recovery-->|recover| pendingCancelation
29-
Recovery-->|recover| pendingCompletion
30-
Worker-->|"saveResult"| pendingCompletion
31-
pendingStart -->|main| workerRunning["internalState.running"]
32-
workerRunning-->|"main(pendingCompletion)"| Retry{"Needs retry?"}
33-
Retry-->|"no / canceled"| complete
34-
Retry-->|yes| pendingStart
35-
pendingStart-->|"main(pendingCancelation)"| complete
28+
complete --> |success or failure| pendingCompletion
29+
pendingCompletion -->|retry| pendingStart
30+
pendingStart --> workerRunning["worker running"]
31+
workerRunning -->|worker finished| complete
32+
workerRunning --> |recovery| complete
33+
successfulCancel["AND"]@{shape: delay} --> |canceled| complete
34+
pendingStart --> successfulCancel
35+
pendingCancelation --> successfulCancel
3636
```
3737

3838
Notably:
3939

40-
- The pending\* states are only written by other sources.
40+
- The pending\* states are written by outside sources.
4141
- The main loop federates changes to/from "running"
4242
- Canceling only impacts pending and retrying jobs.
4343

@@ -53,12 +53,12 @@ flowchart TD
5353
running-->|"all done"| idle
5454
```
5555

56-
- While the loop is running, clients won't see database conflicts with the
57-
state changing.
58-
- The "saturated" state is concretely "running" or "scheduled" with a boolean
59-
set, to avoid clients from kicking the main loop on enqueueing, which is
60-
unlikely to be productive, since the next action needs to be something
61-
terminating.
56+
- While the loop is running, the runStatus doesn't change, making it safer to
57+
read from clients without database conflicts.
58+
- The "saturated" state is concretely "running" or "scheduled" at max
59+
parallelism. There is a boolean set on "scheduled" to avoid clients from
60+
kicking the main loop on enqueueing, which is unlikely to be productive, since
61+
the next action needs to be something terminating.
6262

6363
## Retention optimization strategy
6464

src/component/_generated/api.d.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
* @module
99
*/
1010

11+
import type * as complete from "../complete.js";
1112
import type * as kick from "../kick.js";
1213
import type * as lib from "../lib.js";
1314
import type * as logging from "../logging.js";
@@ -31,6 +32,7 @@ import type {
3132
* ```
3233
*/
3334
declare const fullApi: ApiFromModules<{
35+
complete: typeof complete;
3436
kick: typeof kick;
3537
lib: typeof lib;
3638
logging: typeof logging;
@@ -80,11 +82,14 @@ export type Mounts = {
8082
"query",
8183
"public",
8284
{ id: string },
83-
| { attempt: number; state: "pending" }
84-
| { attempt: number; state: "running" }
85+
| { previousAttempts: number; state: "pending" }
86+
| { previousAttempts: number; state: "running" }
8587
| { state: "finished" }
8688
>;
8789
};
90+
stats: {
91+
queueLength: FunctionReference<"query", "public", {}, number>;
92+
};
8893
};
8994
// For now fullApiWithMounts is only fullApi which provides
9095
// jump-to-definition in component client code.

0 commit comments

Comments
 (0)