Skip to content

Commit 1b0495b

Browse files
committed
refactor: completely switch to kafka system
1 parent f1e21ae commit 1b0495b

File tree

3 files changed

+20
-127
lines changed

3 files changed

+20
-127
lines changed

apps/basket/src/lib/blocked-traffic.ts

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
import { randomUUID } from 'node:crypto';
2-
import {
3-
type AnalyticsEvent,
4-
type BlockedTraffic,
5-
clickHouse,
6-
} from '@databuddy/db';
2+
import { type BlockedTraffic } from '@databuddy/db';
73
import { extractIpFromRequest, getGeo } from '../utils/ip-geo';
84
import { parseUserAgent } from '../utils/user-agent';
95
import { sanitizeString, VALIDATION_LIMITS } from '../utils/validation';
@@ -87,23 +83,11 @@ export async function logBlockedTraffic(
8783
created_at: now,
8884
};
8985

90-
clickHouse
91-
.insert({
92-
table: 'analytics.blocked_traffic',
93-
values: [blockedEvent],
94-
format: 'JSONEachRow',
95-
})
96-
.then(() => {
97-
if (process.env.ENABLE_KAFKA_EVENTS === 'true') {
98-
sendEvent('analytics-blocked-traffic', blockedEvent);
99-
}
100-
})
101-
.catch((err) => {
102-
console.error('Failed to log blocked traffic', { error: err as Error });
103-
throw err;
104-
});
86+
sendEvent('analytics-blocked-traffic', blockedEvent);
10587
} catch (error) {
106-
console.error('Failed to log blocked traffic', { error: error as Error });
88+
console.error('Failed to send blocked traffic to Kafka', {
89+
error: error as Error,
90+
});
10791
throw error;
10892
}
10993
}

apps/basket/src/lib/event-service.ts

Lines changed: 11 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import {
55
type CustomOutgoingLink,
66
type ErrorEvent,
77
type WebVitalsEvent,
8-
clickHouse,
98
} from '@databuddy/db';
109
import { checkDuplicate } from './security';
1110
import { sendEvent } from './producer';
@@ -86,24 +85,9 @@ export async function insertError(
8685
};
8786

