Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 4c1bfdb

Browse files
jbw1991jbwcloudflaremrbbot
authored
Initial Queues Support (#354)
* Initial support for Queues * Improve Queues API: Wrap raw message bodies into a new Message type * Add typescript examples for producer and consumer * Add support for manual message retry() * Support automatic retryAll() on consumer errors * Update config based on most recent iteration * Remove test workers and docs * fix: re-enable `globalAsyncIO` option in tests * Fix lint warnings * Fix package lock * Experimental warning for queues * Use Miniflare logger for queues Co-authored-by: Josh Wheeler <[email protected]> Co-authored-by: MrBBot <[email protected]> Co-authored-by: bcoll <[email protected]>
1 parent f3b0dc8 commit 4c1bfdb

File tree

36 files changed

+1469
-84
lines changed

36 files changed

+1469
-84
lines changed

package-lock.json

Lines changed: 25 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/cache/test/plugin.spec.ts

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ import {
77
CachedMeta,
88
NoOpCache,
99
} from "@miniflare/cache";
10+
import { QueueBroker } from "@miniflare/queues";
1011
import {
1112
Compatibility,
1213
LogLevel,
1314
NoOpLog,
1415
PluginContext,
16+
QueueEventDispatcher,
1517
StoredValueMeta,
1618
} from "@miniflare/shared";
1719
import {
@@ -31,8 +33,16 @@ import { testResponse } from "./helpers";
3133
const log = new NoOpLog();
3234
const compat = new Compatibility();
3335
const rootPath = process.cwd();
34-
const ctx: PluginContext = { log, compat, rootPath, globalAsyncIO: true };
35-
36+
const queueBroker = new QueueBroker();
37+
const queueEventDispatcher: QueueEventDispatcher = async (_batch) => {};
38+
const ctx: PluginContext = {
39+
log,
40+
compat,
41+
rootPath,
42+
queueBroker,
43+
queueEventDispatcher,
44+
globalAsyncIO: true,
45+
};
3646
test("CacheStorage: provides default cache", async (t) => {
3747
const factory = new MemoryStorageFactory();
3848
const caches = new CacheStorage({}, log, factory, {});
@@ -208,14 +218,27 @@ test("CachePlugin: setup: Responses parse files in FormData as File objects only
208218
});
209219
test("CachePlugin: setup: operations throw outside request handler unless globalAsyncIO set", async (t) => {
210220
const factory = new MemoryStorageFactory();
211-
let plugin = new CachePlugin({ log, compat, rootPath });
221+
let plugin = new CachePlugin({
222+
log,
223+
compat,
224+
rootPath,
225+
queueBroker,
226+
queueEventDispatcher,
227+
});
212228
let caches: CacheStorage = plugin.setup(factory).globals?.caches;
213229
await t.throwsAsync(caches.default.match("http://localhost"), {
214230
instanceOf: Error,
215231
message: /^Some functionality, such as asynchronous I\/O/,
216232
});
217233

218-
plugin = new CachePlugin({ log, compat, rootPath, globalAsyncIO: true });
234+
plugin = new CachePlugin({
235+
log,
236+
compat,
237+
rootPath,
238+
globalAsyncIO: true,
239+
queueBroker,
240+
queueEventDispatcher,
241+
});
219242
caches = plugin.setup(factory).globals?.caches;
220243
await caches.default.match("http://localhost");
221244
});

packages/core/src/index.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import fs from "fs/promises";
22
import path from "path";
33
import { URL } from "url";
4+
import { QueueBroker } from "@miniflare/queues";
45
import {
56
AdditionalModules,
67
BeforeSetupResult,
78
Compatibility,
89
Context,
910
Log,
11+
MessageBatch,
1012
Mutex,
1113
Options,
1214
PluginContext,
@@ -49,8 +51,10 @@ import {
4951
ServiceWorkerGlobalScope,
5052
_kLoopHeader,
5153
kAddModuleFetchListener,
54+
kAddModuleQueueListener,
5255
kAddModuleScheduledListener,
5356
kDispatchFetch,
57+
kDispatchQueue,
5458
kDispatchScheduled,
5559
kDispose,
5660
withImmutableHeaders,
@@ -216,6 +220,7 @@ function throwNoScriptError(modules?: boolean) {
216220
export interface MiniflareCoreContext {
217221
log: Log;
218222
storageFactory: StorageFactory;
223+
queueBroker: QueueBroker;
219224
scriptRunner?: ScriptRunner;
220225
scriptRequired?: boolean;
221226
scriptRunForModuleExports?: boolean;
@@ -400,13 +405,26 @@ export class MiniflareCore<
400405
} else {
401406
this.#compat = new Compatibility(compatibilityDate, compatibilityFlags);
402407
}
408+
409+
const queueBroker = this.#ctx.queueBroker;
410+
const queueEventDispatcher = async (batch: MessageBatch) => {
411+
await this.dispatchQueue(batch);
412+
413+
// TODO(soon) detect success vs failure during processing
414+
this.#ctx.log.info(
415+
`${batch.queue} (${batch.messages.length} Messages) OK`
416+
);
417+
};
418+
403419
const ctx: PluginContext = {
404420
log: this.#ctx.log,
405421
compat: this.#compat,
406422
rootPath,
407423
usageModel,
408424
globalAsyncIO,
409425
fetchMock,
426+
queueEventDispatcher,
427+
queueBroker,
410428
};
411429

412430
// Log options and compatibility flags every time they might've changed
@@ -782,6 +800,11 @@ export class MiniflareCore<
782800
if (scheduledListener) {
783801
globalScope[kAddModuleScheduledListener](scheduledListener);
784802
}
803+
804+
const queueListener = defaults?.queue?.bind(defaults);
805+
if (queueListener) {
806+
globalScope[kAddModuleQueueListener](queueListener);
807+
}
785808
}
786809
}
787810

@@ -1118,6 +1141,24 @@ export class MiniflareCore<
11181141
);
11191142
}
11201143

1144+
async dispatchQueue<WaitUntil extends any[] = unknown[]>(
1145+
batch: MessageBatch
1146+
): Promise<WaitUntil> {
1147+
await this.#initPromise;
1148+
1149+
const { usageModel } = this.#instances!.CorePlugin;
1150+
const globalScope = this.#globalScope;
1151+
1152+
// Each fetch gets its own context (e.g. 50 subrequests).
1153+
// Start a new pipeline too.
1154+
return new RequestContext({
1155+
externalSubrequestLimit: usageModelExternalSubrequestLimit(usageModel),
1156+
}).runWith(() => {
1157+
const result = globalScope![kDispatchQueue]<WaitUntil>(batch);
1158+
return result;
1159+
});
1160+
}
1161+
11211162
async dispose(): Promise<void> {
11221163
// Ensure initialisation complete before disposing
11231164
// (see https://github.com/cloudflare/miniflare/issues/341)

packages/core/src/plugins/core.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import {
2222
import webStreams from "stream/web";
2323
import { URL, URLSearchParams } from "url";
2424
import { TextDecoder, TextEncoder } from "util";
25-
import { deserialize, serialize } from "v8";
2625
import {
2726
CompatibilityFlag,
2827
Context,
@@ -37,6 +36,7 @@ import {
3736
STRING_SCRIPT_PATH,
3837
SetupResult,
3938
globsToMatcher,
39+
structuredCloneBuffer,
4040
} from "@miniflare/shared";
4141
import { File, FormData, Headers, MockAgent } from "undici";
4242
// @ts-expect-error `urlpattern-polyfill` only provides global types
@@ -99,11 +99,6 @@ function proxyDisableStreamConstructor<
9999
});
100100
}
101101

102-
// Approximation of structuredClone for Node < 17.0.0
103-
function structuredCloneBuffer<T>(value: T): T {
104-
return deserialize(serialize(value));
105-
}
106-
107102
export interface CoreOptions {
108103
script?: string;
109104
scriptPath?: string;

packages/core/src/standards/event.ts

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {
22
Awaitable,
33
Context,
44
Log,
5+
MessageBatch,
56
MiniflareError,
67
ThrowingEventTarget,
78
TypedEventListener,
@@ -106,10 +107,27 @@ export class ScheduledEvent extends Event {
106107
}
107108
}
108109

110+
export class QueueEvent extends Event {
111+
readonly batch: MessageBatch;
112+
readonly [kWaitUntil]: Promise<unknown>[] = [];
113+
114+
constructor(type: "queue", init: { batch: MessageBatch }) {
115+
super(type);
116+
this.batch = init.batch;
117+
}
118+
119+
waitUntil(promise: Promise<any>): void {
120+
if (!(this instanceof QueueEvent)) {
121+
throw new TypeError("Illegal invocation");
122+
}
123+
this[kWaitUntil].push(promise);
124+
}
125+
}
126+
109127
export class ExecutionContext {
110-
readonly #event: FetchEvent | ScheduledEvent;
128+
readonly #event: FetchEvent | ScheduledEvent | QueueEvent;
111129

112-
constructor(event: FetchEvent | ScheduledEvent) {
130+
constructor(event: FetchEvent | ScheduledEvent | QueueEvent) {
113131
this.#event = event;
114132
}
115133

@@ -147,12 +165,20 @@ export type ModuleScheduledListener = (
147165
ctx: ExecutionContext
148166
) => any;
149167

168+
export type ModuleQueueListener = (
169+
batch: MessageBatch,
170+
env: Context,
171+
ctx: ExecutionContext
172+
) => any;
173+
150174
export const kAddModuleFetchListener = Symbol("kAddModuleFetchListener");
151175
export const kAddModuleScheduledListener = Symbol(
152176
"kAddModuleScheduledListener"
153177
);
178+
export const kAddModuleQueueListener = Symbol("kAddModuleQueueListener");
154179
export const kDispatchFetch = Symbol("kDispatchFetch");
155180
export const kDispatchScheduled = Symbol("kDispatchScheduled");
181+
export const kDispatchQueue = Symbol("kDispatchQueue");
156182
export const kDispose = Symbol("kDispose");
157183

158184
export class PromiseRejectionEvent extends Event {
@@ -172,6 +198,7 @@ export class PromiseRejectionEvent extends Event {
172198
export type WorkerGlobalScopeEventMap = {
173199
fetch: FetchEvent;
174200
scheduled: ScheduledEvent;
201+
queue: QueueEvent;
175202
unhandledrejection: PromiseRejectionEvent;
176203
rejectionhandled: PromiseRejectionEvent;
177204
};
@@ -334,6 +361,13 @@ export class ServiceWorkerGlobalScope extends WorkerGlobalScope {
334361
});
335362
}
336363

364+
[kAddModuleQueueListener](listener: ModuleQueueListener): void {
365+
super.addEventListener("queue", (e) => {
366+
const res = listener(e.batch, this.#bindings, new ExecutionContext(e));
367+
if (res !== undefined) e.waitUntil(Promise.resolve(res));
368+
});
369+
}
370+
337371
async [kDispatchFetch]<WaitUntil extends any[] = unknown[]>(
338372
request: Request,
339373
proxy = false
@@ -418,6 +452,14 @@ export class ServiceWorkerGlobalScope extends WorkerGlobalScope {
418452
return (await Promise.all(event[kWaitUntil])) as WaitUntil;
419453
}
420454

455+
async [kDispatchQueue]<WaitUntil extends any[] = any[]>(
456+
batch: MessageBatch
457+
): Promise<WaitUntil> {
458+
const event = new QueueEvent("queue", { batch });
459+
super.dispatchEvent(event);
460+
return (await Promise.all(event[kWaitUntil])) as WaitUntil;
461+
}
462+
421463
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
422464
#maybeAddPromiseListener(listener: PromiseListener, member: any): void {
423465
if (listener.set.size === 0) {

packages/core/test/index.mounts.spec.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
import { DurableObjectsPlugin } from "@miniflare/durable-objects";
1717
import { HTTPPlugin, createServer } from "@miniflare/http-server";
1818
import { KVPlugin } from "@miniflare/kv";
19+
import { QueueBroker } from "@miniflare/queues";
1920
import { VMScriptRunner } from "@miniflare/runner-vm";
2021
import { LogLevel, NoOpLog, StoredValueMeta } from "@miniflare/shared";
2122
import {
@@ -248,6 +249,7 @@ test("MiniflareCore: #init: doesn't throw if script required, parent script not
248249
storageFactory: new MemoryStorageFactory(),
249250
scriptRunner: new VMScriptRunner(),
250251
scriptRequired: true,
252+
queueBroker: new QueueBroker(),
251253
};
252254

253255
const mf = new MiniflareCore({ CorePlugin }, ctx, {
@@ -262,6 +264,7 @@ test("MiniflareCore: #init: logs reload errors when mount options update instead
262264
log,
263265
storageFactory: new MemoryStorageFactory(),
264266
scriptRunner: new VMScriptRunner(),
267+
queueBroker: new QueueBroker(),
265268
};
266269
const mf = new MiniflareCore({ CorePlugin, DurableObjectsPlugin }, ctx, {
267270
mounts: { a: { script: "//" } },
@@ -780,6 +783,7 @@ test("MiniflareCore: runs mounted worker script for Durable Object classes used
780783
storageFactory: new MemoryStorageFactory(),
781784
scriptRunner: new VMScriptRunner(),
782785
scriptRunForModuleExports: true,
786+
queueBroker: new QueueBroker(),
783787
},
784788
{
785789
modules: true,
@@ -823,6 +827,7 @@ test("MiniflareCore: can access Durable Objects defined in parent or other mount
823827
log: new NoOpLog(),
824828
storageFactory: new MemoryStorageFactory(),
825829
scriptRunner: new VMScriptRunner(),
830+
queueBroker: new QueueBroker(),
826831
},
827832
{
828833
name: "parent",

0 commit comments

Comments
 (0)