Skip to content

Commit 05f2f4f

Browse files
committed
switch off otel plugin
1 parent 226d17b commit 05f2f4f

File tree

9 files changed

+333
-232
lines changed

9 files changed

+333
-232
lines changed

apps/basket/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
"@databuddy/shared": "workspace:*",
1919
"@databuddy/validation": "workspace:*",
2020
"@elysiajs/cors": "^1.3.3",
21-
"@elysiajs/opentelemetry": "^1.4.6",
2221
"@elysiajs/server-timing": "^1.3.0",
2322
"@maxmind/geoip2-node": "^6.1.0",
23+
"@opentelemetry/api": "^1.9.0",
2424
"@opentelemetry/exporter-trace-otlp-proto": "^0.208.0",
2525
"@opentelemetry/resources": "^2.2.0",
26+
"@opentelemetry/sdk-node": "^0.208.0",
2627
"@opentelemetry/sdk-trace-node": "^2.2.0",
2728
"@opentelemetry/semantic-conventions": "^1.38.0",
2829
"@types/ua-parser-js": "^0.7.39",

apps/basket/src/hooks/auth.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77

88
import { and, db, eq, member, websites } from "@databuddy/db";
99
import { cacheable } from "@databuddy/redis";
10-
import { record, setAttributes } from "@elysiajs/opentelemetry";
1110
import { logger } from "../lib/logger";
11+
import { record, setAttributes } from "../lib/tracing";
1212

1313
type Website = typeof websites.$inferSelect;
1414

@@ -130,10 +130,7 @@ export function isValidOrigin(
130130
return true;
131131
}
132132
if (!allowedDomain?.trim()) {
133-
logger.warn(
134-
{ originHeader },
135-
"[isValidOrigin] No allowed domain provided"
136-
);
133+
logger.warn({ originHeader }, "[isValidOrigin] No allowed domain provided");
137134
return false;
138135
}
139136
try {
@@ -339,7 +336,10 @@ const getWebsiteByIdWithOwnerCached = cacheable(
339336
const ownerId = await _resolveOwnerId(website);
340337
return { ...website, ownerId };
341338
} catch (error) {
342-
logger.error({ error, websiteId: id }, "Failed to get website by ID from cache");
339+
logger.error(
340+
{ error, websiteId: id },
341+
"Failed to get website by ID from cache"
342+
);
343343
return null;
344344
}
345345
},
@@ -351,9 +351,7 @@ const getWebsiteByIdWithOwnerCached = cacheable(
351351
}
352352
);
353353

