Skip to content

Commit 4dfadb9

Browse files
refactor(webapp): move reconciliation logic to separate file for better testability
1 parent 4c3f0e8 commit 4dfadb9

File tree

2 files changed

+125
-71
lines changed

2 files changed

+125
-71
lines changed

apps/webapp/app/presenters/v3/RunPresenter.server.ts

Lines changed: 36 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ export class RunPresenter {
124124
isFinished: isFinalRunStatus(run.status),
125125
startedAt: run.startedAt,
126126
completedAt: run.completedAt,
127-
createdAt: run.createdAt,
128127
logsDeletedAt: showDeletedLogs ? null : run.logsDeletedAt,
129128
rootTaskRun: run.rootTaskRun,
130129
parentTaskRun: run.parentTaskRun,
@@ -209,41 +208,16 @@ export class RunPresenter {
209208

210209
//we need the start offset for each item, and the total duration of the entire tree
211210
const treeRootStartTimeMs = tree ? tree?.data.startTime.getTime() : 0;
212-
213-
const postgresRunDuration =
214-
runData.isFinished && run.completedAt
215-
? millisecondsToNanoseconds(
216-
run.completedAt.getTime() -
217-
(run.rootTaskRun?.createdAt ?? run.createdAt).getTime()
218-
)
219-
: 0;
220-
221211
let totalDuration = tree?.data.duration ?? 0;
222212
const events = tree
223-
? flattenTree(tree).map((n, index) => {
224-
const isRoot = index === 0;
225-
const offset = millisecondsToNanoseconds(n.data.startTime.getTime() - treeRootStartTimeMs);
226-
227-
let nIsPartial = n.data.isPartial;
228-
let nDuration = n.data.duration;
229-
let nIsError = n.data.isError;
230-
231-
// NOTE: Clickhouse trace ingestion is eventually consistent.
232-
// When a run is marked finished in Postgres, we reconcile the
233-
// root span to reflect completion even if telemetry is still partial.
234-
// This is a deliberate UI-layer tradeoff to prevent stale or "stuck"
235-
// run states in the dashboard.
236-
if (isRoot && runData.isFinished && nIsPartial) {
237-
nIsPartial = false;
238-
nDuration = Math.max(nDuration ?? 0, postgresRunDuration);
239-
nIsError = isFailedRunStatus(runData.status);
240-
}
241-
213+
? flattenTree(tree).map((n) => {
214+
const offset = millisecondsToNanoseconds(
215+
n.data.startTime.getTime() - treeRootStartTimeMs
216+
);
242217
//only let non-debug events extend the total duration
243218
if (!n.data.isDebug) {
244-
totalDuration = Math.max(totalDuration, offset + (nIsPartial ? 0 : nDuration));
219+
totalDuration = Math.max(totalDuration, offset + n.data.duration);
245220
}
246-
247221
return {
248222
...n,
249223
data: {
@@ -254,24 +228,23 @@ export class RunPresenter {
254228
treeRootStartTimeMs
255229
),
256230
//set partial nodes to null duration
257-
duration: nIsPartial ? null : nDuration,
258-
isPartial: nIsPartial,
259-
isError: nIsError,
231+
duration: n.data.isPartial ? null : n.data.duration,
260232
offset,
261-
isRoot,
233+
isRoot: n.id === traceSummary.rootSpan.id,
262234
},
263235
};
264236
})
265237
: [];
266238

267-
if (runData.isFinished) {
268-
totalDuration = Math.max(totalDuration, postgresRunDuration);
269-
}
270-
271239
//total duration should be a minimum of 1ms
272240
totalDuration = Math.max(totalDuration, millisecondsToNanoseconds(1));
273241

274-
const reconciled = reconcileTraceWithRunLifecycle(runData, traceSummary.rootSpan.id, events, totalDuration);
242+
const reconciled = reconcileTraceWithRunLifecycle(
243+
runData,
244+
traceSummary.rootSpan.id,
245+
events,
246+
totalDuration
247+
);
275248

276249
return {
277250
run: runData,
@@ -312,17 +285,14 @@ export function reconcileTraceWithRunLifecycle(
312285
totalDuration: number;
313286
rootSpanStatus: "executing" | "completed" | "failed";
314287
} {
315-
const rootEvent = events[0];
316-
const isActualRoot = rootEvent?.id === rootSpanId;
317-
318-
const currentStatus: "executing" | "completed" | "failed" =
319-
isActualRoot && rootEvent
320-
? rootEvent.data.isError
321-
? "failed"
322-
: !rootEvent.data.isPartial
323-
? "completed"
324-
: "executing"
325-
: "executing";
288+
const rootEvent = events.find((e) => e.id === rootSpanId);
289+
const currentStatus: "executing" | "completed" | "failed" = rootEvent
290+
? rootEvent.data.isError
291+
? "failed"
292+
: !rootEvent.data.isPartial
293+
? "completed"
294+
: "executing"
295+
: "executing";
326296

327297
if (!runData.isFinished) {
328298
return { events, totalDuration, rootSpanStatus: currentStatus };
@@ -337,28 +307,23 @@ export function reconcileTraceWithRunLifecycle(
337307

338308
const updatedTotalDuration = Math.max(totalDuration, postgresRunDuration);
339309

340-
// We only need to potentially update the root event (the first one) if it matches our ID
341-
if (isActualRoot && rootEvent && rootEvent.data.isPartial) {
342-
const updatedEvents = [...events];
343-
updatedEvents[0] = {
344-
...rootEvent,
345-
data: {
346-
...rootEvent.data,
347-
isPartial: false,
348-
duration: Math.max(rootEvent.data.duration ?? 0, postgresRunDuration),
349-
isError: isFailedRunStatus(runData.status),
350-
},
351-
};
352-
353-
return {
354-
events: updatedEvents,
355-
totalDuration: updatedTotalDuration,
356-
rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed",
357-
};
358-
}
310+
const updatedEvents = events.map((e) => {
311+
if (e.id === rootSpanId && e.data.isPartial) {
312+
return {
313+
...e,
314+
data: {
315+
...e.data,
316+
isPartial: false,
317+
duration: Math.max(e.data.duration ?? 0, postgresRunDuration),
318+
isError: isFailedRunStatus(runData.status),
319+
},
320+
};
321+
}
322+
return e;
323+
});
359324

360325
return {
361-
events,
326+
events: updatedEvents,
362327
totalDuration: updatedTotalDuration,
363328
rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed",
364329
};
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import { millisecondsToNanoseconds } from "@trigger.dev/core/v3";
2+
import { isFailedRunStatus } from "~/v3/taskStatus";
3+
import type { TaskRunStatus } from "@trigger.dev/database";
4+
5+
export type ReconcileRunData = {
6+
isFinished: boolean;
7+
status: TaskRunStatus;
8+
createdAt: Date;
9+
completedAt: Date | null;
10+
rootTaskRun: { createdAt: Date } | null;
11+
};
12+
13+
export type ReconcileEvent = {
14+
id: string;
15+
data: {
16+
isPartial: boolean;
17+
isError: boolean;
18+
duration?: number | null;
19+
};
20+
};
21+
22+
export type ReconcileResult = {
23+
events: any[];
24+
totalDuration: number;
25+
rootSpanStatus: "executing" | "completed" | "failed";
26+
};
27+
28+
// NOTE: Clickhouse trace ingestion is eventually consistent.
29+
// When a run is marked finished in Postgres, we reconcile the
30+
// root span to reflect completion even if telemetry is still partial.
31+
// This is a deliberate UI-layer tradeoff to prevent stale or "stuck"
32+
// run states in the dashboard.
33+
export function reconcileTraceWithRunLifecycle(
34+
runData: ReconcileRunData,
35+
rootSpanId: string,
36+
events: any[],
37+
totalDuration: number
38+
): ReconcileResult {
39+
const rootEvent = events[0];
40+
const isActualRoot = rootEvent?.id === rootSpanId;
41+
42+
const currentStatus: "executing" | "completed" | "failed" =
43+
isActualRoot && rootEvent
44+
? rootEvent.data.isError
45+
? "failed"
46+
: !rootEvent.data.isPartial
47+
? "completed"
48+
: "executing"
49+
: "executing";
50+
51+
if (!runData.isFinished) {
52+
return { events, totalDuration, rootSpanStatus: currentStatus };
53+
}
54+
55+
const postgresRunDuration = runData.completedAt
56+
? millisecondsToNanoseconds(
57+
runData.completedAt.getTime() -
58+
(runData.rootTaskRun?.createdAt ?? runData.createdAt).getTime()
59+
)
60+
: 0;
61+
62+
const updatedTotalDuration = Math.max(totalDuration, postgresRunDuration);
63+
64+
// We only need to potentially update the root event (the first one) if it matches our ID
65+
if (isActualRoot && rootEvent && rootEvent.data.isPartial) {
66+
const updatedEvents = [...events];
67+
updatedEvents[0] = {
68+
...rootEvent,
69+
data: {
70+
...rootEvent.data,
71+
isPartial: false,
72+
duration: Math.max(rootEvent.data.duration ?? 0, postgresRunDuration),
73+
isError: isFailedRunStatus(runData.status),
74+
},
75+
};
76+
77+
return {
78+
events: updatedEvents,
79+
totalDuration: updatedTotalDuration,
80+
rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed",
81+
};
82+
}
83+
84+
return {
85+
events,
86+
totalDuration: updatedTotalDuration,
87+
rootSpanStatus: isFailedRunStatus(runData.status) ? "failed" : "completed",
88+
};
89+
}

0 commit comments

Comments
 (0)