8887
try {
89-
await clickHouse.insert({
90-
table: 'analytics.errors',
91-
values: [errorEvent],
92-
format: 'JSONEachRow',
93-
});
94-
95-
if (process.env.ENABLE_KAFKA_EVENTS === 'true') {
96-
try {
97-
sendEvent('analytics-errors', errorEvent);
98-
} catch (kafkaErr) {
99-
console.error('Failed to send error event to Kafka', {
100-
error: kafkaErr as Error,
101-
eventId,
102-
});
103-
}
104-
}
88+
sendEvent('analytics-errors', errorEvent);
10589
} catch (err) {
106-
console.error('Failed to insert error event', {
90+
console.error('Failed to send error event to Kafka', {
10791
error: err as Error,
10892
eventId,
10993
});
@@ -168,24 +152,9 @@ export async function insertWebVitals(
168152
};
169153

170154
try {
171-
await clickHouse.insert({
172-
table: 'analytics.web_vitals',
173-
values: [webVitalsEvent],
174-
format: 'JSONEachRow',
175-
});
176-
177-
if (process.env.ENABLE_KAFKA_EVENTS === 'true') {
178-
try {
179-
sendEvent('analytics-web-vitals', webVitalsEvent);
180-
} catch (kafkaErr) {
181-
console.error('Failed to send web vitals event to Kafka', {
182-
error: kafkaErr as Error,
183-
eventId,
184-
});
185-
}
186-
}
155+
sendEvent('analytics-web-vitals', webVitalsEvent);
187156
} catch (err) {
188-
console.error('Failed to insert web vitals event', {
157+
console.error('Failed to send web vitals event to Kafka', {
189158
error: err as Error,
190159
eventId,
191160
});
@@ -237,24 +206,9 @@ export async function insertCustomEvent(
237206
};
238207

239208
try {
240-
await clickHouse.insert({
241-
table: 'analytics.custom_events',
242-
values: [customEvent],
243-
format: 'JSONEachRow',
244-
});
245-
246-
if (process.env.ENABLE_KAFKA_EVENTS === 'true') {
247-
try {
248-
sendEvent('analytics-custom-events', customEvent);
249-
} catch (kafkaErr) {
250-
console.error('Failed to send custom event to Kafka', {
251-
error: kafkaErr as Error,
252-
eventId,
253-
});
254-
}
255-
}
209+
sendEvent('analytics-custom-events', customEvent);
256210
} catch (err) {
257-
console.error('Failed to insert custom event', {
211+
console.error('Failed to send custom event to Kafka', {
258212
error: err as Error,
259213
eventId,
260214
});
@@ -304,24 +258,9 @@ export async function insertOutgoingLink(
304258
};
305259

306260
try {
307-
await clickHouse.insert({
308-
table: 'analytics.outgoing_links',
309-
values: [outgoingLinkEvent],
310-
format: 'JSONEachRow',
311-
});
312-
313-
if (process.env.ENABLE_KAFKA_EVENTS === 'true') {
314-
try {
315-
sendEvent('analytics-outgoing-links', outgoingLinkEvent);
316-
} catch (kafkaErr) {
317-
console.error('Failed to send outgoing link event to Kafka', {
318-
error: kafkaErr as Error,
319-
eventId,
320-
});
321-
}
322-
}
261+
sendEvent('analytics-outgoing-links', outgoingLinkEvent);
323262
} catch (err) {
324-
console.error('Failed to insert outgoing link event', {
263+
console.error('Failed to send outgoing link event to Kafka', {
325264
error: err as Error,
326265
eventId,
327266
});
@@ -330,7 +269,7 @@ export async function insertOutgoingLink(
330269
}
331270

332271
/**
333-
* Insert a track event (pageview/analytics event) into ClickHouse
272+
* Insert a track event (pageview/analytics event) via Kafka
334273
*/
335274
export async function insertTrackEvent(
336275
trackData: any,
@@ -443,24 +382,9 @@ export async function insertTrackEvent(
443382
};
444383

445384
try {
446-
await clickHouse.insert({
447-
table: 'analytics.events',
448-
values: [trackEvent],
449-
format: 'JSONEachRow',
450-
});
451-
452-
if (process.env.ENABLE_KAFKA_EVENTS === 'true') {
453-
try {
454-
sendEvent('analytics-events', trackEvent);
455-
} catch (kafkaErr) {
456-
console.error('Failed to send track event to Kafka', {
457-
error: kafkaErr as Error,
458-
eventId,
459-
});
460-
}
461-
}
385+
sendEvent('analytics-events', trackEvent);
462386
} catch (err) {
463-
console.error('Failed to insert track event', {
387+
console.error('Failed to send track event to Kafka', {
464388
error: err as Error,
465389
eventId,
466390
});

apps/basket/src/routes/email.ts

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { createHash, randomUUID } from 'node:crypto';
2-
import { clickHouse, type EmailEvent } from '@databuddy/db';
2+
import { type EmailEvent } from '@databuddy/db';
33
import { redis } from '@databuddy/redis';
44
import {
55
batchEmailEventSchema,
@@ -44,30 +44,15 @@ async function insertEmailEvent(emailData: EmailEventInput): Promise<void> {
4444
};
4545

4646
try {
47-
await clickHouse.insert({
48-
table: 'analytics.email_events',
49-
values: [emailEvent],
50-
format: 'JSONEachRow',
51-
});
52-
53-
if (process.env.ENABLE_KAFKA_EVENTS === 'true') {
54-
try {
55-
sendEvent('analytics-email-events', emailEvent);
56-
} catch (kafkaErr) {
57-
logger.error('Failed to send email event to Kafka', {
58-
error: kafkaErr as Error,
59-
eventId: emailEvent.event_id,
60-
});
61-
}
62-
}
47+
sendEvent('analytics-email-events', emailEvent);
6348

64-
logger.info('Email event inserted successfully', {
49+
logger.info('Email event sent to Kafka successfully', {
6550
domain: emailEvent.domain,
6651
labels: emailEvent.labels,
6752
eventId: emailEvent.event_id,
6853
});
6954
} catch (err) {
70-
logger.error('Failed to insert email event', {
55+
logger.error('Failed to send email event to Kafka', {
7156
error: err as Error,
7257
domain: emailEvent.domain,
7358
eventId: emailEvent.event_id,

0 commit comments

Comments
 (0)