Skip to content

Commit b87e1c4

Browse files
authored
Improve clickhouse client debug logging (#2197)
* Improve clickhouse client debug logging * Stop hardcoding ScheduleEngineWorker log level * Add DLQ debug log message
1 parent 14058d5 commit b87e1c4

File tree

7 files changed

+45
-3
lines changed

7 files changed

+45
-3
lines changed

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -831,6 +831,9 @@ const EnvironmentSchema = z.object({
831831
RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000),
832832
RUN_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10),
833833
RUN_REPLICATION_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
834+
RUN_REPLICATION_CLICKHOUSE_LOG_LEVEL: z
835+
.enum(["log", "error", "warn", "info", "debug"])
836+
.default("info"),
834837
RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
835838
RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
836839
RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),

apps/webapp/app/services/runsReplicationInstance.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ function initializeRunsReplicationInstance() {
2929
enabled: env.RUN_REPLICATION_KEEP_ALIVE_ENABLED === "1",
3030
idleSocketTtl: env.RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
3131
},
32-
logLevel: env.RUN_REPLICATION_LOG_LEVEL,
32+
logLevel: env.RUN_REPLICATION_CLICKHOUSE_LOG_LEVEL,
3333
compression: {
3434
request: true,
3535
},

internal-packages/clickhouse/src/client/client.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { Logger, type LogLevel } from "@trigger.dev/core/logger";
2121
import type { Agent as HttpAgent } from "http";
2222
import type { Agent as HttpsAgent } from "https";
2323
import { ClickhouseQueryBuilder } from "./queryBuilder.js";
24+
import { randomUUID } from "node:crypto";
2425

2526
export type ClickhouseConfig = {
2627
name: string;
@@ -103,18 +104,22 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
103104
settings?: ClickHouseSettings;
104105
}): ClickhouseQueryFunction<z.input<TIn>, z.output<TOut>> {
105106
return async (params, options) => {
107+
const queryId = randomUUID();
108+
106109
return await startSpan(this.tracer, "query", async (span) => {
107110
this.logger.debug("Querying clickhouse", {
108111
name: req.name,
109112
query: req.query.replace(/\s+/g, " "),
110113
params,
111114
settings: req.settings,
112115
attributes: options?.attributes,
116+
queryId,
113117
});
114118

115119
span.setAttributes({
116120
"clickhouse.clientName": this.name,
117121
"clickhouse.operationName": req.name,
122+
"clickhouse.queryId": queryId,
118123
...flattenAttributes(req.settings, "clickhouse.settings"),
119124
...flattenAttributes(options?.attributes),
120125
});
@@ -129,6 +134,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
129134
error: validParams.error,
130135
query: req.query,
131136
params,
137+
queryId,
132138
});
133139

134140
return [
@@ -146,6 +152,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
146152
query: req.query,
147153
query_params: validParams?.data,
148154
format: "JSONEachRow",
155+
query_id: queryId,
149156
...options?.params,
150157
clickhouse_settings: {
151158
...req.settings,
@@ -160,6 +167,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
160167
error: clickhouseError,
161168
query: req.query,
162169
params,
170+
queryId,
163171
});
164172

165173
recordClickhouseError(span, clickhouseError);
@@ -195,6 +203,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
195203
error: parsed.error,
196204
query: req.query,
197205
params,
206+
queryId,
198207
});
199208

200209
const queryError = new QueryError(generateErrorMessage(parsed.error.issues), {
@@ -235,11 +244,25 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
235244
settings?: ClickHouseSettings;
236245
}): ClickhouseInsertFunction<z.input<TSchema>> {
237246
return async (events, options) => {
247+
const queryId = randomUUID();
248+
238249
return await startSpan(this.tracer, "insert", async (span) => {
250+
this.logger.debug("Inserting into clickhouse", {
251+
clientName: this.name,
252+
name: req.name,
253+
table: req.table,
254+
events: Array.isArray(events) ? events.length : 1,
255+
settings: req.settings,
256+
attributes: options?.attributes,
257+
options,
258+
queryId,
259+
});
260+
239261
span.setAttributes({
240262
"clickhouse.clientName": this.name,
241263
"clickhouse.tableName": req.table,
242264
"clickhouse.operationName": req.name,
265+
"clickhouse.queryId": queryId,
243266
...flattenAttributes(req.settings, "clickhouse.settings"),
244267
...flattenAttributes(options?.attributes),
245268
});
@@ -271,6 +294,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
271294
table: req.table,
272295
format: "JSONEachRow",
273296
values: Array.isArray(validatedEvents) ? validatedEvents : [validatedEvents],
297+
query_id: queryId,
274298
...options?.params,
275299
clickhouse_settings: {
276300
...req.settings,
@@ -291,6 +315,14 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
291315
return [new InsertError(clickhouseError.message), null];
292316
}
293317

318+
this.logger.debug("Inserted into clickhouse", {
319+
clientName: this.name,
320+
name: req.name,
321+
table: req.table,
322+
result,
323+
queryId,
324+
});
325+
294326
span.setAttributes({
295327
"clickhouse.query_id": result.query_id,
296328
"clickhouse.executed": result.executed,

internal-packages/clickhouse/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ export class ClickHouse {
5656
private _splitClients: boolean;
5757

5858
constructor(config: ClickHouseConfig) {
59-
this.logger = config.logger ?? new Logger("ClickHouse", "debug");
59+
this.logger = config.logger ?? new Logger("ClickHouse", config.logLevel ?? "debug");
6060

6161
if (config.url) {
6262
const url = new URL(config.url);

internal-packages/clickhouse/src/taskRuns.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ describe("Task Runs V2", () => {
88
const client = new ClickhouseClient({
99
name: "test",
1010
url: clickhouseContainer.getConnectionUrl(),
11+
logLevel: "debug",
1112
});
1213

1314
const insert = insertTaskRuns(client, {

internal-packages/schedule-engine/src/engine/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ export class ScheduleEngine {
9797
},
9898
pollIntervalMs: options.worker.pollIntervalMs,
9999
shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
100-
logger: new Logger("ScheduleEngineWorker", "debug"),
100+
logger: new Logger("ScheduleEngineWorker", (options.logLevel ?? "info") as any),
101101
jobs: {
102102
"schedule.triggerScheduledTask": this.#handleTriggerScheduledTaskJob.bind(this),
103103
},

packages/redis-worker/src/queue.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,12 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
318318

319319
async moveToDeadLetterQueue(id: string, errorMessage: string): Promise<void> {
320320
try {
321+
this.logger.debug(`SimpleQueue ${this.name}.moveToDeadLetterQueue(): moving item to DLQ`, {
322+
queue: this.name,
323+
id,
324+
errorMessage,
325+
});
326+
321327
const result = await this.redis.moveToDeadLetterQueue(
322328
`queue`,
323329
`items`,

0 commit comments

Comments
 (0)