Skip to content

Commit a86e5a1

Browse files
committed
custom events table
1 parent 0cd912f commit a86e5a1

File tree

5 files changed

+167
-1
lines changed

5 files changed

+167
-1
lines changed

apps/basket/src/lib/event-service.ts

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ import { randomUUID } from "node:crypto";
22
import type {
33
AnalyticsEvent,
44
CustomEvent,
5+
CustomEventSpan,
56
CustomOutgoingLink,
67
ErrorEvent,
78
ErrorSpanRow,
89
WebVitalsEvent,
910
WebVitalsSpan,
1011
} from "@databuddy/db";
11-
import type { ErrorSpan, IndividualVital } from "@databuddy/validation";
12+
import type { CustomEventSpanInput, ErrorSpan, IndividualVital } from "@databuddy/validation";
1213
import { getGeo } from "../utils/ip-geo";
1314
import { parseUserAgent } from "../utils/user-agent";
1415
import {
@@ -233,6 +234,49 @@ export async function insertCustomEvent(
233234
}
234235
}
235236

237+
/**
 * Coerce a validated `properties` payload into a plain object.
 *
 * The span type requires `Record<string, unknown>`, but the incoming value is
 * only known to be "some JSON value or null/undefined". Anything that is not a
 * non-array object (null, undefined, arrays, strings, numbers, booleans) is
 * replaced by an empty object, mirroring the existing fallback for `null`.
 */
function normalizeCustomEventProperties(
	value: unknown
): Record<string, unknown> {
	if (value !== null && typeof value === "object" && !Array.isArray(value)) {
		return value as Record<string, unknown>;
	}
	return {};
}

/**
 * Insert lean custom event spans (v2.x format).
 *
 * Each event is checked with `checkDuplicate` using a key built from
 * client + session + path + event name + timestamp; events reported as
 * duplicates are skipped. The remaining events are converted to
 * `CustomEventSpan` rows and sent as one batch to the
 * "analytics-custom-event-spans" topic.
 *
 * A failure while sending the batch is reported through `captureError` and is
 * NOT rethrown, so callers cannot observe send failures from this function.
 *
 * @param events - Validated custom event span inputs; an empty array is a no-op.
 * @param clientId - Client identifier stamped on every produced span.
 */
export async function insertCustomEventSpans(
	events: CustomEventSpanInput[],
	clientId: string
): Promise<void> {
	if (events.length === 0) {
		return;
	}

	const now = Date.now();
	const spans: CustomEventSpan[] = [];

	for (const event of events) {
		// Dedup by client+session+path+eventName+timestamp.
		// Checks run one at a time, in input order (kept sequential on purpose:
		// whether checkDuplicate also records the key is not visible here).
		const dedupKey = `custom_${clientId}_${event.sessionId}_${event.path}_${event.eventName}_${event.timestamp}`;
		const isDuplicate = await checkDuplicate(dedupKey, "custom");
		if (isDuplicate) {
			continue;
		}

		const span: CustomEventSpan = {
			client_id: clientId,
			session_id: validateSessionId(event.sessionId),
			// Fall back to the batch receive time if the timestamp is not numeric.
			timestamp: typeof event.timestamp === "number" ? event.timestamp : now,
			path: sanitizeString(event.path, VALIDATION_LIMITS.STRING_MAX_LENGTH),
			event_name: sanitizeString(
				event.eventName,
				VALIDATION_LIMITS.SHORT_STRING_MAX_LENGTH
			),
			// Only plain objects are forwarded; a blind cast would let arrays or
			// scalars through to a column that is declared as JSON.
			properties: normalizeCustomEventProperties(event.properties),
		};

		spans.push(span);
	}

	if (spans.length === 0) {
		return;
	}

	try {
		await sendEventBatch("analytics-custom-event-spans", spans);
	} catch (error) {
		captureError(error, { count: spans.length });
	}
}
279+
236280
/**
237281
* Insert an outgoing link click event into the database
238282
*/

