Skip to content

Conversation

@danthegoodman1
Copy link

@danthegoodman1 danthegoodman1 commented Dec 22, 2025

Support maxParallelism of 0 to enable pausing workpools. It appears like this mostly was supported other than just some validation and falsy checks (otherwise it handles 0 fine).

All tests passing.

Note: I did use Opus 4.5 to investigate what was needed to support maxParallelism of 0, but I investigated it and wrote the code myself.

For devs:

The best way to use this is to just set it:

workpool.options.maxParallelism = X

The next time anything runs, it will ensure it's synced with your code and pull in the latest value (including an enqueue).

Closes #133


By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

🫡

Summary by CodeRabbit

Release Notes

  • New Features

    • Added a kick method to workpool for triggering execution.
  • Bug Fixes

    • Updated maxParallelism constraint to allow 0 as the minimum value (previously required 1 or higher).

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Dec 22, 2025

📝 Walkthrough

Walkthrough

Added a public Workpool.kick(ctx) method and a component-level kick mutation; validated config lower bound for maxParallelism changed from 1 to 0 and related checks/tests were updated to accept 0 (pausing the workpool).

Changes

Cohort / File(s) Summary
Client API
src/client/index.ts
Added Workpool.kick(ctx: RunMutationCtx): Promise<void> which calls ctx.runMutation with the workpool's kick function and passes { logLevel, maxParallelism } (with defaults). Also updated JSDoc for WorkpoolOptions.maxParallelism minimum from 1 to 0.
Component mutation
src/component/lib.ts
Added exported kick mutation that validates config and invokes kickMainLoop(ctx, "kick", config); updated validateConfig to accept maxParallelism >= 0 and adjusted the thrown error message accordingly.
Globals update logic
src/component/kick.ts
Relaxed update condition in getOrUpdateGlobals from a truthy check to an explicit undefined check so maxParallelism = 0 (or other falsy non-undefined values) can update globals.
Tests
src/component/lib.test.ts
Relaxed test expectations for maxParallelism lower bound: now expects "maxParallelism must be >= 0" and adjusts input accordingly.
sequenceDiagram
  rect rgba(200,230,255,0.4)
    participant Client as Workpool (Client)
    participant Runner as RunMutationCtx
    participant Component as Kick Mutation Handler
    participant Globals as Workpool Globals / kickMainLoop
  end

  Client->>Runner: kick(ctx) calls ctx.runMutation("kick", {config})
  Runner->>Component: invoke kick mutation handler with config
  Component->>Component: validateConfig(config)
  alt config valid
    Component->>Globals: kickMainLoop(ctx, "kick", config)
    Globals-->>Component: completes loop (maybe updates globals)
    Component-->>Runner: return null
    Runner-->>Client: resolved Promise<void>
  else invalid
    Component-->>Runner: throw validation error
    Runner-->>Client: rejects with error
  end
Loading

🎯 3 (Moderate) | ⏱️ ~20 minutes

🐇
I nudged the pool with quiet kick,
Zero hush, then tasks tick tick.
A pause, a hop, a gentle nudge—
Work flows, then waits, then softly budge. 🥕

🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the primary change: enabling maxParallelism of 0 to pause workpools, which directly addresses the main objective of the pull request.
Linked Issues check ✅ Passed The PR fulfills issue #133 by enabling workpool pausing via maxParallelism = 0 and adds a public kick() method to resume processing, meeting both requirements.
Out of Scope Changes check ✅ Passed All changes are directly related to enabling maxParallelism = 0 for pausing workpools; validation relaxation, public kick() method, and test updates are all in-scope.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

📜 Recent review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 35cc48b and 3a679e9.

