Skip to content

Queues via BullMQ#3647

Open
CarsonF wants to merge 18 commits intodevelopfrom
queues
Open

Queues via BullMQ#3647
CarsonF wants to merge 18 commits intodevelopfrom
queues

Conversation

@CarsonF
Copy link
Member

@CarsonF CarsonF commented Feb 13, 2026

Ported our queue work from seed-api

@coderabbitai
Copy link

coderabbitai bot commented Feb 13, 2026

📝 Walkthrough

Walkthrough

Integrates BullMQ queueing and Redis mock/Lua support, adds Duration and JMESPath GraphQL scalars, introduces queue management GraphQL DTOs/resolvers and wiring, refactors stack formatting to prettyStack, and exposes new helpers and type utilities across functions and queue modules.

Changes

Cohort / File(s) Summary
Package manifest
package.json
Added BullMQ, @nestjs/bullmq, Fengari/Lua, ioredis-mock, msgpackr and related types/patch wiring.
GraphQL scalars
src/common/scalars/duration.scalar.ts, src/common/scalars/jmes-path.scalar.ts, src/common/scalars/index.ts
Added Duration and JMESPath scalars and registered/exported them.
Queue public API & types
src/core/queue/index.ts, src/core/queue/types.ts
New queue public re-exports and ergonomic type aliases for BullMQ types.
Queue runtime patch & wrapper
src/core/queue/queue.patch.ts
Patched Queue class (nameFor, add overloads), runtime assignment to BullModule.queueClass, error stack prettifying, inspect/telemetry hooks.
Bull config & module wiring
src/core/queue/bull.config.ts, src/core/queue/queue.module.ts, src/core/queue/registration.ts
Added BullConfig factory, QueueModule with forRootAsync and FlowProducer registration, and registerQueues helper for dynamic queue registration.
Queue management features
src/core/queue/management/...
src/core/queue/management/dto/*, src/core/queue/management/resolvers/*, src/core/queue/management/queue-management.module.ts, src/core/queue/management/queue-management.service.ts
Introduced DTO enums/types, GraphQL type augmentations, resolvers for queue/job queries and mutations, service/module with admin checks.
Redis mock & Lua interop
src/core/redis/redis.mock.ts, src/core/redis/mock-lua.ts
Implemented RedisMock patches for BullMQ, Fengari-based Lua VM interop, MessagePack/JSON modules, notifier/marker support, and utilities.
Exception stack refactor
src/core/exception/pretty-stack.ts, src/core/exception/exception.normalizer.ts, src/core/exception/index.ts, (removed) src/core/exception/is-src-frame.ts, src/core/exception/normalize-frame-path.ts
Replaced prior stack-frame helpers with new prettyStack implementation and updated normalizer to use it; removed older helper modules/exports.
Common functions & helpers
src/common/functions/get-cause-list.ts, src/common/functions/exponential-delay.ts, src/common/functions/leveled-exp-backoff.ts, src/common/functions/index.ts
Added getCauseList, exponentialDelay, leveledExpBackoff and re-exported them.
GraphQL helpers & barrels
src/common/graphql/objects/mappers/declare-fields.ts, src/common/graphql/index.ts
Added declareGqlFields helper and re-export in GraphQL index.
DTO barrels & exports
src/core/queue/management/dto/index.ts, src/core/queue/management/dto/*.ts, src/core/queue/management/dto/job.dto.ts, src/core/queue/management/dto/queue.dto.ts
Added DTO barrel and GraphQL augmentations for Job and Queue types; new enums and modifier DTOs.
Config & core wiring
src/core/config/config.service.ts, src/core/core.module.ts
Added redis.prefix and bull config surface to ConfigService; integrated QueueModule into CoreModule.
Misc exports & refactors
src/common/exceptions/exception.ts (removed getCauseList), src/common/functions/index.ts, src/core/exception/index.ts
Moved getCauseList into functions, updated exports and removed/added exception-related exports.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested labels

💥 Breaking Change

🚥 Pre-merge checks | ✅ 3 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Description check ⚠️ Warning The PR description is minimal ('Ported our queue work from seed-api') and does not follow the required template structure with Monday task link, detailed description, or checklist items. Add a Monday task link or provide a reason for the PR, include a detailed description of the changes, and complete the ready-for-review checklist with applicable items.
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title 'Queues via BullMQ' accurately describes the main change—introducing BullMQ-based queue infrastructure. It is concise, specific, and directly reflects the primary objective.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Merge Conflict Detection ✅ Passed ✅ No merge conflicts detected when merging into develop

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch queues

No actionable comments were generated in the recent review. 🎉

🧹 Recent nitpick comments
src/common/functions/leveled-exp-backoff.ts (1)

13-23: Dead code: the Array.isArray(level) branch is unreachable.

DelayConfig is typed as an object (Omit<exponentialDelay.Options, 'attempt'> & { attempts?: number }), so Array.isArray(level) will always be false at runtime, and lines 21-22 (the tuple destructuring path) can never execute.

If tuple syntax support is intended, the DelayConfig type needs to be a union that includes a tuple form. Otherwise, remove the dead branch.

Option A: Remove the dead branch
   const parsedLevels = levels.map((level) => {
-    if (!Array.isArray(level)) {
-      return {
-        ...level,
-        initial:
-          level.initial != null ? Duration.from(level.initial) : undefined,
-      };
-    }
-    const [attempts, duration, jitter] = level;
-    return { attempts, initial: Duration.from(duration), jitter };
+    return {
+      ...level,
+      initial:
+        level.initial != null ? Duration.from(level.initial) : undefined,
+    };
   });
Option B: Add tuple support to the type
+type DelayConfigTuple = [attempts: number, duration: DurationIn, jitter?: number];
+
-export const leveledExpBackoff = (levels: readonly DelayConfig[]) => {
+export const leveledExpBackoff = (levels: readonly (DelayConfig | DelayConfigTuple)[]) => {

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 15

🤖 Fix all issues with AI agents
In `@package.json`:
- Around line 82-83: The dependency "fengari" is unmaintained and poses
supply-chain risk while "fengari-interop" is sporadically maintained; remove or
replace "fengari" in package.json (the "fengari" entry) with a maintained Lua
runtime or an actively supported integration, keep or re-evaluate
"fengari-interop" as needed, run the full test suite to verify behavior of any
replacement, and open a tracking issue noting the change and rationale so future
maintainers can follow up.

In `@src/common/scalars/duration.scalar.ts`:
- Around line 46-53: The variable-resolution branch in the Duration scalar
incorrectly uses the outer field identifier `name` to lookup variables; when
`value.kind === Kind.VARIABLE` use the variable identifier `value.name.value`
(and guard that `variables` exists) instead of `variables[name]`, so replace the
lookup with `variables[value.name.value]` (falling back to undefined) and keep
the rest of the logic that returns `[name, parsed]` and uses `parseFloat` for
non-variable values.
- Line 31: The code imports the augmentation module as a type-only import so
Duration.from isn’t available at runtime; change the import of DurationIn from
'@seedcompany/common/temporal/luxon' so the module is loaded at runtime (e.g.,
remove the top-level "type" to make it a runtime import while retaining the type
modifier on the specifier if desired) so the augmentation that adds
Duration.from is executed and Duration.from(ast.value) works; update the import
used by the Duration.from call accordingly.

In `@src/core/exception/pretty-stack.ts`:
- Around line 27-32: normalizeFramePath currently uses the regex /\\\\/ which
matches two backslashes and only the first occurrence; change the backslash
replacement to match a single backslash globally so Windows paths are fully
normalized (e.g., use a global regex that matches one backslash such as /\\/g or
use replaceAll('\\', '/') ). Keep the existing matchSrcPathInTrace replacement
first, then apply the global single-backslash replacement on the resulting
string in the normalizeFramePath function.

In `@src/core/queue/bull.config.ts`:
- Line 4: Remove the internal ioredis import (parseRedisUrl) and stop using that
utility; instead set the BullMQ connection variable (connection of type
QueueOptions['connection']) directly to the incoming url string (e.g.,
connection = url ?? undefined) and only import/instantiate RedisMock (RedisMock)
when url is not provided to use as the fallback connection; update any code that
previously relied on parseRedisUrl to accept the plain URL string for the
connection property.

In `@src/core/queue/resolvers/job-modifier.resolver.ts`:
- Around line 35-45: The GraphQL RetryArgs currently exposes QueueJobState (all
11 states) but job.retry only accepts FinishedStatus ('failed'|'completed'),
causing runtime errors; fix by restricting the GraphQL input or validating it:
either replace the field type in RetryArgs with a new GraphQL enum type
representing FinishedStatus (only 'failed' and 'completed') so the schema only
allows valid values, or add explicit runtime validation in the resolver before
calling job.retry(args.state) (e.g., check that args.state is strictly 'failed'
or 'completed' and throw a user-friendly error otherwise); update references to
RetryArgs, QueueJobState, FinishedStatus and the call to job.retry(...)
accordingly.

In `@src/core/queue/resolvers/job.resolver.ts`:
- Around line 86-87: The helper maybeSearch currently skips calling search when
value is any falsy value (0, false, "", etc.); update maybeSearch to only bypass
search for null/undefined by changing the condition to check value !== null &&
value !== undefined (or use value == null to detect nullish) so that numeric or
boolean progress values (e.g., 0 or false) will still be passed into search when
path is provided; locate the maybeSearch arrow function and replace the `path &&
value ? search(path, value) : value` check with a nullish-aware check such as
`path && value !== null && value !== undefined ? search(path, value) : value`.
- Around line 168-181: The currentZone() function assumes
request?.headers['x-time-zone'] is a string by casting to string | undefined,
which can hide a string[] when multiple headers exist; modify currentZone() to
detect if the header is an array and pick a single value (e.g., the last or
first entry) or throw an InputException for multiple values, then pass that
string to Info.normalizeZone and keep the existing validity check and error
throw using InputException; update references to
gqlContextHost.context.request?.headers['x-time-zone'], Info.normalizeZone, and
the thrown InputException accordingly.
- Around line 93-96: The queue resolver `@ResolveField`(() => Queue) in
job.resolver.ts currently returns (job as any).queue with no null-check; update
the resolver to either declare the field nullable (e.g., `@ResolveField`(() =>
Queue, { nullable: true })) or add a runtime guard inside the queue(`@Parent`()
job: Job) method that returns null when (job as any).queue is undefined (and
otherwise returns the queue), ensuring the resolver's return type matches
possible runtime values.

In `@src/core/queue/resolvers/queue-modifier.resolver.ts`:
- Around line 219-228: Fix the typo in the JSDoc-like description passed to the
`@ResolveField` decorator in queue-modifier.resolver.ts: update the sentence "This
method will the *pause* the queue..." to "This method will *pause* the queue..."
inside the description string for the ResolveField(() => Queue) decorator so the
message used by the Queue resolver (Queue modifier) reads correctly.
- Around line 209-217: The promoteJobs resolver currently calls the wrong BullMQ
method: in the promoteJobs(`@Parent`() { queue }: Modifier, `@Args`() args:
PromoteArgs) function it invokes queue.retryJobs(...) which retries failed jobs;
change this to call queue.promoteJobs({ count: args.batchSize }) so delayed jobs
are promoted; update the single call and keep the same return queue behavior.

In `@src/core/queue/resolvers/queue.resolver.ts`:
- Around line 110-114: The current code performs N separate Redis round-trips by
calling job.getState() for each Job when filterPath.includes('state'); modify
the logic in queue.resolver.ts to avoid per-job calls: if the incoming filter is
a simple state filter use BullMQ's getJobs with the appropriate types/state
parameter (so the initial jobs list is already filtered by state), otherwise
batch state lookups via a Redis pipeline (or BullMQ multi) to fetch all job
states in one round-trip and then rebuild the states ReadonlyMap; update the
branch that currently sets states using Promise.all and mapEntries to instead
use the optimized path depending on filterPath and keep symbols job.getState,
getJobs, jobs, states, filterPath, mapEntries, and JobType as the points of
change.
- Around line 36-39: Replace the ambiguous GraphQL Number type with the
integer-specific Int for pagination fields: import Int from '@nestjs/graphql'
and change the decorators on the pagination properties (the start and end fields
in the Queue resolver input/args and the start/end fields in MetricArgs) from
`@Field`(() => Number) to `@Field`(() => Int) so the schema explicitly uses GraphQL
Int for these integer indices.

In `@src/core/redis/redis.mock.ts`:
- Around line 142-181: The mock's bzpopmin patch uses rx.timeout(timeout * 1000)
which treats timeout=0 as an immediate timeout (causing wrong behavior for Redis
semantics where 0 means block forever); change the pipeline in the patched
bzpopmin (inside patchMethod(this, 'bzpopmin', ...)) to detect timeout <= 0 and
in that case omit rx.timeout(...) (or use an infinite/no-timeout observable) so
the concatWith(getThisMarkerNotifier()...) can block indefinitely; keep the
existing rx.catchError logic for real timeouts when timeout > 0 and ensure
nonBlockingPop/getThisMarkerNotifier usage remains unchanged.
- Around line 19-33: The debug block guarded by the hardcoded debug variable is
dead code; either remove the entire if (debug) { ... } block or make debug
configurable (e.g., read a runtime/config/env flag instead of the hardcoded
false) so the Scripts.prototype patch via patchMethod and the top-level await
import('bullmq') can actually run; if you choose to parameterize, replace the
current debug constant with a runtime check (e.g., process.env.DEBUG_BULLMQ or a
config flag), keep the same patch logic for Scripts.prototype.execCommand and
Logger.debug/Logger.error, and ensure the code path is executed only when that
runtime flag is truthy.
🧹 Nitpick comments (13)
package.json (1)

98-98: ioredis-mock as a runtime dependency warrants consideration.

Placing a mock library in dependencies (rather than devDependencies) is unconventional. Per the PR description, this is intentional to enable in-memory queue support without Redis. However, a few concerns:

  1. The heavy patching (referenced in PR objectives) adds maintenance overhead — each ioredis-mock or bullmq upgrade may require re-validating the patches.
  2. This pulls test-oriented code into production bundles.

Consider documenting (e.g., in a comment or ADR) why ioredis-mock is a runtime dependency, so future contributors don't mistakenly move it to devDependencies.

src/common/scalars/duration.scalar.ts (1)

66-76: Consider handling null/undefined in serialize.

If a resolver returns null or undefined for a nullable Duration field, this method will throw a GraphQLError ("Expected a Duration but received a: object"/"undefined"). GraphQL scalars' serialize typically pass through null for nullable fields, but it depends on how the framework calls this. Worth confirming the expected behavior.

src/core/queue/queue.patch.ts (1)

52-58: Mutating error.stack permanently strips non-src frames.

prettyStack filters to source frames only. Since this mutates the original error's stack property before passing it to BullMQ's moveToFailed, the full stack trace (including library frames) will be permanently lost in Redis. This is fine for display purposes, but could hinder debugging when a failure originates deep inside a dependency.

If this is intentional (consistent with logging), no action needed — just flagging for awareness.

src/core/queue/resolvers/queue.resolver.ts (1)

62-71: Swallowed exception in findQueue may mask DI errors.

Any error from moduleRef.get (including DI wiring bugs) is caught and re-thrown as a generic NotFoundException. Consider narrowing the catch or logging the original error at debug level to aid troubleshooting.

src/core/redis/mock-lua.ts (2)

80-93: deepNullToUndefined does not guard against circular references.

Recursive traversal will stack overflow on circular object graphs. For Redis/BullMQ data this is very unlikely, but a depth limit or WeakSet-based cycle check would make it resilient.


152-165: defineGlobalObject relies on Lua stack state after luaExecString.

The pattern of executing Lua code that returns a value, followed by setGlobal to pop it from the stack, is tightly coupled to the ioredis-mock VM's internal stack behavior. If luaExecString changes how it manages the stack (e.g., auto-pops the return), this would silently bind the wrong value. Consider adding a brief comment documenting this coupling.

src/core/redis/redis.mock.ts (2)

78-92: lpos is defined after the deepNullToUndefined wrapping loop — it won't be wrapped.

The loop at lines 78–84 iterates over Object.keys(luaCommands) at that moment, applying the null-to-undefined conversion. lpos is added at line 88, after the loop completes, so it won't have this wrapper. This is likely fine since lpos already returns undefined for not-found, but the debug logging (line 81) also won't apply to lpos calls. If this is intentional, a brief comment would help future readers understand the ordering dependency.


108-113: Lua script regex replacement is fragile — requires surrounding spaces.

The regex / ~= (false|nil) /g requires a space before ~= and after false/nil. This won't match if the comparison is at the end of a line or has different whitespace (e.g., tabs). Given that the comment says this is needed for BullMQ's specific Lua scripts, this may be acceptable, but if BullMQ updates their scripts, the pattern could silently stop matching.

src/core/queue/registration.ts (1)

7-7: @nestjs/common/utils/shared.utils.js is an internal import.

While commonly used in NestJS ecosystem code, this isn't a public API. Consider using a plain object check or importing from a stable path if this utility gets relocated in a future NestJS version.

src/core/queue/resolvers/job-modifier.resolver.ts (1)

94-105: remove() returns the job after deletion — intentional but worth a note.

After job.remove(), the job no longer exists in Redis. Returning it is fine for the immediate GraphQL response (the in-memory object is still valid), but any subsequent field resolution that hits Redis (e.g., fetching related data) could fail. If all fields on Job are already hydrated in memory, this is safe.

src/core/queue/resolvers/queue-modifier.resolver.ts (2)

83-110: startAt and startAfter mutual exclusivity is not enforced.

The descriptions say they are mutually exclusive, but if both are provided, startAt silently wins (Lines 198-204). Since this is an admin-only mutation, this is low risk, but a runtime validation or GraphQL @oneOf-style check would prevent confusion.


34-42: options field accepts arbitrary JSON passed directly to queue.add().

BullMQ job options control repeat schedules, removal policies, delays, and more. Passing unvalidated input directly could have unintended side effects. Since this is admin-gated, the risk is reduced, but consider either allowlisting specific option fields or documenting which options are expected.

src/core/queue/queue.module.ts (1)

53-58: Replace constructor.name check with instanceof for mock detection.

The current check this.sharedConfig.connection.constructor.name === '_RedisMock' is fragile because it couples to an internal class name from the ioredis-mock dependency. This can break silently if the dependency updates or if minification affects constructor names.

Since RedisMock is directly importable from '../redis/redis.mock' and the connection instance is created as new RedisMock() in BullConfig, use instanceof instead:

Suggested fix
+ import { RedisMock } from '../redis/redis.mock';
+
  onModuleInit() {
-   const isMock =
-     this.sharedConfig.connection.constructor.name === '_RedisMock';
+   const isMock = this.sharedConfig.connection instanceof RedisMock;
    if ((!isCli && !isTest) || isMock) {
      this.registrar.register();
    }

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@src/common/scalars/duration.scalar.ts`:
- Around line 64-74: The serialize method in the Duration scalar currently
returns Duration.toISO() without checking for invalid Durations; update
serialize (and also add guards in parseValue and parseLiteral where
Duration.from and Duration.fromObject are used) to check the Duration.isValid
flag and ensure toISO() is non-null; if the Duration is invalid or toISO()
returns null, throw a GraphQLError with a clear message (e.g., "Expected a valid
Duration but received an invalid Duration") so invalid durations are rejected
before causing runtime errors in non-nullable fields.

In `@src/core/queue/bull.config.ts`:
- Around line 12-14: The current creation of connection = url ? { url } :
undefined passes an invalid options object to ioredis; replace this by
constructing an ioredis client when url is provided (e.g., set connection to new
Redis(url, { maxRetriesPerRequest: null })) and add the import for Redis from
'ioredis'; update references to QueueOptions['connection'] accordingly so BullMQ
receives an actual Redis instance rather than an options object.

In `@src/core/queue/management/resolvers/queue.resolver.ts`:
- Around line 88-108: The Proxy get trap currently invokes this.jobResolver
methods directly which can throw when those methods call this.currentZone()
absent an HTTP context; to fix, when handling Reflect.has(this.jobResolver,
prop) in the get trap, call the resolver method on a safe resolver object that
overrides currentZone (e.g., const safeResolver =
Object.create(this.jobResolver); safeResolver.currentZone = () => undefined;)
and then invoke safeResolver[prop as keyof JobResolver](job, {}), or
alternatively wrap the call in a try/catch and return undefined on error; update
the get trap in the Proxy (the anonymous Proxy in the jobs.filter callback) to
use this safeInvoker approach for methods like createdAt/finishedAt/processedAt
to avoid throwing when no HTTP request context exists.
🧹 Nitpick comments (5)
src/core/queue/management/dto/finished-status.enum.ts (1)

1-13: Solid compile-time alignment checks.

The bidirectional type-check pattern is a nice way to keep the local enum in sync with BullMQ's upstream type. One very minor nit:

Line 12 includes | 'unknown' in the target type, but unlike JobState (which has an Unknown = 'unknown' member), FinishedStatus has no 'unknown' variant. The union is harmless — it just makes the "no extra" check slightly less strict than it could be. If this was intentionally kept for consistency with the JobState pattern, feel free to ignore.

src/core/queue/management/queue-management.service.ts (1)

20-27: Broad catch swallows all errors as NotFoundException.

ModuleRef.get() is synchronous and throws if the token isn't registered, so wrapping it in try/catch for a NotFoundException makes sense for the happy path. However, catching every exception (including unexpected DI failures or other runtime errors) and re-throwing as NotFoundException could mask real issues during debugging. Consider narrowing the catch or at minimum logging the original error.

💡 Optional: preserve original error context
     try {
       const queue = this.moduleRef.get<Queue>(getQueueToken(name), {
         strict: false,
       });
       return queue;
     } catch (e) {
-      throw new NotFoundException();
+      throw new NotFoundException(`Queue '${name}' not found`);
     }
src/core/queue/queue.module.ts (2)

15-16: argv.join(' ').match(...) could match substrings unintentionally.

The regex /repl|console/ matches anywhere in the joined argv string — it could false-positive on paths or argument values that happen to contain "repl" or "console" as a substring (e.g., --queue=console-worker). Word boundaries would be safer.

♻️ Tighter match
-const isCli = argv.join(' ').match(/repl|console/);
+const isCli = argv.some((arg) => /^(repl|console)$/.test(arg));

50-55: sharedConfig.connection access is not null-guarded.

QueueOptions.connection is typed as optional in BullMQ's types. If BullConfig ever fails to supply it, accessing .constructor.name will throw. Given the learning that CarsonF prefers fail-fast behavior this may be intentional, but worth noting that the error message would be an opaque NPE rather than a descriptive one.

src/core/queue/management/resolvers/queue-modifier.resolver.ts (1)

83-109: startAt and startAfter are described as mutually exclusive but not validated.

The descriptions say these are mutually exclusive, but no runtime check prevents both from being provided. The ?? fallback logic on lines 194-200 silently favors startAt. Consider adding an explicit validation:

♻️ Optional validation
   async retryJobs(`@Parent`() { queue }: Modifier, `@Args`() args: RetryArgs) {
+    if (args.startAt && args.startAfter) {
+      throw new InputException('startAt and startAfter are mutually exclusive');
+    }
     await queue.retryJobs({

```
jobs(filter: "[?progress != `0`]")
jobs(filter: "[?attemptsMade > `1`]")
```
stacktrace didn't make sense - it is error stacks for multiple errors.
So I'm calling it `failures[]` now.

Also I'm splitting each failure/stack on the line breaks.
This is what Apollo does for `extensions.stacktrace`, and helps
with formatting in the explorer.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant