Skip to content

Commit 5f048e1

Browse files
committed
fix: schema
1 parent a86e5a1 commit 5f048e1

File tree

9 files changed

+162
-114
lines changed

9 files changed

+162
-114
lines changed

apps/basket/src/lib/event-service.ts

Lines changed: 43 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -246,34 +246,20 @@ export async function insertCustomEventSpans(
246246
}
247247

248248
const now = Date.now();
249-
const spans: CustomEventSpan[] = [];
250-
251-
for (const event of events) {
252-
// Dedup by client+session+path+eventName+timestamp
253-
const dedupKey = `custom_${clientId}_${event.sessionId}_${event.path}_${event.eventName}_${event.timestamp}`;
254-
const isDuplicate = await checkDuplicate(dedupKey, "custom");
255-
if (isDuplicate) {
256-
continue;
257-
}
258-
259-
const span: CustomEventSpan = {
260-
client_id: clientId,
261-
session_id: validateSessionId(event.sessionId),
262-
timestamp: typeof event.timestamp === "number" ? event.timestamp : now,
263-
path: sanitizeString(event.path, VALIDATION_LIMITS.STRING_MAX_LENGTH),
264-
event_name: sanitizeString(event.eventName, VALIDATION_LIMITS.SHORT_STRING_MAX_LENGTH),
265-
properties: (event.properties as Record<string, unknown>) ?? {},
266-
};
267-
268-
spans.push(span);
269-
}
249+
const spans: CustomEventSpan[] = events.map((event) => ({
250+
client_id: clientId,
251+
anonymous_id: sanitizeString(event.anonymousId, VALIDATION_LIMITS.SHORT_STRING_MAX_LENGTH),
252+
session_id: validateSessionId(event.sessionId),
253+
timestamp: typeof event.timestamp === "number" ? event.timestamp : now,
254+
path: sanitizeString(event.path, VALIDATION_LIMITS.STRING_MAX_LENGTH),
255+
event_name: sanitizeString(event.eventName, VALIDATION_LIMITS.SHORT_STRING_MAX_LENGTH),
256+
properties: (event.properties as Record<string, unknown>) ?? {},
257+
}));
270258

271-
if (spans.length > 0) {
272-
try {
273-
await sendEventBatch("analytics-custom-event-spans", spans);
274-
} catch (error) {
275-
captureError(error, { count: spans.length });
276-
}
259+
try {
260+
await sendEventBatch("analytics-custom-event-spans", spans);
261+
} catch (error) {
262+
captureError(error, { count: spans.length });
277263
}
278264
}
279265

@@ -530,39 +516,24 @@ export async function insertErrorSpans(
530516
}
531517

532518
const now = Date.now();
533-
const spans: ErrorSpanRow[] = [];
534-
535-
for (const error of errors) {
536-
// Use message hash as dedup key
537-
const dedupKey = `error_${clientId}_${error.message.slice(0, 50)}_${error.path}`;
538-
const isDuplicate = await checkDuplicate(dedupKey, "error");
539-
if (isDuplicate) {
540-
continue;
541-
}
542-
543-
const errorSpan: ErrorSpanRow = {
544-
client_id: clientId,
545-
anonymous_id: sanitizeString(error.anonymousId, VALIDATION_LIMITS.SHORT_STRING_MAX_LENGTH),
546-
session_id: validateSessionId(error.sessionId),
547-
timestamp: typeof error.timestamp === "number" ? error.timestamp : now,
548-
path: sanitizeString(error.path, VALIDATION_LIMITS.STRING_MAX_LENGTH),
549-
message: sanitizeString(error.message, VALIDATION_LIMITS.STRING_MAX_LENGTH),
550-
filename: sanitizeString(error.filename, VALIDATION_LIMITS.STRING_MAX_LENGTH),
551-
lineno: error.lineno ?? undefined,
552-
colno: error.colno ?? undefined,
553-
stack: sanitizeString(error.stack, VALIDATION_LIMITS.STRING_MAX_LENGTH),
554-
error_type: sanitizeString(error.errorType, VALIDATION_LIMITS.SHORT_STRING_MAX_LENGTH) || "Error",
555-
};
556-
557-
spans.push(errorSpan);
558-
}
519+
const spans: ErrorSpanRow[] = errors.map((error) => ({
520+
client_id: clientId,
521+
anonymous_id: sanitizeString(error.anonymousId, VALIDATION_LIMITS.SHORT_STRING_MAX_LENGTH),
522+
session_id: validateSessionId(error.sessionId),
523+
timestamp: typeof error.timestamp === "number" ? error.timestamp : now,
524+
path: sanitizeString(error.path, VALIDATION_LIMITS.STRING_MAX_LENGTH),
525+
message: sanitizeString(error.message, VALIDATION_LIMITS.STRING_MAX_LENGTH),
526+
filename: sanitizeString(error.filename, VALIDATION_LIMITS.STRING_MAX_LENGTH),
527+
lineno: error.lineno ?? undefined,
528+
colno: error.colno ?? undefined,
529+
stack: sanitizeString(error.stack, VALIDATION_LIMITS.STRING_MAX_LENGTH),
530+
error_type: sanitizeString(error.errorType, VALIDATION_LIMITS.SHORT_STRING_MAX_LENGTH) || "Error",
531+
}));
559532