⛔ Files ignored due to path filters (1)
  • src/component/_generated/component.ts is excluded by !**/_generated/**
📒 Files selected for processing (2)
  • src/client/index.ts
  • src/component/lib.ts
🧰 Additional context used
🧬 Code graph analysis (2)
src/component/lib.ts (3)
src/component/shared.ts (1)
  • config (36-39)
src/client/index.ts (1)
  • kick (261-268)
src/component/kick.ts (1)
  • kickMainLoop (20-73)
src/client/index.ts (3)
src/client/utils.ts (1)
  • RunMutationCtx (19-24)
src/component/logging.ts (1)
  • DEFAULT_LOG_LEVEL (3-3)
src/component/shared.ts (1)
  • DEFAULT_MAX_PARALLELISM (12-12)
🔇 Additional comments (4)
src/client/index.ts (2)

255-268: LGTM! The kick() method correctly handles maxParallelism = 0.

The implementation properly uses the nullish coalescing operator (??), which only substitutes the default when the value is null or undefined, not when it's 0. This ensures that maxParallelism: 0 is correctly passed to pause the workpool.

The method follows the same pattern as cancel() and cancelAll(), maintaining consistency across the codebase.


374-374: JSDoc update accurately reflects the new minimum value.

The documentation correctly indicates that 0 is now the minimum value and clarifies the behavior (no new work starts), which helps users understand the pausing mechanism.

src/component/lib.ts (2)

166-174: LGTM! The kick mutation is correctly implemented.

The mutation follows the established pattern from enqueue and enqueueBatch:

  • Validates config before processing
  • Calls kickMainLoop with the appropriate source identifier
  • Returns the expected type

This enables the client-side Workpool.kick() method to sync configuration changes (including pausing with maxParallelism = 0) without requiring an enqueue or cancel operation.


84-85: Validation correctly updated to allow pausing.

The change from < 1 to < 0 properly enables maxParallelism = 0 as a valid configuration for pausing the workpool, while still rejecting negative values. The error message accurately reflects the new constraint.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@danthegoodman1
Copy link
Author

is this my coderabbit or yours...

@danthegoodman1
Copy link
Author

@coderabbitai

image

@coderabbitai
Copy link

coderabbitai bot commented Dec 22, 2025

Tip

For best results, initiate chat on the files or code changes.

@danthegoodman1 I can't view images directly, but I'm here to help! Could you describe what you're seeing in the screenshot or let me know what question you have?

If you're asking about whether this is your CodeRabbit instance or a different one - I'm CodeRabbit responding on this PR. If you have questions about the review, configuration, or anything else related to this PR, I'm happy to help!

@pkg-pr-new
Copy link

pkg-pr-new bot commented Dec 23, 2025

Open in StackBlitz

npm i https://pkg.pr.new/get-convex/workpool/@convex-dev/workpool@145

commit: 3a679e9

@danthegoodman1
Copy link
Author

danthegoodman1 commented Dec 23, 2025

One large issue: that value override only lasts for the duration of the isolate 😭

Without changing the behavior of "always use the code's options", the only way to achieve a permanent change in behavior is to look up a current value every time from the DB with a query/helper function. That would definitely have tons of serialization aborts when ever a value is changed too.

@danthegoodman1 danthegoodman1 marked this pull request as draft December 31, 2025 18:18
@danthegoodman1
Copy link
Author

Marked draft as there's discussion around making the DB the source of truth rather than the code.

@ianmacartney
Copy link
Contributor

Following up here - when you specify the value via env var with a runtime-initialized workpool, this works, correct?

@danthegoodman1
Copy link
Author

I hadn’t tested using an env var in the ‘new Workpool’ (sry mobile) since we spoke but I’ll test that shortly

@danthegoodman1
Copy link
Author

@ianmacartney it does not appear to. I set:

export const testWorkpool = new Workpool(components.testWorkpool, {
  retryActionsByDefault: true,
  defaultRetryBehavior: {
    maxAttempts: 100,
    base: 1,
    initialBackoffMs: 5000,
  },
  maxParallelism: process.env.WRKPL_TEST_MAX_P ? parseInt(process.env.WRKPL_TEST_MAX_P, 10) : 1,
})

Then triggered a self-invoking action:

export const testAction = internalAction({
  args: {
    delay: v.number(),
  },
  handler: async (ctx, args) => {
    console.log(`Enqueuing test action with delay: ${args.delay}`)
    await testWorkpool.enqueueAction(ctx, internal.test_actions.testAction, {
      delay: args.delay,
    })
  },
})

Then npx convex env set WRKPL_TEST_MAX_P 0 and the invocations still continued

@danthegoodman1
Copy link
Author

actually I'm stupid and dumb and didn't use my version of it XDDDDDDDDDDDDD one sec

@ianmacartney
Copy link
Contributor

Note: you need to do another enqueue for the value to get passed in. So if there aren't any enqueues you'd need to set the env var and change it in the db

@ianmacartney
Copy link
Contributor

Ideally there'd be a components.workpool.config.set function to update global values for situations like this - so you could run it remotely

@danthegoodman1
Copy link
Author

Yeah sorry that example wasn't 100% accurate, I hav ea helper function like:

maxParallelism: getMaxParallelism("TEST", 1),
// ...

function getMaxParallelism(name: string, defaultValue: number): number {
  try {
    const envValue = process.env[`WRKPL_${name}_MAX_P`]
    if (envValue) {
      const parsed = parseInt(envValue, 10)
      if (!isNaN(parsed) && parsed > 0) {
        return parsed
      }
    }
  } catch {
    // Env vars unavailable during schema evaluation
  }
  return defaultValue
}

@danthegoodman1
Copy link
Author

danthegoodman1 commented Jan 7, 2026

Ok so it seems like you can set it to 0 initially, and it will stay paused, but once you turn it up to a positive number, then try to reduce it to 0 again, it will not stop in my observations. I also am seeing a maxParallelism 1 in the DB so something it's setting it.

@danthegoodman1
Copy link
Author

danthegoodman1 commented Jan 7, 2026

if (!isNaN(parsed) && parsed > 0) {
        return parsed
      }

I need to go to bed... or maybe opus needs to review me XD

@danthegoodman1
Copy link
Author

Alright so I added a public .kick() method that does the internal kick method which can be used to wake workpools without the need to enqueue or cancel.

@danthegoodman1
Copy link
Author

@ianmacartney is it possible to update that above PR npm install to the current commit?

@ianmacartney
Copy link
Contributor

done

@danthegoodman1
Copy link
Author

That seems to have done the trick, now you can Workpool.kick() to wake it back up!

@danthegoodman1
Copy link
Author

danthegoodman1 commented Jan 7, 2026

Testing proceedure:

/**
 * Name MUST be less than 28 characters (convex only allows 40 char long env vars)
 */
