diff --git a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts index c3e74fb3f5..4ac570c183 100644 --- a/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts @@ -156,7 +156,7 @@ export class ClickhouseEventRepository implements IEventRepository { task_identifier: event.taskSlug, run_id: event.runId, start_time: formatClickhouseDate64NanosecondsEpochString(event.startTime.toString()), - duration: (event.duration ?? 0).toString(), + duration: formatClickhouseUnsignedIntegerString(event.duration ?? 0), trace_id: event.traceId, span_id: event.spanId, parent_span_id: event.parentId ?? "", @@ -432,7 +432,9 @@ export class ClickhouseEventRepository implements IEventRepository { const startTime = options.startTime ?? getNowInNanoseconds(); const duration = options.duration ?? - (options.endTime ? calculateDurationFromStart(startTime, options.endTime) : 100); + (options.endTime + ? calculateDurationFromStart(startTime, options.endTime, 100 * 1_000_000) + : 100); const traceId = propagatedContext?.traceparent?.traceId ?? generateTraceId(); const parentId = options.parentId ?? propagatedContext?.traceparent?.spanId; @@ -460,7 +462,7 @@ export class ClickhouseEventRepository implements IEventRepository { task_identifier: options.taskSlug, run_id: options.attributes.runId, start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), - duration: duration.toString(), + duration: formatClickhouseUnsignedIntegerString(duration), trace_id: traceId, span_id: spanId, parent_span_id: parentId ?? "", @@ -561,7 +563,7 @@ export class ClickhouseEventRepository implements IEventRepository { task_identifier: options.taskSlug, run_id: options.attributes.runId, start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), - duration: String(options.incomplete ? 0 : duration), + duration: formatClickhouseUnsignedIntegerString(options.incomplete ? 0 : duration), trace_id: traceId, span_id: spanId, parent_span_id: parentId ?? "", @@ -595,7 +597,7 @@ export class ClickhouseEventRepository implements IEventRepository { task_identifier: options.taskSlug, run_id: options.attributes.runId, start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), - duration: String(options.incomplete ? 0 : duration), + duration: formatClickhouseUnsignedIntegerString(options.incomplete ? 0 : duration), trace_id: traceId, span_id: spanId, parent_span_id: parentId ?? "", @@ -644,7 +646,9 @@ export class ClickhouseEventRepository implements IEventRepository { task_identifier: run.taskIdentifier, run_id: run.friendlyId, start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), - duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(), + duration: formatClickhouseUnsignedIntegerString( + calculateDurationFromStart(startTime, endTime ?? new Date()) + ), trace_id: run.traceId, span_id: run.spanId, parent_span_id: run.parentSpanId ?? "", @@ -692,7 +696,9 @@ export class ClickhouseEventRepository implements IEventRepository { task_identifier: run.taskIdentifier, run_id: blockedRun.friendlyId, start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), - duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(), + duration: formatClickhouseUnsignedIntegerString( + calculateDurationFromStart(startTime, endTime ?? new Date()) + ), trace_id: blockedRun.traceId, span_id: spanId, parent_span_id: parentSpanId, @@ -732,7 +738,9 @@ export class ClickhouseEventRepository implements IEventRepository { task_identifier: run.taskIdentifier, run_id: run.friendlyId, start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), - duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(), + duration: formatClickhouseUnsignedIntegerString( + calculateDurationFromStart(startTime, endTime ?? new Date()) + ), trace_id: run.traceId, span_id: run.spanId, parent_span_id: run.parentSpanId ?? "", @@ -778,7 +786,9 @@ export class ClickhouseEventRepository implements IEventRepository { task_identifier: run.taskIdentifier, run_id: run.friendlyId, start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), - duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(), + duration: formatClickhouseUnsignedIntegerString( + calculateDurationFromStart(startTime, endTime ?? new Date()) + ), trace_id: run.traceId, span_id: run.spanId, parent_span_id: run.parentSpanId ?? "", @@ -868,7 +878,9 @@ export class ClickhouseEventRepository implements IEventRepository { task_identifier: run.taskIdentifier, run_id: run.friendlyId, start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()), - duration: calculateDurationFromStart(startTime, cancelledAt).toString(), + duration: formatClickhouseUnsignedIntegerString( + calculateDurationFromStart(startTime, cancelledAt) + ), trace_id: run.traceId, span_id: run.spanId, parent_span_id: run.parentSpanId ?? "", @@ -1841,3 +1853,15 @@ function convertClickhouseDate64NanosecondsEpochStringToBigInt(date: string): bi const parts = date.split("."); return BigInt(parts.join("")); } + +function formatClickhouseUnsignedIntegerString(value: number | bigint): string { + if (value < 0) { + return "0"; + } + + if (typeof value === "bigint") { + return value.toString(); + } + + return Math.floor(value).toString(); +} diff --git a/apps/webapp/app/v3/eventRepository/common.server.ts b/apps/webapp/app/v3/eventRepository/common.server.ts index ecfc9cfdc7..2e3bdf37c5 100644 --- a/apps/webapp/app/v3/eventRepository/common.server.ts +++ b/apps/webapp/app/v3/eventRepository/common.server.ts @@ -28,10 +28,20 @@ export function getDateFromNanoseconds(nanoseconds: bigint): Date { return new Date(Number(nanoseconds) / 1_000_000); } -export function calculateDurationFromStart(startTime: bigint, endTime: Date = new Date()) { +export function calculateDurationFromStart( + startTime: bigint, + endTime: Date = new Date(), + minimumDuration?: number +) { const $endtime = typeof endTime === "string" ? new Date(endTime) : endTime; - return Number(BigInt($endtime.getTime() * 1_000_000) - startTime); + const duration = Number(BigInt($endtime.getTime() * 1_000_000) - startTime); + + if (minimumDuration && duration < minimumDuration) { + return minimumDuration; + } + + return duration; } export function calculateDurationFromStartJsDate(startTime: Date, endTime: Date = new Date()) { diff --git a/apps/webapp/app/v3/eventRepository/index.server.ts b/apps/webapp/app/v3/eventRepository/index.server.ts index cda9e58940..1b423022c6 100644 --- a/apps/webapp/app/v3/eventRepository/index.server.ts +++ b/apps/webapp/app/v3/eventRepository/index.server.ts @@ -2,7 +2,7 @@ import { env } from "~/env.server"; import { eventRepository } from "./eventRepository.server"; import { clickhouseEventRepository } from "./clickhouseEventRepositoryInstance.server"; import { IEventRepository, TraceEventOptions } from "./eventRepository.types"; -import { $replica } from "~/db.server"; +import { $replica, prisma } from "~/db.server"; import { logger } from "~/services/logger.server"; import { FEATURE_FLAG, flags } from "../featureFlags.server"; import { getTaskEventStore } from "../taskEventStore.server"; @@ -145,7 +145,7 @@ async function recordRunEvent( } async function findRunForEventCreation(runId: string) { - return $replica.taskRun.findFirst({ + return prisma.taskRun.findFirst({ where: { id: runId, }, diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts index 7c0b7003e0..58b02dd0e5 100644 --- a/apps/webapp/app/v3/runEngineHandlers.server.ts +++ b/apps/webapp/app/v3/runEngineHandlers.server.ts @@ -403,7 +403,13 @@ export function registerRunEngineEventBusHandlers() { engine.eventBus.on("runRetryScheduled", async ({ time, run, environment, retryAt }) => { try { - let retryMessage = `Retry #${run.attemptNumber} delay`; + if (retryAt && time && time >= retryAt) { + return; + } + + let retryMessage = `Retry ${ + typeof run.attemptNumber === "number" ? `#${run.attemptNumber - 1}` : "" + } delay`; if (run.nextMachineAfterOOM) { retryMessage += ` after OOM`;