Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 3bfd4ab

Browse files
committed
Allow Miniflare#reload() with queue consumers set, closes #560
Previously, Miniflare would crash with a message saying a queue consumer was already set if configured on reload. This change clears consumers before reloading, and then adds them all back. It also adds some tests for this behaviour.
1 parent 0e48a64 commit 3bfd4ab

File tree

6 files changed

+100
-13
lines changed

6 files changed

+100
-13
lines changed

packages/core/src/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,9 @@ export class MiniflareCore<
716716
const newWatchPaths = new Set<string>();
717717
if (this.#wranglerConfigPath) newWatchPaths.add(this.#wranglerConfigPath);
718718

719+
// Reset all queue consumers, they'll be added back in `beforeReload()`s
720+
this.#ctx.queueBroker.resetConsumers();
721+
719722
// Run all before reload hooks, including mounts if we have any
720723
await this.#runAllBeforeReloads();
721724
if (!this.#isMount) {

packages/core/test/index.mounts.spec.ts

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,26 @@ import {
1111
MiniflareCore,
1212
MiniflareCoreContext,
1313
MiniflareCoreError,
14+
MiniflareCoreOptions,
1415
ReloadEvent,
1516
} from "@miniflare/core";
1617
import { DurableObjectsPlugin } from "@miniflare/durable-objects";
1718
import { HTTPPlugin, createServer } from "@miniflare/http-server";
1819
import { KVPlugin } from "@miniflare/kv";
19-
import { QueueBroker } from "@miniflare/queues";
20+
import {
21+
MessageBatch,
22+
QueueBroker,
23+
QueueError,
24+
QueuesPlugin,
25+
} from "@miniflare/queues";
2026
import { VMScriptRunner } from "@miniflare/runner-vm";
2127
import { LogLevel, NoOpLog, StoredValueMeta } from "@miniflare/shared";
2228
import {
2329
AsyncTestLog,
2430
MemoryStorageFactory,
2531
TestLog,
2632
TestPlugin,
33+
triggerPromise,
2734
useMiniflare,
2835
useTmp,
2936
waitForReload,
@@ -526,6 +533,61 @@ test("MiniflareCore: dispatches scheduled event to mount", async (t) => {
526533
t.deepEqual(waitUntil, ["mount", 1000, "* * * * *"]);
527534
});
528535

536+
test("MiniflareCore: consumes queue in mount", async (t) => {
537+
const opts: MiniflareCoreOptions<{
538+
CorePlugin: typeof CorePlugin;
539+
BindingsPlugin: typeof BindingsPlugin;
540+
QueuesPlugin: typeof QueuesPlugin;
541+
}> = {
542+
queueBindings: [{ name: "QUEUE", queueName: "queue" }],
543+
modules: true,
544+
script: `export default {
545+
async fetch(request, env, ctx) {
546+
env.QUEUE.send("message");
547+
return new Response();
548+
}
549+
}`,
550+
mounts: {
551+
a: {
552+
bindings: {
553+
REPORTER(batch: MessageBatch) {
554+
trigger(batch);
555+
},
556+
},
557+
queueConsumers: [{ queueName: "queue", maxWaitMs: 0 }],
558+
modules: true,
559+
script: `export default {
560+
queue(batch, env, ctx) {
561+
env.REPORTER(batch);
562+
}
563+
}`,
564+
},
565+
},
566+
};
567+
568+
// Check consumes messages sent in different mount
569+
let [trigger, promise] = triggerPromise<MessageBatch>();
570+
const mf = useMiniflare({ BindingsPlugin, QueuesPlugin }, opts);
571+
await mf.dispatchFetch("http://localhost");
572+
let batch = await promise;
573+
t.is(batch.messages.length, 1);
574+
t.is(batch.messages[0].body, "message");
575+
// ...even after reload (https://github.com/cloudflare/miniflare/issues/560)
576+
await mf.reload();
577+
[trigger, promise] = triggerPromise<MessageBatch>();
578+
await mf.dispatchFetch("http://localhost");
579+
batch = await promise;
580+
t.is(batch.messages.length, 1);
581+
t.is(batch.messages[0].body, "message");
582+
583+
// Check queue can have at most one consumer
584+
opts.queueConsumers = ["queue"]; // (adding parent as consumer too)
585+
await t.throwsAsync(mf.setOptions(opts), {
586+
instanceOf: QueueError,
587+
code: "ERR_CONSUMER_ALREADY_SET",
588+
});
589+
});
590+
529591
// Shared storage persistence tests
530592
type PersistOptions = Pick<
531593
MiniflareOptions,

packages/queues/src/broker.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,14 @@ export class WorkerQueue<Body = unknown> implements QueueInterface<Body> {
124124
}
125125
}
126126

127-
[kSetConsumer](consumer: Consumer) {
127+
[kSetConsumer](consumer?: Consumer) {
128+
if (consumer === undefined) {
129+
clearTimeout(this.#timeout);
130+
this.#pendingFlush = FlushType.NONE;
131+
this.#consumer = undefined;
132+
return;
133+
}
134+
128135
// only allow one subscription per queue (for now)
129136
if (this.#consumer) {
130137
throw new QueueError("ERR_CONSUMER_ALREADY_SET");
@@ -279,6 +286,10 @@ export class QueueBroker implements QueueBrokerInterface {
279286
return queue;
280287
}
281288

289+
resetConsumers() {
290+
for (const queue of this.#queues.values()) queue[kSetConsumer]();
291+
}
292+
282293
setConsumer(queue: WorkerQueue, consumer: Consumer) {
283294
queue[kSetConsumer](consumer);
284295
}

packages/queues/src/plugin.ts

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ export class QueuesPlugin
8282
})
8383
queueConsumers?: (string | ConsumerOptions)[];
8484

85+
readonly #consumers: Consumer[];
86+
8587
constructor(ctx: PluginContext, options?: QueuesOptions) {
8688
super(ctx);
8789
this.assignOptions(options);
@@ -90,10 +92,8 @@ export class QueuesPlugin
9092
"Queues are experimental. There may be breaking changes in the future."
9193
);
9294
}
93-
}
9495

95-
async setup(_storageFactory: StorageFactory): Promise<SetupResult> {
96-
for (const entry of this.queueConsumers ?? []) {
96+
this.#consumers = (this.queueConsumers ?? []).map((entry) => {
9797
let opts: ConsumerOptions;
9898
if (typeof entry === "string") {
9999
opts = {
@@ -103,28 +103,36 @@ export class QueuesPlugin
103103
opts = entry;
104104
}
105105

106-
const consumer: Consumer = {
106+
return {
107107
queueName: opts.queueName,
108108
maxBatchSize: opts.maxBatchSize ?? DEFAULT_BATCH_SIZE,
109109
maxWaitMs: opts.maxWaitMs ?? DEFAULT_WAIT_MS,
110110
maxRetries: opts.maxRetries ?? DEFAULT_RETRIES,
111111
deadLetterQueue: opts.deadLetterQueue,
112112
dispatcher: this.ctx.queueEventDispatcher,
113113
};
114+
});
115+
}
114116

115-
const queue = this.ctx.queueBroker.getOrCreateQueue(opts.queueName);
116-
this.ctx.queueBroker.setConsumer(queue, consumer);
117-
}
118-
117+
async setup(_storageFactory: StorageFactory): Promise<SetupResult> {
119118
const bindings: Context = {};
120119
for (const binding of this.queueBindings ?? []) {
121120
bindings[binding.name] = this.ctx.queueBroker.getOrCreateQueue(
122121
binding.queueName
123122
);
124123
}
125124

126-
const requiresModuleExports =
127-
this.queueConsumers !== undefined && this.queueConsumers.length > 0;
125+
const requiresModuleExports = this.#consumers.length > 0;
128126
return { bindings, requiresModuleExports };
129127
}
128+
129+
beforeReload() {
130+
// Register consumers on every reload, we'll reset them all before running
131+
// `beforeReload()` hooks. This allows us to detect duplicate consumers
132+
// across mounts with different `QueuesPlugin` instances.
133+
for (const consumer of this.#consumers) {
134+
const queue = this.ctx.queueBroker.getOrCreateQueue(consumer.queueName);
135+
this.ctx.queueBroker.setConsumer(queue, consumer);
136+
}
137+
}
130138
}

packages/queues/test/plugin.spec.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ test("QueuesPlugin: parses options from argv", async (t) => {
5757
const queueBroker = new QueueBroker();
5858
const plugin = new QueuesPlugin({ ...ctx, queueBroker }, options);
5959
await plugin.setup(factory);
60+
await plugin.beforeReload();
6061

6162
const queue1 = queueBroker.getOrCreateQueue("queue1");
6263
t.deepEqual(queue1[kGetConsumer]()?.maxBatchSize, DEFAULT_BATCH_SIZE);
@@ -103,6 +104,7 @@ test("QueuesPlugin: parses options from wrangler config", async (t) => {
103104
const queueBroker = new QueueBroker();
104105
const plugin = new QueuesPlugin({ ...ctx, queueBroker }, options);
105106
await plugin.setup(factory);
107+
await plugin.beforeReload();
106108

107109
// queue1 uses defaults
108110
const queue1 = queueBroker.getOrCreateQueue("queue1");

packages/shared/src/queues.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export interface QueueBroker {
99
getOrCreateQueue(name: string): Queue;
1010

1111
setConsumer(queue: Queue, consumer: Consumer): void;
12+
resetConsumers(): void;
1213
}
1314

1415
export interface Consumer {
@@ -33,7 +34,7 @@ export interface Queue<Body = unknown> {
3334
send(message: Body, options?: MessageSendOptions): Promise<void>;
3435
sendBatch(batch: Iterable<MessageSendRequest<Body>>): Promise<void>;
3536

36-
[kSetConsumer](consumer: Consumer): void;
37+
[kSetConsumer](consumer?: Consumer): void;
3738
[kGetConsumer](): Consumer | null;
3839
}
3940

0 commit comments

Comments
 (0)