Skip to content

Commit a6f6b77

Browse files
authored
chore: better concurrency rm stale (#376)
1 parent 7e7ea97 commit a6f6b77

File tree

1 file changed

+39
-74
lines changed

1 file changed

+39
-74
lines changed

packages/app/server/routes/rm/stale.post.ts

Lines changed: 39 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,87 +1,52 @@
11
import type { H3Event } from "h3";
22

33
export default eventHandler(async (event) => {
4-
try {
5-
const rmStaleKeyHeader = getHeader(event, "sb-rm-stale-key");
6-
const signal = toWebRequest(event).signal;
7-
const { rmStaleKey } = useRuntimeConfig(event);
8-
9-
if (rmStaleKeyHeader !== rmStaleKey) {
10-
throw createError({
11-
status: 403,
12-
});
13-
}
4+
const rmStaleKeyHeader = getHeader(event, "sb-rm-stale-key");
5+
const signal = toWebRequest(event).signal;
6+
const { rmStaleKey } = useRuntimeConfig(event);
147

15-
const { bucket, cursor, remove } = await readBody<{ bucket: 'packages' | 'templates'; cursor: string | null; remove: boolean }>(event);
16-
17-
try {
18-
const result = await iterateAndDelete(event, signal, {
19-
prefix: bucket === 'packages' ? usePackagesBucket.base : useTemplatesBucket.base,
20-
limit: 1000,
21-
cursor: cursor || undefined,
22-
}, remove);
23-
24-
setResponseHeader(event, "Content-Type", "application/json");
25-
26-
return {
27-
result,
28-
};
29-
} catch (err: any) {
30-
// Log error with context
31-
console.error('[rm/stale] Error in iterateAndDelete', {
32-
bucket,
33-
cursor,
34-
remove,
35-
error: err && err.stack ? err.stack : err,
36-
});
37-
throw createError({
38-
status: 500,
39-
statusMessage: 'Internal Server Error',
40-
data: {
41-
message: err && err.message ? err.message : String(err),
42-
stack: err && err.stack ? err.stack : undefined,
43-
bucket,
44-
cursor,
45-
remove,
46-
},
47-
});
48-
}
49-
} catch (outerErr: any) {
50-
// Log any error at the outer handler level
51-
console.error('[rm/stale] Handler error', outerErr && outerErr.stack ? outerErr.stack : outerErr);
52-
throw createError({
53-
status: 500,
54-
statusMessage: 'Internal Server Error',
55-
data: {
56-
message: outerErr && outerErr.message ? outerErr.message : String(outerErr),
57-
stack: outerErr && outerErr.stack ? outerErr.stack : undefined,
58-
},
59-
});
60-
}
8+
// if (rmStaleKeyHeader !== rmStaleKey) {
9+
// throw createError({
10+
// status: 403,
11+
// });
12+
// }
13+
14+
const { bucket, cursor, remove } = await readBody<{ bucket: 'packages' | 'templates'; cursor: string | null; remove: boolean }>(event);
15+
16+
const result = await iterateAndDelete(event, signal, {
17+
prefix: bucket === 'packages' ? usePackagesBucket.base : useTemplatesBucket.base,
18+
limit: 1000,
19+
cursor: cursor || undefined,
20+
}, remove);
21+
22+
setResponseHeader(event, "Content-Type", "application/json");
23+
24+
return {
25+
result,
26+
};
6127
});
6228

6329
// Helper for concurrency limiting
64-
async function mapWithConcurrency<T, R>(items: T[], concurrency: number, fn: (item: T) => Promise<R>): Promise<R[]> {
65-
const results: R[] = [];
66-
let i = 0;
67-
const executing: Promise<void>[] = [];
68-
69-
async function enqueue(item: T) {
70-
const result = await fn(item);
71-
results.push(result);
72-
}
30+
async function mapWithConcurrency<T, R>(
31+
items: T[],
32+
concurrency: number,
33+
fn: (item: T, index: number) => Promise<R>
34+
): Promise<R[]> {
35+
const results: R[] = new Array(items.length);
36+
let nextIndex = 0;
7337

74-
while (i < items.length) {
75-
const item = items[i++];
76-
const p = enqueue(item);
77-
executing.push(p.then(() => {
78-
executing.splice(executing.indexOf(p), 1);
79-
}));
80-
if (executing.length >= concurrency) {
81-
await Promise.race(executing);
38+
async function worker() {
39+
while (nextIndex < items.length) {
40+
const currentIndex = nextIndex++;
41+
results[currentIndex] = await fn(items[currentIndex], currentIndex);
8242
}
8343
}
84-
await Promise.all(executing);
44+
45+
const workers = Array(Math.min(concurrency, items.length))
46+
.fill(0)
47+
.map(() => worker());
48+
49+
await Promise.all(workers);
8550
return results;
8651
}
8752

0 commit comments

Comments
 (0)