Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 6ac8ff2

Browse files
authored
Add contentType support for sending Queues messages (#641)
* Add contentType support for Queues messages * Address PR comments * Address more PR comments
1 parent 121953f commit 6ac8ff2

File tree

3 files changed

+133
-11
lines changed

3 files changed

+133
-11
lines changed

packages/miniflare/src/plugins/queues/gateway.ts

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import v8 from "v8";
44
import { stringify } from "devalue";
55
import { Colorize, bold, green, grey, red, reset, yellow } from "kleur/colors";
66
import { z } from "zod";
7-
import { Log, Timers } from "../../shared";
7+
import { Base64DataSchema, Log, Timers, viewToBuffer } from "../../shared";
88
import { Storage } from "../../storage";
99
import { CoreHeaders, structuredSerializableReducers } from "../../workers";
1010
import { DispatchFetch, QueueConsumer } from "../shared";
@@ -44,6 +44,15 @@ const exceptionQueueResponse: QueueResponse = {
4444
explicitAcks: [],
4545
};
4646

47+
export const QueueContentTypeSchema = z
48+
.enum(["text", "json", "bytes", "v8"])
49+
.default("v8");
50+
export const GatewayMessageSchema = z.object({
51+
body: Base64DataSchema,
52+
contentType: QueueContentTypeSchema,
53+
});
54+
export type GatewayMessage = z.infer<typeof GatewayMessageSchema>;
55+
4756
export class Message {
4857
#failedAttempts = 0;
4958

@@ -82,7 +91,7 @@ interface PendingFlush {
8291
export type QueueEnqueueOn = (
8392
queueName: string,
8493
consumer: QueueConsumer,
85-
messages: (Message | Buffer)[]
94+
messages: (Message | GatewayMessage)[]
8695
) => void;
8796

8897
// `QueuesGateway` slightly misrepresents what this class does. Each queue will
@@ -225,15 +234,24 @@ export class QueuesGateway {
225234
enqueue(
226235
enqueueOn: QueueEnqueueOn,
227236
consumer: QueueConsumer,
228-
messages: (Message | Buffer)[]
237+
messages: (Message | GatewayMessage)[]
229238
) {
230239
for (const message of messages) {
231240
if (message instanceof Message) {
232241
this.#messages.push(message);
233242
} else {
234243
const id = crypto.randomBytes(16).toString("hex");
235244
const timestamp = this.timers.now();
236-
const body = v8.deserialize(message);
245+
let body: unknown;
246+
if (message.contentType === "text") {
247+
body = message.body.toString();
248+
} else if (message.contentType === "json") {
249+
body = JSON.parse(message.body.toString());
250+
} else if (message.contentType === "bytes") {
251+
body = viewToBuffer(message.body);
252+
} else {
253+
body = v8.deserialize(message.body);
254+
}
237255
this.#messages.push(new Message(id, timestamp, body));
238256
}
239257
}

packages/miniflare/src/plugins/queues/router.ts

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
import { parse } from "devalue";
22
import { z } from "zod";
33
import { Headers, Response } from "../../http";
4-
import { Base64DataSchema, HttpError } from "../../shared";
4+
import { HttpError } from "../../shared";
55
import {
66
HEADER_PERSIST,
77
POST,
88
QueueConsumer,
99
RouteHandler,
1010
Router,
1111
} from "../shared";
12-
import { QueueEnqueueOn, QueuesGateway } from "./gateway";
12+
import {
13+
GatewayMessageSchema,
14+
QueueContentTypeSchema,
15+
QueueEnqueueOn,
16+
QueuesGateway,
17+
} from "./gateway";
1318

1419
const MAX_MESSAGE_SIZE_BYTES = 128 * 1000;
1520
const MAX_MESSAGE_BATCH_COUNT = 100;
@@ -30,6 +35,21 @@ function validateMessageSize(headers: Headers) {
3035
}
3136
}
3237

38+
function validateContentType(
39+
headers: Headers
40+
): z.infer<typeof QueueContentTypeSchema> {
41+
const format = headers.get("X-Msg-Fmt") ?? undefined; // zod will throw if null
42+
const result = QueueContentTypeSchema.safeParse(format);
43+
if (!result.success) {
44+
throw new HttpError(
45+
400,
46+
`message content type ${format} is invalid; if specified, must be one of 'text', 'json', 'bytes', or 'v8'`
47+
);
48+
} else {
49+
return result.data;
50+
}
51+
}
52+
3353
function validateBatchSize(headers: Headers) {
3454
const count = headers.get("CF-Queue-Batch-Count");
3555
if (count !== null && parseInt(count) > MAX_MESSAGE_BATCH_COUNT) {
@@ -63,7 +83,7 @@ async function decodeQueueConsumer(
6383
}
6484

6585
const QueuesBatchRequestSchema = z.object({
66-
messages: z.array(z.object({ body: Base64DataSchema })),
86+
messages: z.array(GatewayMessageSchema),
6787
});
6888

6989
export interface QueuesParams {
@@ -78,15 +98,20 @@ export class QueuesRouter extends Router<QueuesGateway> {
7898
@POST("/:queue/message")
7999
message: RouteHandler<QueuesParams> = async (req, params) => {
80100
validateMessageSize(req.headers);
101+
const contentType = validateContentType(req.headers);
81102

82103
// Get consumer from persistence header, if we don't have a consumer,
83104
// drop the message
84105
const consumer = await decodeQueueConsumer(req.headers);
85106
if (consumer === undefined) return new Response();
86107

87108
const queue = decodeURIComponent(params.queue);
88-
const serialisedBody = Buffer.from(await req.arrayBuffer());
89-
this.#enqueueOn(queue, consumer, [serialisedBody]);
109+
this.#enqueueOn(queue, consumer, [
110+
{
111+
body: Buffer.from(await req.arrayBuffer()),
112+
contentType,
113+
},
114+
]);
90115
return new Response();
91116
};
92117

@@ -101,8 +126,7 @@ export class QueuesRouter extends Router<QueuesGateway> {
101126

102127
const queue = decodeURIComponent(params.queue);
103128
const body = QueuesBatchRequestSchema.parse(await req.json());
104-
const messages = body.messages.map(({ body }) => body);
105-
this.#enqueueOn(queue, consumer, messages);
129+
this.#enqueueOn(queue, consumer, body.messages);
106130
return new Response();
107131
};
108132
}

packages/miniflare/test/plugins/queues/index.spec.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,3 +657,83 @@ test("operations permit strange queue names", async (t) => {
657657
{ queue: id, id: batch[1].id, body: "msg2" },
658658
]);
659659
});
660+
661+
test("supports message contentTypes", async (t) => {
662+
const MessageContentTypeTestSchema = z
663+
.object({ queue: z.string(), id: z.string(), body: z.any() })
664+
.array();
665+
const promise = new DeferredPromise<
666+
z.infer<typeof MessageContentTypeTestSchema>
667+
>();
668+
const timers = new TestTimers();
669+
const id = "my/ Queue";
670+
const log = new TestLog(t);
671+
const mf = new Miniflare({
672+
log,
673+
timers,
674+
verbose: true,
675+
queueProducers: { QUEUE: id },
676+
queueConsumers: [id],
677+
serviceBindings: {
678+
async REPORTER(request) {
679+
promise.resolve(
680+
MessageContentTypeTestSchema.parse(await request.json())
681+
);
682+
return new Response();
683+
},
684+
},
685+
modules: true,
686+
script: `export default {
687+
async fetch(request, env, ctx) {
688+
await env.QUEUE.send("msg1", { contentType: "text" });
689+
await env.QUEUE.send([{ message: "msg2" }], { contentType: "json" });
690+
const arrayBuffer = new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7]);
691+
await env.QUEUE.send(arrayBuffer, { contentType: "bytes" });
692+
await env.QUEUE.send(new Date(1600000000000), { contentType: "v8" });
693+
return new Response();
694+
},
695+
async queue(batch, env, ctx) {
696+
delete Date.prototype.toJSON; // JSON.stringify calls .toJSON before the replacer
697+
await env.REPORTER.fetch("http://localhost", {
698+
method: "POST",
699+
body: JSON.stringify(
700+
batch.messages.map(({ id, body }) => ({
701+
queue: batch.queue,
702+
id,
703+
body,
704+
})),
705+
(_, value) => {
706+
if (value instanceof ArrayBuffer) {
707+
return {
708+
$type: "ArrayBuffer",
709+
value: Array.from(new Uint8Array(value)),
710+
};
711+
} else if (value instanceof Date) {
712+
return { $type: "Date", value: value.getTime() };
713+
}
714+
return value;
715+
},
716+
),
717+
});
718+
},
719+
};`,
720+
});
721+
await mf.dispatchFetch("http://localhost");
722+
timers.timestamp += 1000;
723+
await timers.waitForTasks();
724+
const batch = await promise;
725+
t.deepEqual(batch, [
726+
{ queue: id, id: batch[0].id, body: "msg1" },
727+
{ queue: id, id: batch[1].id, body: [{ message: "msg2" }] },
728+
{
729+
queue: id,
730+
id: batch[2].id,
731+
body: { $type: "ArrayBuffer", value: [0, 1, 2, 3, 4, 5, 6, 7] },
732+
},
733+
{
734+
queue: id,
735+
id: batch[3].id,
736+
body: { $type: "Date", value: 1600000000000 },
737+
},
738+
]);
739+
});

0 commit comments

Comments
 (0)