From 12fa8842d0ecc88989c548d70a058f4a82948e11 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Wed, 25 Jun 2025 17:14:16 +0100 Subject: [PATCH 1/3] Improve clickhouse client debug logging --- apps/webapp/app/env.server.ts | 3 ++ .../runsReplicationInstance.server.ts | 2 +- .../clickhouse/src/client/client.ts | 32 +++++++++++++++++++ internal-packages/clickhouse/src/index.ts | 2 +- .../clickhouse/src/taskRuns.test.ts | 1 + 5 files changed, 38 insertions(+), 2 deletions(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 226ec1c7e4..9136e44a1a 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -831,6 +831,9 @@ const EnvironmentSchema = z.object({ RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000), RUN_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10), RUN_REPLICATION_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"), + RUN_REPLICATION_CLICKHOUSE_LOG_LEVEL: z + .enum(["log", "error", "warn", "info", "debug"]) + .default("info"), RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000), RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500), RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"), diff --git a/apps/webapp/app/services/runsReplicationInstance.server.ts b/apps/webapp/app/services/runsReplicationInstance.server.ts index c332afa6d8..8a8b0df650 100644 --- a/apps/webapp/app/services/runsReplicationInstance.server.ts +++ b/apps/webapp/app/services/runsReplicationInstance.server.ts @@ -29,7 +29,7 @@ function initializeRunsReplicationInstance() { enabled: env.RUN_REPLICATION_KEEP_ALIVE_ENABLED === "1", idleSocketTtl: env.RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS, }, - logLevel: env.RUN_REPLICATION_LOG_LEVEL, + logLevel: env.RUN_REPLICATION_CLICKHOUSE_LOG_LEVEL, compression: { request: true, }, diff --git a/internal-packages/clickhouse/src/client/client.ts b/internal-packages/clickhouse/src/client/client.ts index 132c0220ee..99fff59267 100644 --- a/internal-packages/clickhouse/src/client/client.ts +++ b/internal-packages/clickhouse/src/client/client.ts @@ -21,6 +21,7 @@ import { Logger, type LogLevel } from "@trigger.dev/core/logger"; import type { Agent as HttpAgent } from "http"; import type { Agent as HttpsAgent } from "https"; import { ClickhouseQueryBuilder } from "./queryBuilder.js"; +import { randomUUID } from "node:crypto"; export type ClickhouseConfig = { name: string; @@ -103,6 +104,8 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { settings?: ClickHouseSettings; }): ClickhouseQueryFunction, z.output> { return async (params, options) => { + const queryId = randomUUID(); + return await startSpan(this.tracer, "query", async (span) => { this.logger.debug("Querying clickhouse", { name: req.name, @@ -110,11 +113,13 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { params, settings: req.settings, attributes: options?.attributes, + queryId, }); span.setAttributes({ "clickhouse.clientName": this.name, "clickhouse.operationName": req.name, + "clickhouse.queryId": queryId, ...flattenAttributes(req.settings, "clickhouse.settings"), ...flattenAttributes(options?.attributes), }); @@ -129,6 +134,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { error: validParams.error, query: req.query, params, + queryId, }); return [ @@ -146,6 +152,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { query: req.query, query_params: validParams?.data, format: "JSONEachRow", + query_id: queryId, ...options?.params, clickhouse_settings: { ...req.settings, @@ -160,6 +167,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { error: clickhouseError, query: req.query, params, + queryId, }); recordClickhouseError(span, clickhouseError); @@ -195,6 +203,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { error: parsed.error, query: req.query, params, + queryId, }); const queryError = new QueryError(generateErrorMessage(parsed.error.issues), { @@ -235,11 +244,25 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { settings?: ClickHouseSettings; }): ClickhouseInsertFunction> { return async (events, options) => { + const queryId = randomUUID(); + return await startSpan(this.tracer, "insert", async (span) => { + this.logger.debug("Inserting into clickhouse", { + clientName: this.name, + name: req.name, + table: req.table, + events: Array.isArray(events) ? events.length : 1, + settings: req.settings, + attributes: options?.attributes, + options, + queryId, + }); + span.setAttributes({ "clickhouse.clientName": this.name, "clickhouse.tableName": req.table, "clickhouse.operationName": req.name, + "clickhouse.queryId": queryId, ...flattenAttributes(req.settings, "clickhouse.settings"), ...flattenAttributes(options?.attributes), }); @@ -271,6 +294,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { table: req.table, format: "JSONEachRow", values: Array.isArray(validatedEvents) ? validatedEvents : [validatedEvents], + query_id: queryId, ...options?.params, clickhouse_settings: { ...req.settings, @@ -291,6 +315,14 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter { return [new InsertError(clickhouseError.message), null]; } + this.logger.debug("Inserted into clickhouse", { + clientName: this.name, + name: req.name, + table: req.table, + result, + queryId, + }); + span.setAttributes({ "clickhouse.query_id": result.query_id, "clickhouse.executed": result.executed, diff --git a/internal-packages/clickhouse/src/index.ts b/internal-packages/clickhouse/src/index.ts index ee2967ea9f..7e0894ff1a 100644 --- a/internal-packages/clickhouse/src/index.ts +++ b/internal-packages/clickhouse/src/index.ts @@ -56,7 +56,7 @@ export class ClickHouse { private _splitClients: boolean; constructor(config: ClickHouseConfig) { - this.logger = config.logger ?? new Logger("ClickHouse", "debug"); + this.logger = config.logger ?? new Logger("ClickHouse", config.logLevel ?? "debug"); if (config.url) { const url = new URL(config.url); diff --git a/internal-packages/clickhouse/src/taskRuns.test.ts b/internal-packages/clickhouse/src/taskRuns.test.ts index 6aa9ae8ab9..30ea0270ba 100644 --- a/internal-packages/clickhouse/src/taskRuns.test.ts +++ b/internal-packages/clickhouse/src/taskRuns.test.ts @@ -8,6 +8,7 @@ describe("Task Runs V2", () => { const client = new ClickhouseClient({ name: "test", url: clickhouseContainer.getConnectionUrl(), + logLevel: "debug", }); const insert = insertTaskRuns(client, { From 87366af35b348ea67ec94521e3598d742fcd274f Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 26 Jun 2025 12:08:46 +0100 Subject: [PATCH 2/3] Stop hardcoding ScheduleEngineWorker log level --- internal-packages/schedule-engine/src/engine/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal-packages/schedule-engine/src/engine/index.ts b/internal-packages/schedule-engine/src/engine/index.ts index 83f7ea93f3..3efafe73ca 100644 --- a/internal-packages/schedule-engine/src/engine/index.ts +++ b/internal-packages/schedule-engine/src/engine/index.ts @@ -97,7 +97,7 @@ export class ScheduleEngine { }, pollIntervalMs: options.worker.pollIntervalMs, shutdownTimeoutMs: options.worker.shutdownTimeoutMs, - logger: new Logger("ScheduleEngineWorker", "debug"), + logger: new Logger("ScheduleEngineWorker", (options.logLevel ?? "info") as any), jobs: { "schedule.triggerScheduledTask": this.#handleTriggerScheduledTaskJob.bind(this), }, From f1d56c6eb51be08ca33f1262db7eaea247327fd9 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 26 Jun 2025 14:58:44 +0100 Subject: [PATCH 3/3] Add DLQ debug log message --- packages/redis-worker/src/queue.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/redis-worker/src/queue.ts b/packages/redis-worker/src/queue.ts index dd692f51fd..b09a9979bd 100644 --- a/packages/redis-worker/src/queue.ts +++ b/packages/redis-worker/src/queue.ts @@ -318,6 +318,12 @@ export class SimpleQueue { async moveToDeadLetterQueue(id: string, errorMessage: string): Promise { try { + this.logger.debug(`SimpleQueue ${this.name}.moveToDeadLetterQueue(): moving item to DLQ`, { + queue: this.name, + id, + errorMessage, + }); + const result = await this.redis.moveToDeadLetterQueue( `queue`, `items`,