Skip to content

Commit 59bd1cc

Browse files
committed
Defer span processing to background worker via object storage
The OTEL ingest endpoint previously parsed, transformed, and wrote spans to ClickHouse synchronously within the HTTP request. As the span processing logic grows more complex, this coupling increases response latency and risks timeouts from OTEL exporters. This change splits ingestion into two phases: the endpoint now validates the payload, buffers it to object storage, and enqueues a BullMQ job — returning immediately. A new background worker picks up the job, runs the transform, persists to ClickHouse, and cleans up the buffered payload. This keeps the ingest endpoint lightweight and allows processing to retry independently on failure. The OTLP parsing and transform logic was relocated from `apps/ingest` to `@domain/spans` so both the ingest app and workers app can share it without cross-app imports.
1 parent 76631e5 commit 59bd1cc

File tree

15 files changed

+218
-34
lines changed

15 files changed

+218
-34
lines changed

apps/ingest/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
"@platform/db-clickhouse": "workspace:*",
2424
"@platform/db-postgres": "workspace:*",
2525
"@platform/env": "workspace:*",
26+
"@platform/storage-object": "workspace:*",
2627
"@repo/observability": "workspace:*",
2728
"@repo/utils": "workspace:*",
29+
"bullmq": "catalog:",
2830
"dotenv": "catalog:",
2931
"effect": "catalog:",
3032
"hono": "catalog:",

apps/ingest/src/clients.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,22 @@
11
import type { ClickHouseClient } from "@clickhouse/client"
2-
import type { RedisClient } from "@platform/cache-redis"
2+
import { SPAN_INGESTION_QUEUE } from "@domain/shared"
3+
import type { RedisClient, RedisConnection } from "@platform/cache-redis"
34
import { createRedisClient, createRedisConnection } from "@platform/cache-redis"
45
import { createClickhouseClient } from "@platform/db-clickhouse"
56
import { createPostgresClient, type PostgresClient } from "@platform/db-postgres"
67
import { parseEnv } from "@platform/env"
8+
import type { StorageDisk } from "@platform/storage-object"
9+
import { createStorageDisk } from "@platform/storage-object"
10+
import { Queue } from "bullmq"
711
import { Effect } from "effect"
812

913
let postgresClientInstance: PostgresClient | undefined
1014
let adminPostgresClientInstance: PostgresClient | undefined
1115
let clickhouseInstance: ClickHouseClient | undefined
1216
let redisInstance: RedisClient | undefined
17+
let redisConnectionInstance: RedisConnection | undefined
18+
let storageDiskInstance: StorageDisk | undefined
19+
let spanIngestionQueueInstance: Queue | undefined
1320

