Skip to content
This repository was archived by the owner on Mar 13, 2025. It is now read-only.

Commit 92839d3

Browse files
committed
Add per-request context, limit subrequests to 50, closes #117
1 parent 138859c commit 92839d3

File tree

13 files changed

+312
-11
lines changed

13 files changed

+312
-11
lines changed

packages/cache/src/cache.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
Clock,
1212
Storage,
1313
defaultClock,
14+
getRequestContext,
1415
millisToSeconds,
1516
waitForOpenInputGate,
1617
waitForOpenOutputGate,
@@ -151,6 +152,7 @@ export class Cache implements CacheInterface {
151152
req: RequestInfo,
152153
res: BaseResponse | Response
153154
): Promise<undefined> {
155+
getRequestContext()?.incrementSubrequests();
154156
req = normaliseRequest(req);
155157

156158
if (res instanceof Response && res.webSocket) {
@@ -192,6 +194,7 @@ export class Cache implements CacheInterface {
192194
req: RequestInfo,
193195
options?: CacheMatchOptions
194196
): Promise<Response | undefined> {
197+
getRequestContext()?.incrementSubrequests();
195198
req = normaliseRequest(req);
196199
// Cloudflare only caches GET requests
197200
if (req.method !== "GET" && !options?.ignoreMethod) return;
@@ -232,6 +235,7 @@ export class Cache implements CacheInterface {
232235
req: RequestInfo,
233236
options?: CacheMatchOptions
234237
): Promise<boolean> {
238+
getRequestContext()?.incrementSubrequests();
235239
req = normaliseRequest(req);
236240
// Cloudflare only caches GET requests
237241
if (req.method !== "GET" && !options?.ignoreMethod) return false;

packages/cache/test/cache.spec.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import assert from "assert";
22
import { URL } from "url";
33
import { Cache, CacheError, CachedMeta } from "@miniflare/cache";
44
import { Request, RequestInitCfProperties, Response } from "@miniflare/core";
5-
import { Storage } from "@miniflare/shared";
5+
import { RequestContext, Storage } from "@miniflare/shared";
66
import {
77
getObjectProperties,
88
utf8Decode,
@@ -191,6 +191,12 @@ test("Cache: put respects cf cacheTtlByStatus", async (t) => {
191191
t.is(await cache.match("http://localhost/302"), undefined);
192192
});
193193

194+
test("Cache: put increments subrequest count", async (t) => {
195+
const { cache } = t.context;
196+
const ctx = new RequestContext();
197+
await ctx.runWith(() => cache.put("http://localhost:8787/", testResponse()));
198+
t.is(ctx.subrequests, 1);
199+
});
194200
test("Cache: put waits for output gate to open before storing", (t) => {
195201
const { cache } = t.context;
196202
return waitsForOutputGate(
@@ -243,6 +249,12 @@ test("Cache: only matches non-GET requests when ignoring method", async (t) => {
243249
t.not(await cache.match(req, { ignoreMethod: true }), undefined);
244250
});
245251

252+
test("Cache: match increments subrequest count", async (t) => {
253+
const { cache } = t.context;
254+
const ctx = new RequestContext();
255+
await ctx.runWith(() => cache.match("http://localhost:8787/"));
256+
t.is(ctx.subrequests, 1);
257+
});
246258
test("Cache: match MISS waits for input gate to open before returning", async (t) => {
247259
const { cache } = t.context;
248260
await waitsForInputGate(t, () => cache.match("http://localhost:8787/"));
@@ -297,6 +309,12 @@ test("Cache: only deletes non-GET requests when ignoring method", async (t) => {
297309
t.true(await cache.delete(req, { ignoreMethod: true }));
298310
});
299311

312+
test("Cache: delete increments subrequest count", async (t) => {
313+
const { cache } = t.context;
314+
const ctx = new RequestContext();
315+
await ctx.runWith(() => cache.delete("http://localhost:8787/"));
316+
t.is(ctx.subrequests, 1);
317+
});
300318
test("Cache: delete waits for output gate to open before deleting", async (t) => {
301319
const { cache } = t.context;
302320
await cache.put("http://localhost:8787/", testResponse());

packages/core/src/index.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
PluginOptions,
1515
PluginOptionsUnion,
1616
PluginSignatures,
17+
RequestContext,
1718
ScriptBlueprint,
1819
ScriptRunner,
1920
ScriptRunnerResult,
@@ -918,9 +919,12 @@ export class MiniflareCore<
918919
}
919920

920921
const globalScope = this.#globalScope;
921-
return globalScope![kDispatchFetch]<WaitUntil>(
922-
withImmutableHeaders(request),
923-
!!upstreamURL // only proxy if upstream URL set
922+
// Each fetch gets its own context (e.g. 50 subrequests)
923+
return new RequestContext().runWith(() =>
924+
globalScope![kDispatchFetch]<WaitUntil>(
925+
withImmutableHeaders(request),
926+
!!upstreamURL // only proxy if upstream URL set
927+
)
924928
);
925929
}
926930

@@ -930,7 +934,10 @@ export class MiniflareCore<
930934
): Promise<WaitUntil> {
931935
await this.#initPromise;
932936
const globalScope = this.#globalScope;
933-
return globalScope![kDispatchScheduled]<WaitUntil>(scheduledTime, cron);
937+
// Each fetch gets its own context (e.g. 50 subrequests)
938+
return new RequestContext().runWith(() =>
939+
globalScope![kDispatchScheduled]<WaitUntil>(scheduledTime, cron)
940+
);
934941
}
935942

936943
async dispose(): Promise<void> {

packages/core/src/standards/http.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { URL } from "url";
1313
import {
1414
Compatibility,
1515
Log,
16+
getRequestContext,
1617
nonCircularClone,
1718
waitForOpenInputGate,
1819
waitForOpenOutputGate,
@@ -591,14 +592,31 @@ export function withWaitUntil<WaitUntil extends any[]>(
591592
return resWaitUntil;
592593
}
593594

595+
/** @internal */
596+
export function _getURLList(res: BaseResponse): URL[] | undefined {
597+
// Extract the internal urlList property on Responses. It doesn't matter
598+
// too much if the internal representation changes in the future: this code
599+
// shouldn't throw. Currently we use this to count the number of redirects,
600+
// and increment the subrequest count accordingly.
601+
for (const symbol of Object.getOwnPropertySymbols(res)) {
602+
if (symbol.description === "state") {
603+
// @ts-expect-error symbol properties are not included type definitions
604+
return res[symbol].urlList;
605+
}
606+
}
607+
}
608+
594609
export async function fetch(
595610
input: RequestInfo,
596611
init?: RequestInit
597612
): Promise<Response> {
598-
// TODO (someday): support cache using fetch:
613+
// TODO (someday): support cache using fetch (could add to request context?):
599614
// https://developers.cloudflare.com/workers/learning/how-the-cache-works#fetch
600615
// https://developers.cloudflare.com/workers/examples/cache-using-fetch
601616

617+
const ctx = getRequestContext();
618+
ctx?.incrementSubrequests();
619+
602620
await waitForOpenOutputGate();
603621

604622
// Don't pass our strange hybrid Request to undici
@@ -616,6 +634,16 @@ export async function fetch(
616634

617635
const baseRes = await baseFetch(req);
618636

637+
// Increment the subrequest count by the number of redirects
638+
// TODO (someday): technically we should check the subrequest count before
639+
// each redirect, so requests don't actually get sent to the server if the
640+
// subrequest count exceeds the limit
641+
if (baseRes.redirected && ctx) {
642+
const urlList = _getURLList(baseRes);
643+
// Last url is final destination, so subtract 1 for redirect count
644+
if (urlList) ctx.incrementSubrequests(urlList.length - 1);
645+
}
646+
619647
// Convert the response to our hybrid Response
620648
const res = new Response(baseRes.body, baseRes);
621649

packages/core/src/standards/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export {
1313
withImmutableHeaders,
1414
Response,
1515
withWaitUntil,
16+
_getURLList,
1617
fetch,
1718
_urlFromRequestInput,
1819
_buildUnknownProtocolWarning,

packages/core/test/index.spec.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import assert from "assert";
22
import fs from "fs/promises";
33
import path from "path";
44
import { setTimeout } from "timers/promises";
5+
import { CachePlugin } from "@miniflare/cache";
56
import {
67
BindingsPlugin,
78
BuildPlugin,
@@ -25,6 +26,7 @@ import {
2526
Options,
2627
Storage,
2728
TypedEventListener,
29+
getRequestContext,
2830
} from "@miniflare/shared";
2931
import {
3032
LogEntry,
@@ -44,6 +46,7 @@ import {
4446
import test, { Macro } from "ava";
4547
import { Request as BaseRequest, File, FormData } from "undici";
4648

49+
const log = new NoOpLog();
4750
// Only use this shared storage factory when the test doesn't care about storage
4851
const storageFactory = new MemoryStorageFactory();
4952
const scriptRunner = new VMScriptRunner();
@@ -1085,6 +1088,37 @@ test("MiniflareCore: dispatchFetch: Request parse files in FormData as File obje
10851088
t.is(await file.text(), "test");
10861089
t.is(file.name, "test.txt");
10871090
});
1091+
test("MiniflareCore: dispatchFetch: creates new request context", async (t) => {
1092+
const mf = useMiniflareWithHandler(
1093+
{ BindingsPlugin, CachePlugin },
1094+
{
1095+
globals: {
1096+
assertSubrequests(expected: number) {
1097+
t.is(getRequestContext()?.subrequests, expected);
1098+
},
1099+
},
1100+
},
1101+
async (globals, req) => {
1102+
globals.assertSubrequests(0);
1103+
await globals.caches.default.match("http://localhost/");
1104+
globals.assertSubrequests(1);
1105+
1106+
const n = parseInt(new globals.URL(req.url).searchParams.get("n"));
1107+
await Promise.all(
1108+
Array.from(Array(n)).map(() =>
1109+
globals.caches.default.match("http://localhost/")
1110+
)
1111+
);
1112+
return new globals.Response("body");
1113+
}
1114+
);
1115+
await t.throwsAsync(mf.dispatchFetch("http://localhost/?n=50"), {
1116+
instanceOf: Error,
1117+
message: /^Too many subrequests/,
1118+
});
1119+
const res = await mf.dispatchFetch("http://localhost/?n=1");
1120+
t.is(await res.text(), "body");
1121+
});
10881122

10891123
test("MiniflareCore: dispatchScheduled: dispatches scheduled event", async (t) => {
10901124
const mf = useMiniflare(
@@ -1101,6 +1135,40 @@ test("MiniflareCore: dispatchScheduled: dispatches scheduled event", async (t) =
11011135
const res = await mf.dispatchScheduled(1000, "30 * * * *");
11021136
t.deepEqual(res, [1000, "30 * * * *"]);
11031137
});
1138+
test("MiniflareCore: dispatchScheduled: creates new request context", async (t) => {
1139+
const mf = new MiniflareCore(
1140+
{ CorePlugin, BindingsPlugin, CachePlugin },
1141+
{ log, storageFactory, scriptRunner },
1142+
{
1143+
globals: {
1144+
assertSubrequests(expected: number) {
1145+
t.is(getRequestContext()?.subrequests, expected);
1146+
},
1147+
},
1148+
modules: true,
1149+
script: `export default {
1150+
async scheduled(controller) {
1151+
assertSubrequests(0);
1152+
await caches.default.match("http://localhost/");
1153+
assertSubrequests(1);
1154+
1155+
await Promise.all(
1156+
Array.from(Array(controller.scheduledTime)).map(() =>
1157+
caches.default.match("http://localhost/")
1158+
)
1159+
);
1160+
return true;
1161+
}
1162+
}`,
1163+
}
1164+
);
1165+
await t.throwsAsync(mf.dispatchScheduled(50), {
1166+
instanceOf: Error,
1167+
message: /^Too many subrequests/,
1168+
});
1169+
const waitUntil = await mf.dispatchScheduled(1);
1170+
t.true(waitUntil[0]);
1171+
});
11041172

11051173
test("MiniflareCore: dispose: runs dispose for all plugins", async (t) => {
11061174
const log = new TestLog();

packages/core/test/standards/http.spec.ts

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import assert from "assert";
2+
import http from "http";
23
import { text } from "stream/consumers";
34
import { ReadableStream, TransformStream, WritableStream } from "stream/web";
45
import { URL } from "url";
@@ -7,6 +8,7 @@ import {
78
IncomingRequestCfProperties,
89
Request,
910
Response,
11+
_getURLList,
1012
_isByteStream,
1113
createCompatFetch,
1214
fetch,
@@ -16,7 +18,13 @@ import {
1618
withStringFormDataFiles,
1719
withWaitUntil,
1820
} from "@miniflare/core";
19-
import { Compatibility, InputGate, LogLevel, NoOpLog } from "@miniflare/shared";
21+
import {
22+
Compatibility,
23+
InputGate,
24+
LogLevel,
25+
NoOpLog,
26+
RequestContext,
27+
} from "@miniflare/shared";
2028
import {
2129
TestLog,
2230
triggerPromise,
@@ -35,6 +43,7 @@ import {
3543
File,
3644
FormData,
3745
Headers,
46+
fetch as baseFetch,
3847
} from "undici";
3948

4049
// @ts-expect-error filling out all properties is annoying
@@ -804,12 +813,51 @@ test("withWaitUntil: adds wait until to (Base)Response", async (t) => {
804813
t.is(await res.waitUntil(), baseWaitUntil);
805814
});
806815

816+
function redirectingServerListener(
817+
req: http.IncomingMessage,
818+
res: http.ServerResponse
819+
) {
820+
const { searchParams } = new URL(req.url ?? "", "http://localhost");
821+
const n = parseInt(searchParams.get("n") ?? "0");
822+
if (n > 0) {
823+
res.writeHead(302, { Location: `/?n=${n - 1}` });
824+
} else {
825+
res.writeHead(200);
826+
}
827+
res.end();
828+
}
829+
test("_getURLList: extracts URL list from Response", async (t) => {
830+
const upstream = (await useServer(t, redirectingServerListener)).http;
831+
const url = new URL("/?n=3", upstream);
832+
const res = await baseFetch(url);
833+
const urlList = _getURLList(res);
834+
t.deepEqual(urlList?.map(String), [
835+
`${upstream.origin}/?n=3`,
836+
`${upstream.origin}/?n=2`,
837+
`${upstream.origin}/?n=1`,
838+
`${upstream.origin}/?n=0`,
839+
]);
840+
});
841+
807842
test("fetch: can fetch from existing Request", async (t) => {
808843
const upstream = (await useServer(t, (req, res) => res.end("upstream"))).http;
809844
const req = new Request(upstream);
810845
const res = await fetch(req);
811846
t.is(await res.text(), "upstream");
812847
});
848+
test("fetch: increments subrequest count", async (t) => {
849+
const upstream = (await useServer(t, (req, res) => res.end("upstream"))).http;
850+
const ctx = new RequestContext();
851+
await ctx.runWith(() => fetch(upstream));
852+
t.is(ctx.subrequests, 1);
853+
});
854+
test("fetch: increments subrequest count for each redirect", async (t) => {
855+
const upstream = (await useServer(t, redirectingServerListener)).http;
856+
const url = new URL("/?n=3", upstream);
857+
const ctx = new RequestContext();
858+
await ctx.runWith(() => fetch(url));
859+
t.is(ctx.subrequests, 4);
860+
});
813861
test("fetch: waits for output gate to open before fetching", async (t) => {
814862
let fetched = false;
815863
const upstream = (

packages/durable-objects/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
"undici": "^4.11.1"
4242
},
4343
"devDependencies": {
44+
"@miniflare/cache": "2.0.0-rc.3",
45+
"@miniflare/runner-vm": "2.0.0-rc.3",
4446
"@miniflare/shared-test": "2.0.0-rc.3"
4547
}
4648
}

0 commit comments

Comments
 (0)