function getMaxParallelism(name: string, defaultValue: number): number {
  try {
    const envValue = process.env[`WRKPL_${name}_MAX_P`]
    if (envValue) {
      const parsed = parseInt(envValue, 10)
      if (!isNaN(parsed) && parsed >= 0) {
        return parsed
      }
    }
  } catch {
    // Env vars unavailable during schema evaluation
  }
  return defaultValue
}

export const testWorkpool = new Workpool(components.testWorkpool, {
  retryActionsByDefault: true,
  defaultRetryBehavior: {
    maxAttempts: 100,
    base: 1,
    initialBackoffMs: 5000,
  },
  maxParallelism: getMaxParallelism("TEST", 1),
})
export const testAction = internalAction({
  args: {
    delay: v.number(),
  },
  handler: async (ctx, args) => {
    console.log(`Test action with delay: ${args.delay} ${process.env.WRKPL_TEST_MAX_P}`)
    if (args.delay === 0) {
      console.log("Not enqueueing with 0 delay, kicking", process.env.WRKPL_TEST_MAX_P)
      await testWorkpool.kick(ctx)
      return Ok()
    }
    console.log("Enqueuing action")
    await testWorkpool.enqueueAction(
      ctx,
      internal.test_action.testAction,
      {
        delay: args.delay,
      },
      {
        runAfter: args.delay,
      }
    )
    return Ok()
  },
})
  • Run testAction with delay = 2000, watch it loop
  • npx convex env set WRKPL_TEST_MAX_P 0
  • Looping stops, wait a few seconds
  • npx convex env set WRKPL_TEST_MAX_P 1
  • Run testAction with delay = 0, kicks workpool
  • Immediately after, looping begins again
1/7/2026, 6:16:45 AM [CONVEX A(test_action:testAction)] [LOG] 'Test action with delay: 2000 1'
1/7/2026, 6:16:45 AM [CONVEX A(test_action:testAction)] [LOG] 'Enqueuing action'
1/7/2026, 6:16:47 AM [CONVEX A(test_action:testAction)] [LOG] 'Test action with delay: 2000 1'
1/7/2026, 6:16:47 AM [CONVEX A(test_action:testAction)] [LOG] 'Enqueuing action'
###### SET ENV TO 0 HERE
1/7/2026, 6:16:49 AM [CONVEX A(test_action:testAction)] [LOG] 'Test action with delay: 2000 0'
1/7/2026, 6:16:49 AM [CONVEX A(test_action:testAction)] [LOG] 'Enqueuing action'
###### SET ENV TO 1 HERE
1/7/2026, 6:16:58 AM [CONVEX A(test_action:testAction)] [LOG] 'Test action with delay: 0 1'
1/7/2026, 6:16:58 AM [CONVEX A(test_action:testAction)] [LOG] 'Not enqueueing with 0 delay, kicking' '1'
1/7/2026, 6:16:58 AM [CONVEX A(test_action:testAction)] [LOG] 'Test action with delay: 2000 1'
1/7/2026, 6:16:58 AM [CONVEX A(test_action:testAction)] [LOG] 'Enqueuing action'
1/7/2026, 6:17:00 AM [CONVEX A(test_action:testAction)] [LOG] 'Test action with delay: 2000 1'

@danthegoodman1 danthegoodman1 marked this pull request as ready for review January 7, 2026 14:20
@danthegoodman1
Copy link
Author

I do still think something that doesn't require env vars would be better, but this does "work", and it lays the groundwork for supporting 0 parallelism anyway

@danthegoodman1
Copy link
Author

@ianmacartney this is ready for a final review when you have time! (sorry I don't have permissions to request review)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Enable pausing the workpool

2 participants