Skip to content

Commit eeed38d

Browse files
authored
fix(clickhouse): correctly format datetime64(9) input format (#2580)
1 parent 0ca0926 commit eeed38d

File tree

2 files changed

+51
-16
lines changed

2 files changed

+51
-16
lines changed

apps/webapp/app/presenters/v3/RunPresenter.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,8 @@ export class RunPresenter {
180180
run.status === "PAUSED" ||
181181
run.status === "RETRYING_AFTER_FAILURE" ||
182182
run.status === "DEQUEUED" ||
183-
run.status === "EXECUTING",
183+
run.status === "EXECUTING" ||
184+
run.status === "WAITING_TO_RESUME",
184185
isCancelled: run.status === "CANCELED",
185186
isDebug: false,
186187
level: "TRACE",

apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,14 @@ export class ClickhouseEventRepository implements IEventRepository {
111111
span.setAttribute("flush_id", flushId);
112112
span.setAttribute("event_count", events.length);
113113

114+
const firstEvent = events[0];
115+
116+
if (firstEvent) {
117+
logger.debug("ClickhouseEventRepository.flushBatch first event", {
118+
event: firstEvent,
119+
});
120+
}
121+
114122
const [insertError, insertResult] = await this._clickhouse.taskEvents.insert(events);
115123

116124
if (insertError) {
@@ -147,7 +155,7 @@ export class ClickhouseEventRepository implements IEventRepository {
147155
project_id: event.projectId,
148156
task_identifier: event.taskSlug,
149157
run_id: event.runId,
150-
start_time: event.startTime.toString(),
158+
start_time: formatClickhouseDate64NanosecondsEpochString(event.startTime.toString()),
151159
duration: (event.duration ?? 0).toString(),
152160
trace_id: event.traceId,
153161
span_id: event.spanId,
@@ -177,7 +185,11 @@ export class ClickhouseEventRepository implements IEventRepository {
177185
}
178186

179187
// Only return events where the event start_time is greater than the span start_time
180-
return records.filter((r) => BigInt(r.start_time) > BigInt(event.startTime));
188+
return records.filter(
189+
(r) =>
190+
convertClickhouseDate64NanosecondsEpochStringToBigInt(r.start_time) >
191+
BigInt(event.startTime)
192+
);
181193
}
182194

183195
private createTaskEventV1InputFromSpanEvent(
@@ -209,7 +221,9 @@ export class ClickhouseEventRepository implements IEventRepository {
209221
project_id: event.projectId,
210222
task_identifier: event.taskSlug,
211223
run_id: event.runId,
212-
start_time: convertDateToNanoseconds(spanEvent.time).toString(),
224+
start_time: formatClickhouseDate64NanosecondsEpochString(
225+
convertDateToNanoseconds(spanEvent.time).toString()
226+
),
213227
duration: "0", // Events have no duration
214228
trace_id: event.traceId,
215229
span_id: event.spanId,
@@ -243,7 +257,9 @@ export class ClickhouseEventRepository implements IEventRepository {
243257
project_id: event.projectId,
244258
task_identifier: event.taskSlug,
245259
run_id: event.runId,
246-
start_time: convertDateToNanoseconds(spanEvent.time).toString(),
260+
start_time: formatClickhouseDate64NanosecondsEpochString(
261+
convertDateToNanoseconds(spanEvent.time).toString()
262+
),
247263
duration: "0", // Events have no duration
248264
trace_id: event.traceId,
249265
span_id: event.spanId,
@@ -271,7 +287,9 @@ export class ClickhouseEventRepository implements IEventRepository {
271287
project_id: event.projectId,
272288
task_identifier: event.taskSlug,
273289
run_id: event.runId,
274-
start_time: convertDateToNanoseconds(spanEvent.time).toString(),
290+
start_time: formatClickhouseDate64NanosecondsEpochString(
291+
convertDateToNanoseconds(spanEvent.time).toString()
292+
),
275293
duration: "0", // Events have no duration
276294
trace_id: event.traceId,
277295
span_id: event.spanId,
@@ -303,7 +321,9 @@ export class ClickhouseEventRepository implements IEventRepository {
303321
project_id: event.projectId,
304322
task_identifier: event.taskSlug,
305323
run_id: event.runId,
306-
start_time: convertDateToNanoseconds(spanEvent.time).toString(),
324+
start_time: formatClickhouseDate64NanosecondsEpochString(
325+
convertDateToNanoseconds(spanEvent.time).toString()
326+
),
307327
duration: "0", // Events have no duration
308328
trace_id: event.traceId,
309329
span_id: event.spanId,
@@ -439,7 +459,7 @@ export class ClickhouseEventRepository implements IEventRepository {
439459
project_id: options.environment.projectId,
440460
task_identifier: options.taskSlug,
441461
run_id: options.attributes.runId,
442-
start_time: startTime.toString(),
462+
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
443463
duration: duration.toString(),
444464
trace_id: traceId,
445465
span_id: spanId,
@@ -540,7 +560,7 @@ export class ClickhouseEventRepository implements IEventRepository {
540560
project_id: options.environment.projectId,
541561
task_identifier: options.taskSlug,
542562
run_id: options.attributes.runId,
543-
start_time: startTime.toString(),
563+
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
544564
duration: String(options.incomplete ? 0 : duration),
545565
trace_id: traceId,
546566
span_id: spanId,
@@ -574,7 +594,7 @@ export class ClickhouseEventRepository implements IEventRepository {
574594
project_id: options.environment.projectId,
575595
task_identifier: options.taskSlug,
576596
run_id: options.attributes.runId,
577-
start_time: startTime.toString(),
597+
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
578598
duration: String(options.incomplete ? 0 : duration),
579599
trace_id: traceId,
580600
span_id: spanId,
@@ -623,7 +643,7 @@ export class ClickhouseEventRepository implements IEventRepository {
623643
project_id: run.projectId,
624644
task_identifier: run.taskIdentifier,
625645
run_id: run.friendlyId,
626-
start_time: startTime.toString(),
646+
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
627647
duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(),
628648
trace_id: run.traceId,
629649
span_id: run.spanId,
@@ -671,7 +691,7 @@ export class ClickhouseEventRepository implements IEventRepository {
671691
project_id: run.projectId,
672692
task_identifier: run.taskIdentifier,
673693
run_id: blockedRun.friendlyId,
674-
start_time: startTime.toString(),
694+
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
675695
duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(),
676696
trace_id: blockedRun.traceId,
677697
span_id: spanId,
@@ -711,7 +731,7 @@ export class ClickhouseEventRepository implements IEventRepository {
711731
project_id: run.projectId,
712732
task_identifier: run.taskIdentifier,
713733
run_id: run.friendlyId,
714-
start_time: startTime.toString(),
734+
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
715735
duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(),
716736
trace_id: run.traceId,
717737
span_id: run.spanId,
@@ -757,7 +777,7 @@ export class ClickhouseEventRepository implements IEventRepository {
757777
project_id: run.projectId,
758778
task_identifier: run.taskIdentifier,
759779
run_id: run.friendlyId,
760-
start_time: startTime.toString(),
780+
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
761781
duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(),
762782
trace_id: run.traceId,
763783
span_id: run.spanId,
@@ -803,7 +823,7 @@ export class ClickhouseEventRepository implements IEventRepository {
803823
project_id: run.projectId,
804824
task_identifier: run.taskIdentifier,
805825
run_id: run.friendlyId,
806-
start_time: startTime.toString(),
826+
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
807827
duration: "0",
808828
trace_id: run.traceId,
809829
span_id: run.spanId,
@@ -847,7 +867,7 @@ export class ClickhouseEventRepository implements IEventRepository {
847867
project_id: run.projectId,
848868
task_identifier: run.taskIdentifier,
849869
run_id: run.friendlyId,
850-
start_time: startTime.toString(),
870+
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
851871
duration: calculateDurationFromStart(startTime, cancelledAt).toString(),
852872
trace_id: run.traceId,
853873
span_id: run.spanId,
@@ -1807,3 +1827,17 @@ function isLogEvent(kind: string): boolean {
18071827
function calculateEndTimeFromStartTime(startTime: Date, duration: number): Date {
18081828
return new Date(startTime.getTime() + duration / 1_000_000);
18091829
}
1830+
1831+
// This will take a string like "1759427319944999936" and return "1759427319.944999936"
1832+
function formatClickhouseDate64NanosecondsEpochString(date: string): string {
1833+
if (date.length !== 19) {
1834+
return date;
1835+
}
1836+
1837+
return date.substring(0, 10) + "." + date.substring(10);
1838+
}
1839+
1840+
function convertClickhouseDate64NanosecondsEpochStringToBigInt(date: string): bigint {
1841+
const parts = date.split(".");
1842+
return BigInt(parts.join(""));
1843+
}

0 commit comments

Comments
 (0)