Skip to content

Commit c8e4823

Browse files
committed
add backlog to report
1 parent e376c65 commit c8e4823

File tree

3 files changed

+452
-28
lines changed

3 files changed

+452
-28
lines changed

src/component/loop.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import {
2121
RunResult,
2222
toSegment,
2323
} from "./shared.js";
24-
import { recordCompleted, recordReport, recordStarted } from "./stats.js";
24+
import { recordCompleted, generateReport, recordStarted } from "./stats.js";
2525

2626
const CANCELLATION_BATCH_SIZE = 64; // the only queue that can get unbounded.
2727
const SECOND = 1000;
@@ -100,7 +100,7 @@ export const main = internalMutation({
100100
// It's been a while, let's start fresh.
101101
lastReportTs = Date.now();
102102
}
103-
recordReport(console, state);
103+
await generateReport(ctx, console, state, globals);
104104
state.report = {
105105
completed: 0,
106106
succeeded: 0,

src/component/stats.test.ts

Lines changed: 344 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,344 @@
1+
import { convexTest } from "convex-test";
2+
import {
3+
describe,
4+
expect,
5+
it,
6+
beforeEach,
7+
afterEach,
8+
vi,
9+
assert,
10+
} from "vitest";
11+
import schema from "./schema";
12+
import { internal } from "./_generated/api";
13+
import { Logger } from "./logging";
14+
import { getCurrentSegment } from "./shared";
15+
import { paginator } from "convex-helpers/server/pagination";
16+
17+
const modules = import.meta.glob("./**/*.ts");
18+
19+
// Create a proper Logger mock
20+
function createLoggerMock(): Logger {
21+
return {
22+
event: vi.fn(),
23+
debug: vi.fn(),
24+
info: vi.fn(),
25+
warn: vi.fn(),
26+
error: vi.fn(),
27+
time: vi.fn(),
28+
timeEnd: vi.fn(),
29+
};
30+
}
31+
32+
describe("stats", () => {
33+
async function setupTest() {
34+
const t = convexTest(schema, modules);
35+
return t;
36+
}
37+
38+
let t: Awaited<ReturnType<typeof setupTest>>;
39+
40+
beforeEach(async () => {
41+
vi.useFakeTimers();
42+
t = await setupTest();
43+
});
44+
45+
afterEach(() => {
46+
vi.useRealTimers();
47+
});
48+
49+
describe("generateReport", () => {
50+
it("should not generate a report when log level is below REPORT", async () => {
51+
// Setup internal state
52+
const stateId = await t.run(async (ctx) => {
53+
return await ctx.db.insert("internalState", {
54+
generation: 1n,
55+
segmentCursors: {
56+
incoming: 0n,
57+
completion: 0n,
58+
cancelation: 0n,
59+
},
60+
lastRecovery: 0n,
61+
report: {
62+
completed: 0,
63+
succeeded: 0,
64+
failed: 0,
65+
retries: 0,
66+
canceled: 0,
67+
lastReportTs: 0,
68+
},
69+
running: [],
70+
});
71+
});
72+
73+
// Mock the console.event function to track if it's called
74+
const consoleMock = createLoggerMock();
75+
76+
// Get the state document
77+
const state = await t.run(async (ctx) => {
78+
return await ctx.db.get(stateId);
79+
});
80+
assert(state);
81+
82+
// Call generateReport with a log level that won't trigger reporting
83+
await t.run(async (ctx) => {
84+
const { generateReport } = await import("./stats");
85+
await generateReport(ctx, consoleMock, state, {
86+
maxParallelism: 10,
87+
logLevel: "INFO", // Below REPORT level
88+
});
89+
});
90+
91+
// Verify that console.event was not called
92+
expect(consoleMock.event).not.toHaveBeenCalled();
93+
});
94+
95+
it("should generate a report when backlog is small enough", async () => {
96+
// Setup internal state
97+
const stateId = await t.run(async (ctx) => {
98+
return await ctx.db.insert("internalState", {
99+
generation: 1n,
100+
segmentCursors: {
101+
incoming: 0n,
102+
completion: 0n,
103+
cancelation: 0n,
104+
},
105+
lastRecovery: 0n,
106+
report: {
107+
completed: 10,
108+
succeeded: 6,
109+
failed: 2,
110+
retries: 2,
111+
canceled: 0,
112+
lastReportTs: 0,
113+
},
114+
running: [],
115+
});
116+
});
117+
118+
// Create a few pending start items
119+
await t.run(async (ctx) => {
120+
// Create a work item
121+
const workId = await ctx.db.insert("work", {
122+
fnType: "mutation",
123+
fnHandle: "testHandle",
124+
fnName: "testFunction",
125+
fnArgs: { test: true },
126+
attempts: 0,
127+
});
128+
129+
// Create a pendingStart for the work
130+
await ctx.db.insert("pendingStart", {
131+
workId,
132+
segment: 5n, // Some segment between 0 and currentSegment
133+
});
134+
});
135+
136+
// Mock the console.event function to track if it's called
137+
const consoleMock = createLoggerMock();
138+
139+
// Get the state document
140+
const state = await t.run(async (ctx) => {
141+
return await ctx.db.get(stateId);
142+
});
143+
assert(state);
144+
145+
// Call generateReport with REPORT log level
146+
await t.run(async (ctx) => {
147+
const { generateReport } = await import("./stats");
148+
await generateReport(ctx, consoleMock, state, {
149+
maxParallelism: 10,
150+
logLevel: "REPORT", // This should trigger reporting
151+
});
152+
});
153+
154+
// Verify that console.event was called with the correct data
155+
expect(consoleMock.event).toHaveBeenCalledWith("report", {
156+
backlog: 1, // We created one pendingStart
157+
running: 0,
158+
completed: 10,
159+
succeeded: 6,
160+
failed: 2,
161+
retries: 2,
162+
canceled: 0,
163+
failureRate: 0.4, // (failed + retries) / completed = (2 + 2) / 10 = 0.4
164+
permanentFailureRate: 0.25, // failed / (completed - retries) = 2 / (10 - 2) = 2/8
165+
});
166+
});
167+
168+
it("should schedule calculateBacklogAndReport when backlog is large", async () => {
169+
// Setup internal state
170+
const stateId = await t.run(async (ctx) => {
171+
return await ctx.db.insert("internalState", {
172+
generation: 1n,
173+
segmentCursors: {
174+
incoming: 0n,
175+
completion: 0n,
176+
cancelation: 0n,
177+
},
178+
lastRecovery: 0n,
179+
report: {
180+
completed: 10,
181+
succeeded: 8,
182+
failed: 1,
183+
retries: 1,
184+
canceled: 0,
185+
lastReportTs: 0,
186+
},
187+
running: [],
188+
});
189+
});
190+
191+
// Create more pending start items than maxParallelism
192+
const maxParallelism = 5;
193+
194+
// Create maxParallelism + 1 work items to trigger pagination
195+
for (let i = 0; i < maxParallelism + 1; i++) {
196+
await t.run(async (ctx) => {
197+
// Create a work item
198+
const workId = await ctx.db.insert("work", {
199+
fnType: "mutation",
200+
fnHandle: "testHandle",
201+
fnName: `testFunction${i}`,
202+
fnArgs: { test: i },
203+
attempts: 0,
204+
});
205+
206+
// Create a pendingStart for the work
207+
await ctx.db.insert("pendingStart", {
208+
workId,
209+
segment: 5n, // Some segment between 0 and currentSegment
210+
});
211+
});
212+
}
213+
214+
// Mock the console.event function
215+
const consoleMock = createLoggerMock();
216+
217+
// Get the state document
218+
const state = await t.run(async (ctx) => {
219+
return await ctx.db.get(stateId);
220+
});
221+
assert(state);
222+
223+
// Call generateReport with REPORT log level
224+
await t.run(async (ctx) => {
225+
const { generateReport } = await import("./stats");
226+
await generateReport(ctx, consoleMock, state, {
227+
maxParallelism,
228+
logLevel: "REPORT", // This should trigger reporting
229+
});
230+
});
231+
232+
// Verify that calculateBacklogAndReport was scheduled
233+
await t.run(async (ctx) => {
234+
const scheduledFunctions = await ctx.db.system
235+
.query("_scheduled_functions")
236+
.collect();
237+
238+
expect(scheduledFunctions.length).toBeGreaterThan(0);
239+
240+
// Check that one of the scheduled functions is calculateBacklogAndReport
241+
const calculateBacklogScheduled = scheduledFunctions.find(
242+
(sf) => sf.name === "stats:calculateBacklogAndReport"
243+
);
244+
expect(calculateBacklogScheduled).toBeDefined();
245+
assert(calculateBacklogScheduled);
246+
247+
// Verify console.event was not called yet (will be called by calculateBacklogAndReport)
248+
expect(consoleMock.event).not.toHaveBeenCalled();
249+
});
250+
});
251+
252+
it("should calculate backlog and report correctly", async () => {
253+
// Setup internal state
254+
const stateId = await t.run(async (ctx) => {
255+
return await ctx.db.insert("internalState", {
256+
generation: 1n,
257+
segmentCursors: {
258+
incoming: 0n,
259+
completion: 0n,
260+
cancelation: 0n,
261+
},
262+
lastRecovery: 0n,
263+
report: {
264+
completed: 10,
265+
succeeded: 8,
266+
failed: 1,
267+
retries: 1,
268+
canceled: 0,
269+
lastReportTs: 0,
270+
},
271+
running: [],
272+
});
273+
});
274+
275+
// Create some pending start items
276+
const currentSegment = getCurrentSegment();
277+
278+
// Create 3 work items
279+
for (let i = 0; i < 3; i++) {
280+
await t.run(async (ctx) => {
281+
// Create a work item
282+
const workId = await ctx.db.insert("work", {
283+
fnType: "mutation",
284+
fnHandle: "testHandle",
285+
fnName: `testFunction${i}`,
286+
fnArgs: { test: i },
287+
attempts: 0,
288+
});
289+
290+
// Create a pendingStart for the work
291+
await ctx.db.insert("pendingStart", {
292+
workId,
293+
segment: 5n, // Some segment between 0 and currentSegment
294+
});
295+
});
296+
}
297+
298+
// Get the state document
299+
const state = await t.run(async (ctx) => {
300+
return await ctx.db.get(stateId);
301+
});
302+
assert(state);
303+
304+
const cursor = await t.run(async (ctx) => {
305+
return await paginator(ctx.db, schema)
306+
.query("pendingStart")
307+
.withIndex("segment", (q) =>
308+
q.gte("segment", 0n).lt("segment", currentSegment)
309+
)
310+
.paginate({
311+
numItems: 1,
312+
cursor: null,
313+
});
314+
});
315+
316+
// Call calculateBacklogAndReport directly
317+
await t.mutation(internal.stats.calculateBacklogAndReport, {
318+
startSegment: 0n,
319+
endSegment: currentSegment,
320+
cursor: cursor.continueCursor,
321+
report: state.report,
322+
running: state.running.length,
323+
logLevel: "REPORT",
324+
});
325+
326+
// Verify that console.event was called with the correct data
327+
// Note: We can't directly check the mock since it's created inside the mutation
328+
// Instead, we can check if the function completed successfully
329+
330+
// We can verify the function was executed by checking if any scheduled functions were created
331+
await t.run(async (ctx) => {
332+
const scheduledFunctions = await ctx.db.system
333+
.query("_scheduled_functions")
334+
.collect();
335+
336+
// Since our backlog is small, no additional scheduled functions should be created
337+
const calculateBacklogScheduled = scheduledFunctions.find(
338+
(sf) => sf.name === "stats:calculateBacklogAndReport"
339+
);
340+
expect(calculateBacklogScheduled).toBeUndefined();
341+
});
342+
});
343+
});
344+
});

0 commit comments

Comments
 (0)