apps/basket/src/routes/basket.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ import type {
77
WebVitalsEvent,
88
} from "@databuddy/db";
99
import {
10+
batchedCustomEventSpansSchema,
1011
batchedErrorsSchema,
1112
batchedVitalsSchema,
1213
} from "@databuddy/validation";
1314
import { Elysia } from "elysia";
1415
import {
1516
insertCustomEvent,
17+
insertCustomEventSpans,
1618
insertCustomEventsBatch,
1719
insertError,
1820
insertErrorSpans,
@@ -414,6 +416,50 @@ const app = new Elysia()
414416
return { status: "error", message: "Internal server error" };
415417
}
416418
})
419+
// POST /events — ingest a batch of lean custom event spans.
// Order of checks: request validation -> schema parse -> bot check -> insert.
.post("/events", async (context) => {
	const { body, query, request } = context as {
		body: unknown;
		query: Record<string, string>;
		request: Request;
	};

	try {
		const requestCheck = await validateRequest(body, query, request);
		if ("error" in requestCheck) {
			return requestCheck.error;
		}
		const { clientId, userAgent } = requestCheck;

		const parsed = batchedCustomEventSpansSchema.safeParse(body);
		if (!parsed.success) {
			return createSchemaErrorResponse(parsed.error.issues);
		}

		const botCheck = await checkForBot(
			request,
			body,
			query,
			clientId,
			userAgent
		);
		if (botCheck) {
			return botCheck.error;
		}

		const spans = parsed.data;
		await insertCustomEventSpans(spans, clientId);

		// `count` is the size of the parsed batch, not the number of rows
		// that survived de-duplication inside insertCustomEventSpans.
		return {
			status: "success",
			type: "custom_event",
			count: spans.length,
		};
	} catch (error) {
		captureError(error, { message: "Error processing custom events" });
		return { status: "error", message: "Internal server error" };
	}
})
417463
.post("/", async (context) => {
418464
const { body, query, request } = context as {
419465
body: any;

infra/ingest/vector.yaml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ sources:
1010
- analytics-web-vitals
1111
- analytics-vitals-spans
1212
- analytics-custom-events
13+
- analytics-custom-event-spans
1314
- analytics-outgoing-links
1415
- analytics-email-events
1516
- analytics-blocked-traffic
@@ -41,6 +42,7 @@ transforms:
4142
web_vitals: '.topic == "analytics-web-vitals"'
4243
vitals_spans: '.topic == "analytics-vitals-spans"'
4344
custom_events: '.topic == "analytics-custom-events"'
45+
custom_event_spans: '.topic == "analytics-custom-event-spans"'
4446
outgoing_links: '.topic == "analytics-outgoing-links"'
4547
email_events: '.topic == "analytics-email-events"'
4648
blocked_traffic: '.topic == "analytics-blocked-traffic"'
@@ -174,6 +176,26 @@ sinks:
174176
encoding:
175177
timestamp_format: unix_ms
176178

179+
clickhouse_custom_event_spans:
180+
type: clickhouse
181+
inputs:
182+
- route_analytics.custom_event_spans
183+
endpoint: "${CLICKHOUSE_URL}"
184+
database: analytics
185+
table: custom_event_spans
186+
auth:
187+
strategy: basic
188+
user: "${CLICKHOUSE_USER}"
189+
password: "${CLICKHOUSE_PASSWORD}"
190+
batch:
191+
max_events: 5000
192+
max_bytes: 5000000
193+
timeout_secs: 5
194+
date_time_best_effort: true
195+
skip_unknown_fields: true
196+
encoding:
197+
timestamp_format: unix_ms
198+
177199
clickhouse_outgoing_links:
178200
type: clickhouse
179201
inputs:

packages/db/src/clickhouse/schema.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,7 @@ TTL toDateTime(Timestamp) + toIntervalDay(3)
378378
SETTINGS ttl_only_drop_parts = 1
379379
`;
380380

381+
// Legacy table - keeping for backwards compatibility
381382
const CREATE_CUSTOM_EVENTS_TABLE = `
382383
CREATE TABLE IF NOT EXISTS ${ANALYTICS_DATABASE}.custom_events (
383384
id UUID,
@@ -394,6 +395,30 @@ ORDER BY (client_id, timestamp, id)
394395
SETTINGS index_granularity = 8192
395396
`;
396397

398+
/**
 * DDL for the lean `custom_event_spans` table.
 *
 * - `properties` is a JSON column, so event metadata has no fixed schema.
 * - Bloom-filter skip indexes are declared on `session_id` and `event_name`.
 * - Data is partitioned per day (`toDate(timestamp)`) and sorted by
 *   (client_id, event_name, path, timestamp).
 * - Rows expire 90 days after `timestamp`; with `ttl_only_drop_parts = 1`
 *   expiry is applied by dropping whole parts.
 */
const CREATE_CUSTOM_EVENT_SPANS_TABLE = `
CREATE TABLE IF NOT EXISTS ${ANALYTICS_DATABASE}.custom_event_spans (
  client_id String CODEC(ZSTD(1)),
  session_id String CODEC(ZSTD(1)),

  timestamp DateTime64(3, 'UTC') CODEC(Delta(8), ZSTD(1)),
  path String CODEC(ZSTD(1)),

  event_name LowCardinality(String) CODEC(ZSTD(1)),
  properties JSON CODEC(ZSTD(1)),

  INDEX idx_session_id session_id TYPE bloom_filter(0.01) GRANULARITY 1,
  INDEX idx_event_name event_name TYPE bloom_filter(0.01) GRANULARITY 1
) ENGINE = MergeTree
PARTITION BY toDate(timestamp)
ORDER BY (client_id, event_name, path, timestamp)
TTL toDateTime(timestamp) + INTERVAL 90 DAY
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1
`;
421+
397422
const CREATE_CUSTOM_OUTGOING_LINKS_TABLE = `
398423
CREATE TABLE IF NOT EXISTS ${ANALYTICS_DATABASE}.outgoing_links (
399424
id UUID,
@@ -625,6 +650,7 @@ export type OTelLogs = {
625650
LogAttributes: Record<string, string>;
626651
}
627652

653+
// Legacy type - keeping for backwards compatibility
628654
export type CustomEvent = {
629655
id: string;
630656
client_id: string;
@@ -635,6 +661,19 @@ export type CustomEvent = {
635661
timestamp: number;
636662
}
637663

664+
/**
 * Row shape of a lean custom event span.
 *
 * Field names match the columns of the `custom_event_spans` table.
 */
export type CustomEventSpan = {
	/** Identifier of the client the event belongs to. */
	client_id: string;
	/** Identifier of the session that produced the event. */
	session_id: string;
	/** Event time as a number (presumably epoch milliseconds — confirm against producers). */
	timestamp: number;
	/** Path on which the event was recorded. */
	path: string;
	/** Name of the custom event. */
	event_name: string;
	/** Free-form JSON metadata attached to the event. */
	properties: Record<string, unknown>;
};
676+
638677
export type CustomOutgoingLink = {
639678
id: string;
640679
client_id: string;
@@ -741,6 +780,7 @@ export async function initClickHouseSchema() {
741780
{ name: "blocked_traffic", query: CREATE_BLOCKED_TRAFFIC_TABLE },
742781
{ name: "email_events", query: CREATE_EMAIL_EVENTS_TABLE },
743782
{ name: "custom_events", query: CREATE_CUSTOM_EVENTS_TABLE },
783+
{ name: "custom_event_spans", query: CREATE_CUSTOM_EVENT_SPANS_TABLE },
744784
{ name: "outgoing_links", query: CREATE_CUSTOM_OUTGOING_LINKS_TABLE },
745785
];
746786

packages/validation/src/schemas/custom-events.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import z from "zod";
22
import { VALIDATION_LIMITS } from "../constants";
33

4+
// Legacy schema
45
export const customEventSchema = z.object({
56
eventId: z.string().max(VALIDATION_LIMITS.EVENT_ID_MAX_LENGTH).optional(),
67
name: z.string().min(1).max(VALIDATION_LIMITS.NAME_MAX_LENGTH),
@@ -10,6 +11,19 @@ export const customEventSchema = z.object({
1011
properties: z.json().optional().nullable(),
1112
});
1213

14+
/**
 * Lean custom event span schema (v2.x).
 *
 * `timestamp`, `path` and `eventName` are required; `sessionId` and
 * `properties` may each be omitted or sent as `null`.
 */
export const customEventSpanSchema = z.object({
	timestamp: z.number().int(),
	path: z.string().max(VALIDATION_LIMITS.PATH_MAX_LENGTH),
	eventName: z.string().min(1).max(VALIDATION_LIMITS.NAME_MAX_LENGTH),
	sessionId: z
		.string()
		.max(VALIDATION_LIMITS.SESSION_ID_MAX_LENGTH)
		.nullish(),
	properties: z.json().nullish(),
});

/** A batch of custom event spans, capped at `VALIDATION_LIMITS.BATCH_MAX_SIZE` items. */
export const batchedCustomEventSpansSchema = z
	.array(customEventSpanSchema)
	.max(VALIDATION_LIMITS.BATCH_MAX_SIZE);

/** Parsed output type of a single custom event span. */
export type CustomEventSpanInput = z.infer<typeof customEventSpanSchema>;
26+
1327
export const outgoingLinkSchema = z.object({
1428
eventId: z.string().max(VALIDATION_LIMITS.EVENT_ID_MAX_LENGTH),
1529
anonymousId: z.string().nullable().optional(),

0 commit comments

Comments
 (0)