560-
if (spans.length > 0) {
561-
try {
562-
await sendEventBatch("analytics-error-spans", spans);
563-
} catch (error) {
564-
captureError(error, { count: spans.length });
565-
}
533+
try {
534+
await sendEventBatch("analytics-error-spans", spans);
535+
} catch (error) {
536+
captureError(error, { count: spans.length });
566537
}
567538
}
568539

@@ -602,34 +573,20 @@ export async function insertIndividualVitals(
602573
}
603574

604575
const now = Date.now();
605-
const spans: WebVitalsSpan[] = [];
606-
607-
for (const vital of vitals) {
608-
// Dedup by client+session+path+metric
609-
const dedupKey = `vital_${clientId}_${vital.sessionId}_${vital.path}_${vital.metricName}`;
610-
const isDuplicate = await checkDuplicate(dedupKey, "web_vitals");
611-
if (isDuplicate) {
612-
continue;
613-
}
614-
615-
const span: WebVitalsSpan = {
616-
client_id: clientId,
617-
session_id: validateSessionId(vital.sessionId),
618-
timestamp: typeof vital.timestamp === "number" ? vital.timestamp : now,
619-
path: sanitizeString(vital.path, VALIDATION_LIMITS.STRING_MAX_LENGTH),
620-
metric_name: vital.metricName,
621-
metric_value: vital.metricValue,
622-
};
623-
624-
spans.push(span);
625-
}
576+
const spans: WebVitalsSpan[] = vitals.map((vital) => ({
577+
client_id: clientId,
578+
anonymous_id: sanitizeString(vital.anonymousId, VALIDATION_LIMITS.SHORT_STRING_MAX_LENGTH),
579+
session_id: validateSessionId(vital.sessionId),
580+
timestamp: typeof vital.timestamp === "number" ? vital.timestamp : now,
581+
path: sanitizeString(vital.path, VALIDATION_LIMITS.STRING_MAX_LENGTH),
582+
metric_name: vital.metricName,
583+
metric_value: vital.metricValue,
584+
}));
626585

627-
if (spans.length > 0) {
628-
try {
629-
await sendEventBatch("analytics-vitals-spans", spans);
630-
} catch (error) {
631-
captureError(error, { count: spans.length });
632-
}
586+
try {
587+
await sendEventBatch("analytics-vitals-spans", spans);
588+
} catch (error) {
589+
captureError(error, { count: spans.length });
633590
}
634591
}
635592

infra/ingest/console-config.yml

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
kafka:
2-
brokers: ["redpanda:9092"]
32
sasl:
43
enabled: true
4+
mechanism: SCRAM-SHA-256
5+
tls:
6+
enabled: false
57
redpanda:
68
adminApi:
7-
enabled: true
8-
urls: ["http://redpanda:9644"]
9-
authentication:
10-
useSecureCookies: false
11-
basic:
12-
enabled: true
9+
enabled: false

packages/db/src/clickhouse/schema.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ SETTINGS index_granularity = 8192
193193
const CREATE_WEB_VITALS_SPANS_TABLE = `
194194
CREATE TABLE IF NOT EXISTS ${ANALYTICS_DATABASE}.web_vitals_spans (
195195
client_id String CODEC(ZSTD(1)),
196+
anonymous_id String CODEC(ZSTD(1)),
196197
session_id String CODEC(ZSTD(1)),
197198
198199
timestamp DateTime64(3, 'UTC') CODEC(Delta(8), ZSTD(1)),
@@ -206,7 +207,6 @@ CREATE TABLE IF NOT EXISTS ${ANALYTICS_DATABASE}.web_vitals_spans (
206207
) ENGINE = MergeTree
207208
PARTITION BY toDate(timestamp)
208209
ORDER BY (client_id, metric_name, path, timestamp)
209-
TTL toDateTime(timestamp) + INTERVAL 90 DAY
210210
SETTINGS index_granularity = 8192, ttl_only_drop_parts = 1
211211
`;
212212