1421
export const getPostgresClient = (): PostgresClient => {
1522
if (!postgresClientInstance) {
@@ -44,3 +51,24 @@ export const getRedisClient = (): RedisClient => {
4451
}
4552
return redisInstance
4653
}
54+
55+
const getRedisConnection = (): RedisConnection => {
56+
if (!redisConnectionInstance) {
57+
redisConnectionInstance = createRedisConnection()
58+
}
59+
return redisConnectionInstance
60+
}
61+
62+
export const getStorageDisk = (): StorageDisk => {
63+
if (!storageDiskInstance) {
64+
storageDiskInstance = createStorageDisk()
65+
}
66+
return storageDiskInstance
67+
}
68+
69+
export const getSpanIngestionQueue = (): Queue => {
70+
if (!spanIngestionQueueInstance) {
71+
spanIngestionQueueInstance = new Queue(SPAN_INGESTION_QUEUE, { connection: getRedisConnection() })
72+
}
73+
return spanIngestionQueueInstance
74+
}

apps/ingest/src/otlp/proto.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1+
import type { OtlpExportTraceServiceRequest } from "@domain/spans"
12
import protobuf from "protobufjs"
2-
import type { OtlpExportTraceServiceRequest } from "./types.ts"
33

44
/**
55
* Protobuf schema for OTLP ExportTraceServiceRequest.
@@ -132,15 +132,14 @@ function bytesToHex(bytes: Uint8Array | number[]): string {
132132

133133
/**
134134
* Recursively normalize a decoded protobuf object to match the OTLP/JSON shape:
135-
* - Uint8Array bytes fields lowercase hex strings
136-
* - Long objects decimal strings
135+
* - Uint8Array bytes fields -> lowercase hex strings
136+
* - Long objects -> decimal strings
137137
*/
138138
function normalizeValue(value: unknown): unknown {
139139
if (value === null || value === undefined) return value
140140
if (value instanceof Uint8Array) return bytesToHex(value)
141141
if (Array.isArray(value)) return value.map(normalizeValue)
142142

143-
// protobufjs Long objects have low/high/unsigned properties
144143
if (typeof value === "object" && "low" in value && "high" in value) {
145144
const long = value as { low: number; high: number; unsigned: boolean }
146145
const bigint = BigInt(long.high >>> 0) * BigInt(2 ** 32) + BigInt(long.low >>> 0)

apps/ingest/src/routes/traces.ts

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
1-
import { OrganizationId } from "@domain/shared"
2-
import { SpanRepository } from "@domain/spans"
3-
import { ChSqlClientLive, SpanRepositoryLive } from "@platform/db-clickhouse"
1+
import { OrganizationId, ProjectId, putInDisk } from "@domain/shared"
2+
import type { OtlpExportTraceServiceRequest } from "@domain/spans"
43
import { Effect } from "effect"
54
import type { Hono } from "hono"
6-
import { getClickhouseClient } from "../clients.ts"
5+
import { getSpanIngestionQueue, getStorageDisk } from "../clients.ts"
76
import { authMiddleware } from "../middleware/auth.ts"
87
import { projectMiddleware } from "../middleware/project.ts"
98
import { decodeOtlpProtobuf } from "../otlp/proto.ts"
10-
import { transformOtlpToSpans } from "../otlp/transform.ts"
11-
import type { OtlpExportTraceServiceRequest } from "../otlp/types.ts"
129
import type { IngestEnv } from "../types.ts"
1310

1411
interface TracesRouteContext {
@@ -31,22 +28,38 @@ export const registerTracesRoute = ({ app }: TracesRouteContext) => {
3128
return c.json({ error: "Invalid OTLP payload" }, 400)
3229
}
3330

34-
const spans = transformOtlpToSpans(request, {
35-
organizationId: c.get("organizationId"),
36-
projectId: c.get("projectId"),
37-
apiKeyId: c.get("apiKeyId"),
38-
})
39-
40-
if (spans.length > 0) {
41-
const orgId = OrganizationId(c.get("organizationId"))
42-
await Effect.runPromise(
43-
Effect.gen(function* () {
44-
const repo = yield* SpanRepository
45-
yield* repo.insert(spans)
46-
}).pipe(Effect.provide(SpanRepositoryLive), Effect.provide(ChSqlClientLive(getClickhouseClient(), orgId))),
47-
)
31+
if (!request.resourceSpans?.length) {
32+
return c.json({})
4833
}
4934

35+
const organizationId = c.get("organizationId")
36+
const projectId = c.get("projectId")
37+
const apiKeyId = c.get("apiKeyId")
38+
const ingestedAt = new Date().toISOString()
39+
40+
const payload = JSON.stringify({ request, context: { organizationId, projectId, apiKeyId }, ingestedAt })
41+
42+
const disk = getStorageDisk()
43+
const storageKey = await Effect.runPromise(
44+
putInDisk(disk, {
45+
namespace: "ingest",
46+
organizationId: OrganizationId(organizationId),
47+
projectId: ProjectId(projectId),
48+
content: payload,
49+
}),
50+
)
51+
52+
const queue = getSpanIngestionQueue()
53+
await queue.add(
54+
"process-spans",
55+
{ storageKey },
56+
{
57+
jobId: storageKey,
58+
attempts: 3,
59+
backoff: { type: "exponential", delay: 2000 },
60+
},
61+
)
62+
5063
return c.json({})
5164
})
5265
}

apps/workers/package.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,15 @@
1313
"check:fix": "biome check --fix src"
1414
},
1515
"dependencies": {
16+
"@clickhouse/client": "catalog:",
17+
"@domain/shared": "workspace:*",
18+
"@domain/spans": "workspace:*",
1619
"@platform/cache-redis": "workspace:*",
20+
"@platform/db-clickhouse": "workspace:*",
1721
"@platform/db-postgres": "workspace:*",
1822
"@platform/events-outbox": "workspace:*",
1923
"@platform/queue-bullmq": "workspace:*",
24+
"@platform/storage-object": "workspace:*",
2025
"@repo/observability": "workspace:*",
2126
"@types/pg": "catalog:",
2227
"bullmq": "catalog:",

apps/workers/src/clients.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1+
import type { ClickHouseClient } from "@clickhouse/client"
12
import type { RedisConnection } from "@platform/cache-redis"
23
import { createRedisConnection } from "@platform/cache-redis"
4+
import { createClickhouseClient } from "@platform/db-clickhouse"
35
import { createPostgresPool } from "@platform/db-postgres"
6+
import type { StorageDisk } from "@platform/storage-object"
7+
import { createStorageDisk } from "@platform/storage-object"
48
import type { Pool } from "pg"
59

610
let redisConnectionInstance: RedisConnection | undefined
711
let pgPoolInstance: Pool | undefined
12+
let clickhouseInstance: ClickHouseClient | undefined
13+
let storageDiskInstance: StorageDisk | undefined
814

915
export const getRedisConnection = (): RedisConnection => {
1016
if (!redisConnectionInstance) {
@@ -19,3 +25,17 @@ export const getPostgresPool = (maxConnections?: number): Pool => {
1925
}
2026
return pgPoolInstance
2127
}
28+
29+
export const getClickhouseClient = (): ClickHouseClient => {
30+
if (!clickhouseInstance) {
31+
clickhouseInstance = createClickhouseClient()
32+
}
33+
return clickhouseInstance
34+
}
35+
36+
export const getStorageDisk = (): StorageDisk => {
37+
if (!storageDiskInstance) {
38+
storageDiskInstance = createStorageDisk()
39+
}
40+
return storageDiskInstance
41+
}

apps/workers/src/server.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@ import { createPollingOutboxConsumer } from "@platform/events-outbox"
44
import { createBullmqEventsPublisher } from "@platform/queue-bullmq"
55
import { createLogger } from "@repo/observability"
66
import { config as loadDotenv } from "dotenv"
7-
import { getPostgresPool, getRedisConnection } from "./clients.ts"
7+
import { getClickhouseClient, getPostgresPool, getRedisConnection, getStorageDisk } from "./clients.ts"
88
import { createEventsWorker } from "./workers/events.ts"
9+
import { createSpanIngestionWorker } from "./workers/span-ingestion.ts"
910

1011
const nodeEnv = process.env.NODE_ENV || "development"
1112
const envFilePath = fileURLToPath(new URL(`../../../.env.${nodeEnv}`, import.meta.url))
@@ -27,6 +28,12 @@ const outboxConsumer = createPollingOutboxConsumer(
2728
eventsPublisher,
2829
)
2930

31+
const { queue: spanIngestionQueue, worker: spanIngestionWorker } = createSpanIngestionWorker({
32+
redisConnection,
33+
clickhouseClient: getClickhouseClient(),
34+
storageDisk: getStorageDisk(),
35+
})
36+
3037
const logger = createLogger("workers")
3138

3239
eventsWorker.on("ready", () => {
@@ -35,10 +42,20 @@ eventsWorker.on("ready", () => {
3542
logger.info("workers ready and outbox consumer started")
3643
})
3744

45+
spanIngestionWorker.on("ready", () => {
46+
logger.info("span ingestion worker ready")
47+
})
48+
49+
spanIngestionWorker.on("failed", (job, error) => {
50+
logger.error({ jobId: job?.id, storageKey: job?.data?.storageKey, error }, "span ingestion job failed")
51+
})
52+
3853
process.on("SIGINT", async () => {
3954
await outboxConsumer.stop()
4055
await pgPool.end()
4156
await eventsQueue.close()
4257
await eventsWorker.close()
58+
await spanIngestionQueue.close()
59+
await spanIngestionWorker.close()
4360
process.exit(0)
4461
})
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import type { ClickHouseClient } from "@clickhouse/client"
2+
import { type OrganizationId, SPAN_INGESTION_QUEUE, type StorageDiskPort } from "@domain/shared"
3+
import type { OtlpExportTraceServiceRequest, TransformContext } from "@domain/spans"
4+
import { SpanRepository, transformOtlpToSpans } from "@domain/spans"
5+
import type { RedisConnection } from "@platform/cache-redis"
6+
import { ChSqlClientLive, SpanRepositoryLive } from "@platform/db-clickhouse"
7+
import { type Job, Queue, Worker } from "bullmq"
8+
import { Effect } from "effect"
9+
10+
interface StoredPayload {
11+
readonly request: OtlpExportTraceServiceRequest
12+
readonly context: Omit<TransformContext, "ingestedAt">
13+
readonly ingestedAt?: string
14+
}
15+
16+
interface SpanIngestionWorkerConfig {
17+
readonly redisConnection: RedisConnection
18+
readonly clickhouseClient: ClickHouseClient
19+
readonly storageDisk: StorageDiskPort
20+
}
21+
22+
const createProcessor =
23+
({ clickhouseClient, storageDisk }: { clickhouseClient: ClickHouseClient; storageDisk: StorageDiskPort }) =>
24+
async (job: Job<{ storageKey: string }>) => {
25+
const { storageKey } = job.data
26+
27+
const raw = await storageDisk.get(storageKey)
28+
const { request, context, ingestedAt } = JSON.parse(raw) as StoredPayload
29+
30+
const spans = transformOtlpToSpans(request, {
31+
...context,
32+
ingestedAt: ingestedAt ? new Date(ingestedAt) : new Date(),
33+
})
34+
35+
if (spans.length > 0) {
36+
await Effect.runPromise(
37+
Effect.gen(function* () {
38+
const repo = yield* SpanRepository
39+
yield* repo.insert(spans)
40+
}).pipe(
41+
Effect.provide(SpanRepositoryLive),
42+
Effect.provide(ChSqlClientLive(clickhouseClient, context.organizationId as OrganizationId)),
43+
),
44+
)
45+
}
46+
47+
await storageDisk.delete(storageKey)
48+
}
49+
50+
export const createSpanIngestionWorker = ({
51+
redisConnection,
52+
clickhouseClient,
53+
storageDisk,
54+
}: SpanIngestionWorkerConfig) => {
55+
const queue = new Queue(SPAN_INGESTION_QUEUE, { connection: redisConnection })
56+
57+
const worker = new Worker(SPAN_INGESTION_QUEUE, createProcessor({ clickhouseClient, storageDisk }), {
58+
connection: redisConnection,
59+
})
60+
61+
return { queue, worker }
62+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
export * from "./ch-sql-client.ts"
22
export * from "./errors.ts"
33
export * from "./id.ts"
4+
export * from "./queues.ts"
45
export * from "./seeds.ts"
56
export * from "./sql-client.ts"
67
export * from "./storage.ts"
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export const SPAN_INGESTION_QUEUE = "span-ingestion" as const

0 commit comments

Comments
 (0)