Commit f1e21ae
feat: add producer to all events
1 parent 03a838f

6 files changed: +192 -61 lines

apps/basket/src/lib/blocked-traffic.ts

Lines changed: 4 additions & 1 deletion

@@ -7,6 +7,7 @@ import {
 import { extractIpFromRequest, getGeo } from '../utils/ip-geo';
 import { parseUserAgent } from '../utils/user-agent';
 import { sanitizeString, VALIDATION_LIMITS } from '../utils/validation';
+import { sendEvent } from './producer';
 
 /**
  * Log blocked traffic for security and monitoring purposes
@@ -93,7 +94,9 @@ export async function logBlockedTraffic(
       format: 'JSONEachRow',
     })
     .then(() => {
-      // Successfully logged blocked traffic
+      if (process.env.ENABLE_KAFKA_EVENTS === 'true') {
+        sendEvent('analytics-blocked-traffic', blockedEvent);
+      }
     })
     .catch((err) => {
       console.error('Failed to log blocked traffic', { error: err as Error });
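Note that the Kafka send is gated on ENABLE_KAFKA_EVENTS and fires only inside the ClickHouse .then() handler, so only traffic that was actually logged is mirrored to the analytics-blocked-traffic topic. Because sendEvent is async and its result is not awaited here, a broker failure surfaces as an unhandled promise rejection rather than being caught at the call site. A minimal sketch of a helper that centralizes both concerns; the name sendEventIfEnabled is hypothetical and not part of this commit:

// Hypothetical helper, not in this commit: wraps the env gate and attaches a
// catch so a broker outage logs an error instead of raising an unhandled rejection.
import { sendEvent } from './producer';

export function sendEventIfEnabled(topic: string, event: unknown): void {
  if (process.env.ENABLE_KAFKA_EVENTS !== 'true') {
    return;
  }
  // Fire-and-forget, but with an error handler attached to the promise.
  sendEvent(topic, event).catch((err) => {
    console.error(`Failed to send event to ${topic}`, { error: err as Error });
  });
}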

apps/basket/src/lib/event-service.ts

Lines changed: 48 additions & 1 deletion

@@ -91,6 +91,17 @@ export async function insertError(
       values: [errorEvent],
       format: 'JSONEachRow',
     });
+
+    if (process.env.ENABLE_KAFKA_EVENTS === 'true') {
+      try {
+        sendEvent('analytics-errors', errorEvent);
+      } catch (kafkaErr) {
+        console.error('Failed to send error event to Kafka', {
+          error: kafkaErr as Error,
+          eventId,
+        });
+      }
+    }
   } catch (err) {
     console.error('Failed to insert error event', {
       error: err as Error,
@@ -162,6 +173,17 @@ export async function insertWebVitals(
       values: [webVitalsEvent],
       format: 'JSONEachRow',
     });
+
+    if (process.env.ENABLE_KAFKA_EVENTS === 'true') {
+      try {
+        sendEvent('analytics-web-vitals', webVitalsEvent);
+      } catch (kafkaErr) {
+        console.error('Failed to send web vitals event to Kafka', {
+          error: kafkaErr as Error,
+          eventId,
+        });
+      }
+    }
   } catch (err) {
     console.error('Failed to insert web vitals event', {
       error: err as Error,
@@ -220,7 +242,22 @@ export async function insertCustomEvent(
       values: [customEvent],
       format: 'JSONEachRow',
     });
+
+    if (process.env.ENABLE_KAFKA_EVENTS === 'true') {
+      try {
+        sendEvent('analytics-custom-events', customEvent);
+      } catch (kafkaErr) {
+        console.error('Failed to send custom event to Kafka', {
+          error: kafkaErr as Error,
+          eventId,
+        });
+      }
+    }
   } catch (err) {
+    console.error('Failed to insert custom event', {
+      error: err as Error,
+      eventId,
+    });
     throw err;
   }
 }
@@ -272,6 +309,17 @@ export async function insertOutgoingLink(
       values: [outgoingLinkEvent],
       format: 'JSONEachRow',
     });
+
+    if (process.env.ENABLE_KAFKA_EVENTS === 'true') {
+      try {
+        sendEvent('analytics-outgoing-links', outgoingLinkEvent);
+      } catch (kafkaErr) {
+        console.error('Failed to send outgoing link event to Kafka', {
+          error: kafkaErr as Error,
+          eventId,
+        });
+      }
+    }
   } catch (err) {
     console.error('Failed to insert outgoing link event', {
       error: err as Error,
@@ -283,7 +331,6 @@ export async function insertOutgoingLink(
 
 /**
  * Insert a track event (pageview/analytics event) into ClickHouse
- * Optionally also sends to Kafka if feature flag is enabled
  */
 export async function insertTrackEvent(
   trackData: any,
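One caveat with the repeated pattern above: sendEvent is an async function that is never awaited, so an async function call inside try/catch never throws into the surrounding catch; delivery failures become rejected promises instead. Where confirmed delivery matters, for example in an integration test, the awaited sendEventSync variant can be used. A minimal sketch, assuming KAFKA_BROKERS points at a reachable broker; the payload shape here is hypothetical, real events come from the insert* callers:

import { disconnectProducer, sendEventSync } from './producer';

async function main(): Promise<void> {
  // Hypothetical test payload; client_id doubles as the default message key.
  const errorEvent = { event_id: 'test-1', client_id: 'client-1', message: 'boom' };
  try {
    // sendEventSync awaits producer.send, so a delivery failure throws here.
    await sendEventSync('analytics-errors', errorEvent);
  } finally {
    await disconnectProducer();
  }
}

main().catch((err) => console.error(err));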

apps/basket/src/lib/producer.ts

Lines changed: 41 additions & 42 deletions

@@ -1,65 +1,64 @@
 import { CompressionTypes, Kafka } from 'kafkajs';
 
 const BROKER = process.env.KAFKA_BROKERS as string;
-console.log('BROKER', BROKER);
 
 const kafka = new Kafka({
-    clientId: 'basket',
-    brokers: [BROKER],
+  clientId: 'basket',
+  brokers: [BROKER],
 });
 
 const producer = kafka.producer({
-    allowAutoTopicCreation: true,
+  allowAutoTopicCreation: true,
 });
 
 let connected = false;
 
 const connectProducer = async () => {
-    if (!connected) {
-        await producer.connect();
-        connected = true;
-    }
+  if (!connected) {
+    await producer.connect();
+    connected = true;
+  }
 };
 
 export const sendEventSync = async (topic: string, event: any, key?: string) => {
-    try {
-        await connectProducer();
-        await producer.send({
-            topic,
-            messages: [{
-                value: JSON.stringify(event),
-                key: key || event.client_id
-            }],
-            timeout: 10000,
-            compression: CompressionTypes.GZIP,
-        });
-    } catch (err) {
-        console.error('Failed to send event', err);
-        throw err;
-    }
+  try {
+    await connectProducer();
+    await producer.send({
+      topic,
+      messages: [{
+        value: JSON.stringify(event),
+        key: key || event.client_id,
+      }],
+      timeout: 10000,
+      compression: CompressionTypes.GZIP,
+    });
+  } catch (err) {
+    console.error('Failed to send event', err);
+    throw err;
+  }
 };
 
 export const sendEvent = async (topic: string, event: any, key?: string) => {
-    try {
-        await connectProducer();
-        producer.send({
-            topic,
-            messages: [{
-                value: JSON.stringify(event),
-                key: key || event.client_id
-            }],
-            timeout: 10000,
-            compression: CompressionTypes.GZIP,
-        });
-    } catch (err) {
-        console.error('Failed to send event', err);
-        throw err;
-    }
+  try {
+    await connectProducer();
+    producer.send({
+      topic,
+      messages: [{
+        value: JSON.stringify(event),
+        key: key || event.client_id,
+      }],
+      timeout: 10000,
+      compression: CompressionTypes.GZIP,
+    });
+  } catch (err) {
+    console.error('Failed to send event', err);
+    throw err;
+  }
 };
 
 export const disconnectProducer = async () => {
-    if (connected) {
-        await producer.disconnect();
-        connected = false;
-    }
+  if (connected) {
+    await producer.disconnect();
+    connected = false;
+  }
 };
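This file is a pure reformat plus removal of the stray console.log of the broker address: sendEventSync awaits the send, while sendEvent intentionally does not block its caller. Since the producer connects lazily on first send, the one lifecycle piece the module leaves to the caller is shutdown. A minimal sketch, assuming the basket entrypoint owns process signals and should close the producer so fire-and-forget sends are not dropped mid-flight:

// Minimal sketch, not part of this commit: close the Kafka producer on
// shutdown; kafkajs lets in-flight requests settle during disconnect.
import { disconnectProducer } from './lib/producer';

for (const signal of ['SIGINT', 'SIGTERM'] as const) {
  process.once(signal, async () => {
    try {
      await disconnectProducer();
    } finally {
      process.exit(0);
    }
  });
}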

apps/basket/src/routes/email.ts

Lines changed: 12 additions & 0 deletions

@@ -6,6 +6,7 @@ import {
   type EmailEventInput,
   emailEventSchema,
 } from '@databuddy/validation';
+import { sendEvent } from '../lib/producer';
 import { Elysia } from 'elysia';
 import { logger } from '../lib/logger';
 
@@ -49,6 +50,17 @@ async function insertEmailEvent(emailData: EmailEventInput): Promise<void> {
     format: 'JSONEachRow',
   });
 
+  if (process.env.ENABLE_KAFKA_EVENTS === 'true') {
+    try {
+      sendEvent('analytics-email-events', emailEvent);
+    } catch (kafkaErr) {
+      logger.error('Failed to send email event to Kafka', {
+        error: kafkaErr as Error,
+        eventId: emailEvent.event_id,
+      });
+    }
+  }
+
   logger.info('Email event inserted successfully', {
     domain: emailEvent.domain,
     labels: emailEvent.labels,
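To verify the dual-write end to end before Vector consumes a topic, a throwaway kafkajs consumer can tail analytics-email-events. A minimal sketch, assuming KAFKA_BROKERS is set the same way producer.ts expects:

// Minimal verification sketch, not part of this commit.
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'basket-debug',
  brokers: [process.env.KAFKA_BROKERS as string],
});

async function main(): Promise<void> {
  const consumer = kafka.consumer({ groupId: 'basket-debug' });
  await consumer.connect();
  await consumer.subscribe({ topic: 'analytics-email-events', fromBeginning: true });
  await consumer.run({
    // Events are JSON-encoded by sendEvent and keyed by client_id by default.
    eachMessage: async ({ topic, message }) => {
      console.log(topic, message.key?.toString(), message.value?.toString());
    },
  });
}

main().catch((err) => console.error(err));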

apps/dashboard/app/layout.tsx

Lines changed: 1 addition & 1 deletion

@@ -116,7 +116,7 @@ export default function RootLayout({
       >
         <Databuddy
           apiUrl={
-            isLocalhost ? 'http://localhost:4000' : 'https://basket.databuddy.cc'
+            isLocalhost ? 'https://staging-basket.databuddy.cc' : 'https://basket.databuddy.cc'
           }
           clientId={
             isLocalhost

infra/ingest/vector.yaml

Lines changed: 86 additions & 16 deletions

@@ -5,27 +5,97 @@ sources:
     group_id: vector-analytics
     topics:
       - analytics-events
+      - analytics-errors
+      - analytics-web-vitals
+      - analytics-custom-events
+      - analytics-outgoing-links
+      - analytics-email-events
+      - analytics-blocked-traffic
     auto_offset_reset: earliest
     decoding:
       codec: json
+    partition_assignment: cooperative-sticky
 
-sinks:
-  clickhouse:
-    type: clickhouse
+transforms:
+  route_analytics:
+    type: route
     inputs:
       - redpanda
-    endpoint: "${CLICKHOUSE_URL}"
-    database: analytics
+    route:
+      events: '.topic == "analytics-events"'
+      errors: '.topic == "analytics-errors"'
+      web_vitals: '.topic == "analytics-web-vitals"'
+      custom_events: '.topic == "analytics-custom-events"'
+      outgoing_links: '.topic == "analytics-outgoing-links"'
+      email_events: '.topic == "analytics-email-events"'
+      blocked_traffic: '.topic == "analytics-blocked-traffic"'
+
+sink_defaults: &ch_defaults
+  type: clickhouse
+  endpoint: "${CLICKHOUSE_URL}"
+  database: analytics
+  auth:
+    strategy: basic
+    user: "${CLICKHOUSE_USER}"
+    password: "${CLICKHOUSE_PASSWORD}"
+  batch:
+    max_events: 5000
+    max_bytes: 5000000
+    timeout_secs: 5
+  date_time_best_effort: true
+  skip_unknown_fields: true
+  encoding:
+    timestamp_format: unix_ms
+
+sinks:
+  clickhouse_events:
+    <<: *ch_defaults
+    inputs:
+      - route_analytics.events
     table: events
-    auth:
-      strategy: basic
-      user: "${CLICKHOUSE_USER}"
-      password: "${CLICKHOUSE_PASSWORD}"
-    batch:
-      max_events: 1000
-      max_bytes: 5000000
-      timeout_secs: 5
-    date_time_best_effort: true
-    skip_unknown_fields: true
+
+  clickhouse_errors:
+    <<: *ch_defaults
+    inputs:
+      - route_analytics.errors
+    table: errors
+
+  clickhouse_web_vitals:
+    <<: *ch_defaults
+    inputs:
+      - route_analytics.web_vitals
+    table: web_vitals
+
+  clickhouse_custom_events:
+    <<: *ch_defaults
+    inputs:
+      - route_analytics.custom_events
+    table: custom_events
+
+  clickhouse_outgoing_links:
+    <<: *ch_defaults
+    inputs:
+      - route_analytics.outgoing_links
+    table: outgoing_links
+
+  clickhouse_email_events:
+    <<: *ch_defaults
+    inputs:
+      - route_analytics.email_events
+    table: email_events
+
+  clickhouse_blocked_traffic:
+    <<: *ch_defaults
+    inputs:
+      - route_analytics.blocked_traffic
+    table: blocked_traffic
+
+  dead_letters:
+    type: file
+    inputs:
+      - route_analytics._unmatched
+    path: /var/log/vector/dead_letters.log
     encoding:
-      timestamp_format: unix_ms
+      codec: json
+
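The route transform fans each topic out to its own ClickHouse table, with unmatched events captured in a file-based dead-letter sink. On the producer side the app still relies on allowAutoTopicCreation, which creates topics with broker defaults on first send; if explicit partition counts or replication settings are wanted before Vector subscribes, the topics can be created up front with the kafkajs admin client. A minimal sketch, not part of this commit; the partition count of 3 is an arbitrary placeholder:

import { Kafka } from 'kafkajs';

// Topic names mirror infra/ingest/vector.yaml.
const topics = [
  'analytics-events',
  'analytics-errors',
  'analytics-web-vitals',
  'analytics-custom-events',
  'analytics-outgoing-links',
  'analytics-email-events',
  'analytics-blocked-traffic',
];

async function main(): Promise<void> {
  const kafka = new Kafka({
    clientId: 'basket-admin',
    brokers: [process.env.KAFKA_BROKERS as string],
  });
  const admin = kafka.admin();
  await admin.connect();
  try {
    // createTopics skips topics that already exist.
    await admin.createTopics({
      topics: topics.map((topic) => ({ topic, numPartitions: 3 })),
    });
  } finally {
    await admin.disconnect();
  }
}

main().catch((err) => console.error(err));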
