Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,9 @@ const EnvironmentSchema = z.object({
RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000),
RUN_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10),
RUN_REPLICATION_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
RUN_REPLICATION_CLICKHOUSE_LOG_LEVEL: z
.enum(["log", "error", "warn", "info", "debug"])
.default("info"),
RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/services/runsReplicationInstance.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ function initializeRunsReplicationInstance() {
enabled: env.RUN_REPLICATION_KEEP_ALIVE_ENABLED === "1",
idleSocketTtl: env.RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
},
logLevel: env.RUN_REPLICATION_LOG_LEVEL,
logLevel: env.RUN_REPLICATION_CLICKHOUSE_LOG_LEVEL,
compression: {
request: true,
},
Expand Down
32 changes: 32 additions & 0 deletions internal-packages/clickhouse/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { Logger, type LogLevel } from "@trigger.dev/core/logger";
import type { Agent as HttpAgent } from "http";
import type { Agent as HttpsAgent } from "https";
import { ClickhouseQueryBuilder } from "./queryBuilder.js";
import { randomUUID } from "node:crypto";

export type ClickhouseConfig = {
name: string;
Expand Down Expand Up @@ -103,18 +104,22 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
settings?: ClickHouseSettings;
}): ClickhouseQueryFunction<z.input<TIn>, z.output<TOut>> {
return async (params, options) => {
const queryId = randomUUID();

return await startSpan(this.tracer, "query", async (span) => {
this.logger.debug("Querying clickhouse", {
name: req.name,
query: req.query.replace(/\s+/g, " "),
params,
settings: req.settings,
attributes: options?.attributes,
queryId,
});

span.setAttributes({
"clickhouse.clientName": this.name,
"clickhouse.operationName": req.name,
"clickhouse.queryId": queryId,
...flattenAttributes(req.settings, "clickhouse.settings"),
...flattenAttributes(options?.attributes),
});
Expand All @@ -129,6 +134,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
error: validParams.error,
query: req.query,
params,
queryId,
});

return [
Expand All @@ -146,6 +152,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
query: req.query,
query_params: validParams?.data,
format: "JSONEachRow",
query_id: queryId,
...options?.params,
clickhouse_settings: {
...req.settings,
Expand All @@ -160,6 +167,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
error: clickhouseError,
query: req.query,
params,
queryId,
});

recordClickhouseError(span, clickhouseError);
Expand Down Expand Up @@ -195,6 +203,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
error: parsed.error,
query: req.query,
params,
queryId,
});

const queryError = new QueryError(generateErrorMessage(parsed.error.issues), {
Expand Down Expand Up @@ -235,11 +244,25 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
settings?: ClickHouseSettings;
}): ClickhouseInsertFunction<z.input<TSchema>> {
return async (events, options) => {
const queryId = randomUUID();

return await startSpan(this.tracer, "insert", async (span) => {
this.logger.debug("Inserting into clickhouse", {
clientName: this.name,
name: req.name,
table: req.table,
events: Array.isArray(events) ? events.length : 1,
settings: req.settings,
attributes: options?.attributes,
options,
queryId,
});

span.setAttributes({
"clickhouse.clientName": this.name,
"clickhouse.tableName": req.table,
"clickhouse.operationName": req.name,
"clickhouse.queryId": queryId,
...flattenAttributes(req.settings, "clickhouse.settings"),
...flattenAttributes(options?.attributes),
});
Expand Down Expand Up @@ -271,6 +294,7 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
table: req.table,
format: "JSONEachRow",
values: Array.isArray(validatedEvents) ? validatedEvents : [validatedEvents],
query_id: queryId,
...options?.params,
clickhouse_settings: {
...req.settings,
Expand All @@ -291,6 +315,14 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
return [new InsertError(clickhouseError.message), null];
}

this.logger.debug("Inserted into clickhouse", {
clientName: this.name,
name: req.name,
table: req.table,
result,
queryId,
});

span.setAttributes({
"clickhouse.query_id": result.query_id,
"clickhouse.executed": result.executed,
Expand Down
2 changes: 1 addition & 1 deletion internal-packages/clickhouse/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class ClickHouse {
private _splitClients: boolean;

constructor(config: ClickHouseConfig) {
this.logger = config.logger ?? new Logger("ClickHouse", "debug");
this.logger = config.logger ?? new Logger("ClickHouse", config.logLevel ?? "debug");

if (config.url) {
const url = new URL(config.url);
Expand Down
1 change: 1 addition & 0 deletions internal-packages/clickhouse/src/taskRuns.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ describe("Task Runs V2", () => {
const client = new ClickhouseClient({
name: "test",
url: clickhouseContainer.getConnectionUrl(),
logLevel: "debug",
});

const insert = insertTaskRuns(client, {
Expand Down
2 changes: 1 addition & 1 deletion internal-packages/schedule-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export class ScheduleEngine {
},
pollIntervalMs: options.worker.pollIntervalMs,
shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
logger: new Logger("ScheduleEngineWorker", "debug"),
logger: new Logger("ScheduleEngineWorker", (options.logLevel ?? "info") as any),
jobs: {
"schedule.triggerScheduledTask": this.#handleTriggerScheduledTaskJob.bind(this),
},
Expand Down
6 changes: 6 additions & 0 deletions packages/redis-worker/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,12 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {

async moveToDeadLetterQueue(id: string, errorMessage: string): Promise<void> {
try {
this.logger.debug(`SimpleQueue ${this.name}.moveToDeadLetterQueue(): moving item to DLQ`, {
queue: this.name,
id,
errorMessage,
});

const result = await this.redis.moveToDeadLetterQueue(
`queue`,
`items`,
Expand Down