@@ -402,6 +402,7 @@ SETTINGS index_granularity = 8192
402402
const CREATE_CUSTOM_EVENT_SPANS_TABLE = `
403403
CREATE TABLE IF NOT EXISTS ${ANALYTICS_DATABASE}.custom_event_spans (
404404
client_id String CODEC(ZSTD(1)),
405+
anonymous_id String CODEC(ZSTD(1)),
405406
session_id String CODEC(ZSTD(1)),
406407
407408
timestamp DateTime64(3, 'UTC') CODEC(Delta(8), ZSTD(1)),
@@ -521,6 +522,7 @@ export type WebVitalMetricName =
521522

522523
export type WebVitalsSpan = {
523524
client_id: string;
525+
anonymous_id: string;
524526
session_id: string;
525527
timestamp: number;
526528
path: string;
@@ -667,6 +669,7 @@ export type CustomEvent = {
667669
*/
668670
export type CustomEventSpan = {
669671
client_id: string;
672+
anonymous_id: string;
670673
session_id: string;
671674
timestamp: number;
672675
path: string;

packages/tracker/src/core/client.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,14 @@ export class HttpClient {
116116
fetch<T>(
117117
endpoint: string,
118118
data: any,
119-
options: RequestInit = {}
119+
options: RequestInit = {},
120+
queryParams?: Record<string, string>
120121
): Promise<T | null> {
121-
const url = `${this.baseUrl}${endpoint}`;
122+
let url = `${this.baseUrl}${endpoint}`;
123+
if (queryParams) {
124+
const params = new URLSearchParams(queryParams);
125+
url = `${url}?${params.toString()}`;
126+
}
122127
return this.post(url, data, options, 0);
123128
}
124129
}

packages/tracker/src/core/tracker.ts

Lines changed: 71 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { HttpClient } from "./client";
2-
import type { BaseEvent, ErrorSpan, EventContext, TrackerOptions, WebVitalEvent } from "./types";
2+
import type { BaseEvent, CustomEventSpan, ErrorSpan, EventContext, TrackerOptions, WebVitalEvent } from "./types";
33
import { generateUUIDv4, logger } from "./utils";
44

55
const HEADLESS_CHROME_REGEX = /\bHeadlessChrome\b/i;
@@ -43,9 +43,18 @@ export class BaseTracker {
4343
errorsTimer: Timer | null = null;
4444
private isFlushingErrors = false;
4545

46+
// Custom Events Queue
47+
customEventsQueue: CustomEventSpan[] = [];
48+
customEventsTimer: Timer | null = null;
49+
private isFlushingCustomEvents = false;
50+
4651
private readonly routeChangeCallbacks: Array<(path: string) => void> = [];
4752

4853
constructor(options: TrackerOptions) {
54+
if (!options.clientId || typeof options.clientId !== "string") {
55+
throw new Error("[Databuddy] clientId is required and must be a string");
56+
}
57+
4958
this.options = {
5059
disabled: false,
5160
trackPerformance: true,
@@ -61,10 +70,9 @@ export class BaseTracker {
6170
...options,
6271
};
6372

64-
const headers: Record<string, string> = {};
65-
if (this.options.clientId) {
66-
headers["databuddy-client-id"] = this.options.clientId;
67-
}
73+
const headers: Record<string, string> = {
74+
"databuddy-client-id": this.options.clientId,
75+
};
6876
headers["databuddy-sdk-name"] = this.options.sdk || "web";
6977
headers["databuddy-sdk-version"] = this.options.sdkVersion || "2.0.0";
7078

@@ -354,7 +362,7 @@ export class BaseTracker {
354362
}
355363

356364
logger.log("Sending event", event);
357-
return this.api.fetch("/", event, { keepalive: true });
365+
return this.api.fetch("/", event, { keepalive: true }, { client_id: this.options.clientId });
358366
}
359367

360368
addToBatch(event: BaseEvent): Promise<void> {
@@ -389,7 +397,7 @@ export class BaseTracker {
389397
try {
390398
const result = await this.api.fetch("/batch", batchEvents, {
391399
keepalive: true,
392-
});
400+
}, { client_id: this.options.clientId });
393401
logger.log("Batch sent", result);
394402
return result;
395403
} catch (_error) {
@@ -444,7 +452,7 @@ export class BaseTracker {
444452
try {
445453
const result = await this.api.fetch("/vitals", vitals, {
446454
keepalive: true,
447-
});
455+
}, { client_id: this.options.clientId });
448456
logger.log("Vitals sent", result);
449457
return result;
450458
} catch (error) {
@@ -496,7 +504,7 @@ export class BaseTracker {
496504
try {
497505
const result = await this.api.fetch("/errors", errors, {
498506
keepalive: true,
499-
});
507+
}, { client_id: this.options.clientId });
500508
logger.log("Errors sent", result);
501509
return result;
502510
} catch (error) {
@@ -507,6 +515,58 @@ export class BaseTracker {
507515
}
508516
}
509517

518+
sendCustomEvent(event: CustomEventSpan): Promise<void> {
519+
if (this.shouldSkipTracking()) {
520+
return Promise.resolve();
521+
}
522+
523+
logger.log("Queueing custom event", event);
524+
return this.addToCustomEventsQueue(event);
525+
}
526+
527+
addToCustomEventsQueue(event: CustomEventSpan): Promise<void> {
528+
this.customEventsQueue.push(event);
529+
if (this.customEventsTimer === null) {
530+
this.customEventsTimer = setTimeout(
531+
() => this.flushCustomEvents(),
532+
this.options.batchTimeout
533+
);
534+
}
535+
if (this.customEventsQueue.length >= 10) {
536+
this.flushCustomEvents();
537+
}
538+
return Promise.resolve();
539+
}
540+
541+
async flushCustomEvents() {
542+
if (this.customEventsTimer) {
543+
clearTimeout(this.customEventsTimer);
544+
this.customEventsTimer = null;
545+
}
546+
if (this.customEventsQueue.length === 0 || this.isFlushingCustomEvents) {
547+
return;
548+
}
549+
550+
this.isFlushingCustomEvents = true;
551+
const events = [...this.customEventsQueue];
552+
this.customEventsQueue = [];
553+
554+
logger.log("Flushing custom events", events.length);
555+
556+
try {
557+
const result = await this.api.fetch("/events", events, {
558+
keepalive: true,
559+
}, { client_id: this.options.clientId });
560+
logger.log("Custom events sent", result);
561+
return result;
562+
} catch (error) {
563+
logger.error("Custom events batch failed", error);
564+
return null;
565+
} finally {
566+
this.isFlushingCustomEvents = false;
567+
}
568+
}
569+
510570
sendBeacon(data: unknown, endpoint = "/vitals"): boolean {
511571
if (this.isServer()) {
512572
return false;
@@ -519,7 +579,8 @@ export class BaseTracker {
519579
type: "application/json",
520580
});
521581
const baseUrl = this.options.apiUrl || "https://basket.databuddy.cc";
522-
return navigator.sendBeacon(`${baseUrl}${endpoint}`, blob);
582+
const url = `${baseUrl}${endpoint}?client_id=${encodeURIComponent(this.options.clientId)}`;
583+
return navigator.sendBeacon(url, blob);
523584
} catch {
524585
return false;
525586
}

packages/tracker/src/core/types.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
/** biome-ignore-all lint/style/useConsistentTypeDefinitions: Interfaces are needed for declaration merging */
22
export type TrackerOptions = {
3-
// Basic Config
3+
clientId: string;
44
disabled?: boolean;
55
apiUrl?: string;
6-
clientId?: string;
76
sdk?: string;
87
sdkVersion?: string;
98

@@ -90,6 +89,15 @@ export type ErrorSpan = {
9089
sessionId?: string;
9190
};
9291

92+
export type CustomEventSpan = {
93+
timestamp: number;
94+
path: string;
95+
eventName: string;
96+
anonymousId?: string;
97+
sessionId?: string;
98+
properties?: Record<string, unknown>;
99+
};
100+
93101
export type DatabuddyGlobal = {
94102
track: (name: string, props?: Record<string, unknown>) => void;
95103
screenView: (props?: Record<string, unknown>) => void;

0 commit comments

Comments
 (0)