354-
export function getWebsiteByIdV2(
355-
id: string
356-
): Promise<WebsiteWithOwner | null> {
354+
export function getWebsiteByIdV2(id: string): Promise<WebsiteWithOwner | null> {
357355
return record("getWebsiteByIdV2", async () => {
358356
setAttributes({
359357
"website.id": id,

apps/basket/src/index.ts

Lines changed: 38 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
import "./polyfills/compression";
22

3-
import { opentelemetry } from "@elysiajs/opentelemetry";
4-
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-proto";
5-
import { BatchSpanProcessor } from "@opentelemetry/sdk-trace-node";
63
import { Elysia } from "elysia";
74
import { logger } from "./lib/logger";
85
import { getProducerStats } from "./lib/producer";
6+
import {
7+
endRequestSpan,
8+
initTracing,
9+
startRequestSpan,
10+
} from "./lib/tracing";
911
import basketRouter from "./routes/basket";
1012
import emailRouter from "./routes/email";
1113

14+
initTracing();
15+
1216
function getKafkaHealth() {
1317
const stats = getProducerStats();
1418

@@ -37,27 +41,9 @@ function getKafkaHealth() {
3741
}
3842

3943
const app = new Elysia()
40-
.use(
41-
opentelemetry({
42-
serviceName: "basket",
43-
spanProcessors: [
44-
new BatchSpanProcessor(
45-
new OTLPTraceExporter({
46-
url: "https://api.axiom.co/v1/traces",
47-
headers: {
48-
Authorization: `Bearer ${process.env.AXIOM_TOKEN}`,
49-
"X-Axiom-Dataset": process.env.AXIOM_DATASET ?? "basket",
50-
},
51-
})
52-
),
53-
],
54-
})
55-
)
56-
.onError(function handleError({ error, code }) {
57-
if (code === "NOT_FOUND") {
58-
return new Response(null, { status: 404 });
59-
}
60-
logger.error({ error }, "Error in basket service");
44+
.state("tracing", {
45+
span: null as ReturnType<typeof startRequestSpan> | null,
46+
startTime: 0,
6147
})
6248
.onBeforeHandle(function handleCors({ request, set }) {
6349
const origin = request.headers.get("origin");
@@ -71,6 +57,34 @@ const app = new Elysia()
7157
set.headers["Access-Control-Allow-Credentials"] = "true";
7258
}
7359
})
60+
.onBeforeHandle(function startTrace({ request, path, store }) {
61+
const method = request.method;
62+
const startTime = Date.now();
63+
const span = startRequestSpan(method, request.url, path);
64+
65+
// Store span and start time in Elysia store
66+
store.tracing = {
67+
span,
68+
startTime,
69+
};
70+
})
71+
.onAfterHandle(function endTrace({ response, store }) {
72+
if (store.tracing?.span && store.tracing.startTime) {
73+
const statusCode = response instanceof Response ? response.status : 200;
74+
endRequestSpan(store.tracing.span, statusCode, store.tracing.startTime);
75+
}
76+
})
77+
.onError(function handleError({ error, code, store }) {
78+
if (store.tracing?.span && store.tracing.startTime) {
79+
const statusCode = code === "NOT_FOUND" ? 404 : 500;
80+
endRequestSpan(store.tracing.span, statusCode, store.tracing.startTime);
81+
}
82+
83+
if (code === "NOT_FOUND") {
84+
return new Response(null, { status: 404 });
85+
}
86+
logger.error({ error }, "Error in basket service");
87+
})
7488
.options("*", () => new Response(null, { status: 204 }))
7589
.use(basketRouter)
7690
.use(emailRouter)

apps/basket/src/lib/event-service.ts

Lines changed: 17 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import type {
66
ErrorEvent,
77
WebVitalsEvent,
88
} from "@databuddy/db";
9-
import { record, setAttributes } from "@elysiajs/opentelemetry";
109
import { getGeo } from "../utils/ip-geo";
1110
import { parseUserAgent } from "../utils/user-agent";
1211
import {
@@ -18,6 +17,7 @@ import {
1817
import { logger } from "./logger";
1918
import { sendEvent, sendEventBatch } from "./producer";
2019
import { checkDuplicate, getDailySalt, saltAnonymousId } from "./security";
20+
import { record, setAttributes } from "./tracing";
2121

2222
/**
2323
* Insert an error event into the database
@@ -70,7 +70,8 @@ export function insertError(
7070
VALIDATION_LIMITS.SHORT_STRING_MAX_LENGTH
7171
),
7272
session_id: validateSessionId(payload.sessionId),
73-
timestamp: typeof payload.timestamp === "number" ? payload.timestamp : now,
73+
timestamp:
74+
typeof payload.timestamp === "number" ? payload.timestamp : now,
7475
path: sanitizeString(payload.path, VALIDATION_LIMITS.STRING_MAX_LENGTH),
7576
message: sanitizeString(
7677
payload.message,
@@ -108,10 +109,7 @@ export function insertError(
108109
try {
109110
sendEvent("analytics-errors", errorEvent);
110111
} catch (error) {
111-
logger.error(
112-
{ error, eventId },
113-
"Failed to queue error event"
114-
);
112+
logger.error({ error, eventId }, "Failed to queue error event");
115113
}
116114
});
117115
}
@@ -175,10 +173,7 @@ export async function insertWebVitals(
175173
try {
176174
sendEvent("analytics-web-vitals", webVitalsEvent);
177175
} catch (error) {
178-
logger.error(
179-
{ error, eventId },
180-
"Failed to queue web vitals event"
181-
);
176+
logger.error({ error, eventId }, "Failed to queue web vitals event");
182177
// Don't throw - event is buffered or sent async
183178
}
184179
}
@@ -229,10 +224,7 @@ export async function insertCustomEvent(
229224
try {
230225
sendEvent("analytics-custom-events", customEvent);
231226
} catch (error) {
232-
logger.error(
233-
{ error, eventId },
234-
"Failed to queue custom event"
235-
);
227+
logger.error({ error, eventId }, "Failed to queue custom event");
236228
// Don't throw - event is buffered or sent async
237229
}
238230
}
@@ -281,10 +273,7 @@ export async function insertOutgoingLink(
281273
try {
282274
sendEvent("analytics-outgoing-links", outgoingLinkEvent);
283275
} catch (error) {
284-
logger.error(
285-
{ error, eventId },
286-
"Failed to queue outgoing link event"
287-
);
276+
logger.error({ error, eventId }, "Failed to queue outgoing link event");
288277
}
289278
}
290279

@@ -370,7 +359,10 @@ export function insertTrackEvent(
370359
),
371360
url: sanitizeString(trackData.path, VALIDATION_LIMITS.STRING_MAX_LENGTH),
372361
path: sanitizeString(trackData.path, VALIDATION_LIMITS.STRING_MAX_LENGTH),
373-
title: sanitizeString(trackData.title, VALIDATION_LIMITS.STRING_MAX_LENGTH),
362+
title: sanitizeString(
363+
trackData.title,
364+
VALIDATION_LIMITS.STRING_MAX_LENGTH
365+
),
374366

375367
ip: anonymizedIP || "",
376368
user_agent: "",
@@ -412,7 +404,9 @@ export function insertTrackEvent(
412404
connection_time: validatePerformanceMetric(trackData.connection_time),
413405
render_time: validatePerformanceMetric(trackData.render_time),
414406
redirect_time: validatePerformanceMetric(trackData.redirect_time),
415-
domain_lookup_time: validatePerformanceMetric(trackData.domain_lookup_time),
407+
domain_lookup_time: validatePerformanceMetric(
408+
trackData.domain_lookup_time
409+
),
416410

417411
properties: trackData.properties
418412
? JSON.stringify(trackData.properties)
@@ -431,10 +425,7 @@ export function insertTrackEvent(
431425
try {
432426
sendEvent("analytics-events", trackEvent);
433427
} catch (error) {
434-
logger.error(
435-
{ error, eventId },
436-
"Failed to queue track event"
437-
);
428+
logger.error({ error, eventId }, "Failed to queue track event");
438429
}
439430
});
440431
}
@@ -485,9 +476,7 @@ export function insertErrorsBatch(events: ErrorEvent[]): Promise<void> {
485476
});
486477
}
487478

488-
export function insertWebVitalsBatch(
489-
events: WebVitalsEvent[]
490-
): Promise<void> {
479+
export function insertWebVitalsBatch(events: WebVitalsEvent[]): Promise<void> {
491480
return record("insertWebVitalsBatch", async () => {
492481
if (events.length === 0) {
493482
return;
@@ -509,9 +498,7 @@ export function insertWebVitalsBatch(
509498
});
510499
}
511500

512-
export function insertCustomEventsBatch(
513-
events: CustomEvent[]
514-
): Promise<void> {
501+
export function insertCustomEventsBatch(events: CustomEvent[]): Promise<void> {
515502
return record("insertCustomEventsBatch", async () => {
516503
if (events.length === 0) {
517504
return;

apps/basket/src/lib/producer.ts

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import type { ClickHouseClient } from "@clickhouse/client";
22
import { clickHouse, TABLE_NAMES } from "@databuddy/db";
3-
import { record, setAttributes } from "@elysiajs/opentelemetry";
43
import { Semaphore } from "async-mutex";
54
import { CompressionTypes, Kafka, type Producer } from "kafkajs";
65
import { logger } from "./logger";
6+
import { record, setAttributes } from "./tracing";
77

88
type BufferedEvent = {
99
table: string;
@@ -176,7 +176,10 @@ export class EventProducer {
176176
this.lastRetry = Date.now();
177177
this.stats.errors += 1;
178178
this.stats.lastErrorTime = Date.now();
179-
logger.error({ error }, "Redpanda connection failed, using ClickHouse fallback");
179+
logger.error(
180+
{ error },
181+
"Redpanda connection failed, using ClickHouse fallback"
182+
);
180183
if (this.dependencies.onError) {
181184
this.dependencies.onError(new Error(String(error)));
182185
}
@@ -247,7 +250,7 @@ export class EventProducer {
247250
logger.error(
248251
{ error },
249252
`Dropped event (retries: ${retries}, age: ${age}ms)`,
250-
{ table, eventId: (event as { event_id?: string }).event_id },
253+
{ table, eventId: (event as { event_id?: string }).event_id }
251254
);
252255
}
253256
}
@@ -259,7 +262,10 @@ export class EventProducer {
259262

260263
const failures = results.filter((r) => r.status === "rejected");
261264
if (failures.length > 0) {
262-
logger.error({ failures: failures.length }, "Table flush operations failed");
265+
logger.error(
266+
{ failures: failures.length },
267+
"Table flush operations failed"
268+
);
263269
}
264270
} catch (error) {
265271
this.stats.errors += 1;
@@ -300,7 +306,10 @@ export class EventProducer {
300306

301307
if (this.buffer.length >= this.config.bufferHardMax) {
302308
this.stats.dropped += 1;
303-
logger.error({ bufferLength: this.buffer.length }, "Buffer overflow, dropping event");
309+
logger.error(
310+
{ bufferLength: this.buffer.length },
311+
"Buffer overflow, dropping event"
312+
);
304313
return;
305314
}
306315

@@ -363,7 +372,10 @@ export class EventProducer {
363372
return;
364373
} catch (error) {
365374
this.stats.failed += 1;
366-
logger.error({ error }, "Redpanda send failed, buffering to ClickHouse");
375+
logger.error(
376+
{ error },
377+
"Redpanda send failed, buffering to ClickHouse"
378+
);
367379
this.failed = true;
368380
setAttributes({
369381
"kafka.send_failed": true,
@@ -457,7 +469,10 @@ export class EventProducer {
457469
return;
458470
} catch (error) {
459471
this.stats.failed += events.length;
460-
logger.error({ error }, "Redpanda batch failed, buffering to ClickHouse");
472+
logger.error(
473+
{ error },
474+
"Redpanda batch failed, buffering to ClickHouse"
475+
);
461476
this.failed = true;
462477
setAttributes({
463478
"kafka.send_failed": true,
@@ -506,11 +521,7 @@ export class EventProducer {
506521
await this.flush();
507522

508523
let finalFlushAttempts = 0;
509-
while (
510-
this.buffer.length > 0 &&
511-
finalFlushAttempts < 3 &&
512-
!this.flushing
513-
) {
524+
while (this.buffer.length > 0 && finalFlushAttempts < 3 && !this.flushing) {
514525
finalFlushAttempts += 1;
515526
await this.flush();
516527
await new Promise((r) => setTimeout(r, 1000));
@@ -602,11 +613,15 @@ export const disconnectProducer = async (): Promise<void> => {
602613
export const getProducerStats = () => getDefaultProducer().getStats();
603614

604615
process.on("SIGTERM", async () => {
605-
await disconnectProducer().catch((error) => logger.error({ error }, "SIGTERM error"));
616+
await disconnectProducer().catch((error) =>
617+
logger.error({ error }, "SIGTERM error")
618+
);
606619
process.exit(0);
607620
});
608621

609622
process.on("SIGINT", async () => {
610-
await disconnectProducer().catch((error) => logger.error({ error }, "SIGINT error"));
623+
await disconnectProducer().catch((error) =>
624+
logger.error({ error }, "SIGINT error")
625+
);
611626
process.exit(0);
612627
});

0 commit comments

Comments
 (0)