From 284a134ca422cda8be87d559abb4297d0b5308b3 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 2 Oct 2025 11:27:24 -0700 Subject: [PATCH] fix(clickhouse): correctly format datetime64(9) input format --- .../app/presenters/v3/RunPresenter.server.ts | 3 +- .../clickhouseEventRepository.server.ts | 64 ++++++++++++++----- 2 files changed, 51 insertions(+), 16 deletions(-) diff --git a/apps/webapp/app/presenters/v3/RunPresenter.server.ts b/apps/webapp/app/presenters/v3/RunPresenter.server.ts index 40b5871190..437d6b6458 100644 --- a/apps/webapp/app/presenters/v3/RunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/RunPresenter.server.ts @@ -180,7 +180,8 @@ export class RunPresenter { run.status === "PAUSED" || run.status === "RETRYING_AFTER_FAILURE" || run.status === "DEQUEUED" || - run.status === "EXECUTING", + run.status === "EXECUTING" || + run.status === "WAITING_TO_RESUME", isCancelled: run.status === "CANCELED", isDebug: false, level: "TRACE", diff --git a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts index 4274a678ea..c3e74fb3f5 100644 --- a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts @@ -111,6 +111,14 @@ export class ClickhouseEventRepository implements IEventRepository { span.setAttribute("flush_id", flushId); span.setAttribute("event_count", events.length); + const firstEvent = events[0]; + + if (firstEvent) { + logger.debug("ClickhouseEventRepository.flushBatch first event", { + event: firstEvent, + }); + } + const [insertError, insertResult] = await this._clickhouse.taskEvents.insert(events); if (insertError) { @@ -147,7 +155,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: event.projectId, task_identifier: event.taskSlug, run_id: event.runId, - start_time: event.startTime.toString(), + start_time: formatClickhouseDate64NanosecondsEpochString(event.startTime.toString()), duration: (event.duration ?? 0).toString(), trace_id: event.traceId, span_id: event.spanId, @@ -177,7 +185,11 @@ export class ClickhouseEventRepository implements IEventRepository { } // Only return events where the event start_time is greater than the span start_time - return records.filter((r) => BigInt(r.start_time) > BigInt(event.startTime)); + return records.filter( + (r) => + convertClickhouseDate64NanosecondsEpochStringToBigInt(r.start_time) > + BigInt(event.startTime) + ); } private createTaskEventV1InputFromSpanEvent( @@ -209,7 +221,9 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: event.projectId, task_identifier: event.taskSlug, run_id: event.runId, - start_time: convertDateToNanoseconds(spanEvent.time).toString(), + start_time: formatClickhouseDate64NanosecondsEpochString( + convertDateToNanoseconds(spanEvent.time).toString() + ), duration: "0", // Events have no duration trace_id: event.traceId, span_id: event.spanId, @@ -243,7 +257,9 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: event.projectId, task_identifier: event.taskSlug, run_id: event.runId, - start_time: convertDateToNanoseconds(spanEvent.time).toString(), + start_time: formatClickhouseDate64NanosecondsEpochString( + convertDateToNanoseconds(spanEvent.time).toString() + ), duration: "0", // Events have no duration trace_id: event.traceId, span_id: event.spanId, @@ -271,7 +287,9 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: event.projectId, task_identifier: event.taskSlug, run_id: event.runId, - start_time: convertDateToNanoseconds(spanEvent.time).toString(), + start_time: formatClickhouseDate64NanosecondsEpochString( + convertDateToNanoseconds(spanEvent.time).toString() + ), duration: "0", // Events have no duration trace_id: event.traceId, span_id: event.spanId, @@ -303,7 +321,9 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: event.projectId, task_identifier: event.taskSlug, run_id: event.runId, - start_time: convertDateToNanoseconds(spanEvent.time).toString(), + start_time: formatClickhouseDate64NanosecondsEpochString( + convertDateToNanoseconds(spanEvent.time).toString() + ), duration: "0", // Events have no duration trace_id: event.traceId, span_id: event.spanId, @@ -439,7 +459,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: options.environment.projectId, task_identifier: options.taskSlug, run_id: options.attributes.runId, - start_time: startTime.toString(), + start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), duration: duration.toString(), trace_id: traceId, span_id: spanId, @@ -540,7 +560,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: options.environment.projectId, task_identifier: options.taskSlug, run_id: options.attributes.runId, - start_time: startTime.toString(), + start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), duration: String(options.incomplete ? 0 : duration), trace_id: traceId, span_id: spanId, @@ -574,7 +594,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: options.environment.projectId, task_identifier: options.taskSlug, run_id: options.attributes.runId, - start_time: startTime.toString(), + start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), duration: String(options.incomplete ? 0 : duration), trace_id: traceId, span_id: spanId, @@ -623,7 +643,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: run.projectId, task_identifier: run.taskIdentifier, run_id: run.friendlyId, - start_time: startTime.toString(), + start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(), trace_id: run.traceId, span_id: run.spanId, @@ -671,7 +691,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: run.projectId, task_identifier: run.taskIdentifier, run_id: blockedRun.friendlyId, - start_time: startTime.toString(), + start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(), trace_id: blockedRun.traceId, span_id: spanId, @@ -711,7 +731,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: run.projectId, task_identifier: run.taskIdentifier, run_id: run.friendlyId, - start_time: startTime.toString(), + start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(), trace_id: run.traceId, span_id: run.spanId, @@ -757,7 +777,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: run.projectId, task_identifier: run.taskIdentifier, run_id: run.friendlyId, - start_time: startTime.toString(), + start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(), trace_id: run.traceId, span_id: run.spanId, @@ -803,7 +823,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: run.projectId, task_identifier: run.taskIdentifier, run_id: run.friendlyId, - start_time: startTime.toString(), + start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), duration: "0", trace_id: run.traceId, span_id: run.spanId, @@ -847,7 +867,7 @@ export class ClickhouseEventRepository implements IEventRepository { project_id: run.projectId, task_identifier: run.taskIdentifier, run_id: run.friendlyId, - start_time: startTime.toString(), + start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), duration: calculateDurationFromStart(startTime, cancelledAt).toString(), trace_id: run.traceId, span_id: run.spanId, @@ -1807,3 +1827,17 @@ function isLogEvent(kind: string): boolean { function calculateEndTimeFromStartTime(startTime: Date, duration: number): Date { return new Date(startTime.getTime() + duration / 1_000_000); } + +// This will take a string like "1759427319944999936" and return "1759427319.944999936" +function formatClickhouseDate64NanosecondsEpochString(date: string): string { + if (date.length !== 19) { + return date; + } + + return date.substring(0, 10) + "." + date.substring(10); +} + +function convertClickhouseDate64NanosecondsEpochStringToBigInt(date: string): bigint { + const parts = date.split("."); + return BigInt(parts.join("")); +}