Skip to content

Commit c9eecb8

Browse files
committed
refactor: simplify & cleanup producer
1 parent a65ffb0 commit c9eecb8

File tree

1 file changed

+21
-63
lines changed

1 file changed

+21
-63
lines changed

apps/basket/src/lib/producer.ts

Lines changed: 21 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
import type { ClickHouseClient } from "@clickhouse/client";
22
import { clickHouse, TABLE_NAMES } from "@databuddy/db";
3-
import { Semaphore } from "async-mutex";
43
import { CompressionTypes, Kafka, type Producer } from "kafkajs";
54
import { captureError, record, setAttributes } from "./tracing";
65

76
type BufferedEvent = {
87
table: string;
98
event: unknown;
10-
retries: number;
11-
timestamp: number;
129
};
1310

1411
type ProducerStats = {
@@ -26,15 +23,13 @@ type ProducerConfig = {
2623
username?: string;
2724
password?: string;
2825
selfHost?: boolean;
29-
semaphoreLimit?: number;
3026
reconnectCooldown?: number;
3127
kafkaTimeout?: number;
3228
maxProducerRetries?: number;
3329
producerRetryDelay?: number;
3430
bufferInterval?: number;
3531
bufferMax?: number;
3632
bufferHardMax?: number;
37-
maxRetries?: number;
3833
chunkSize?: number;
3934
flushTimeout?: number;
4035
};
@@ -44,15 +39,13 @@ type RequiredProducerConfig = {
4439
username?: string;
4540
password?: string;
4641
selfHost: boolean;
47-
semaphoreLimit: number;
4842
reconnectCooldown: number;
4943
kafkaTimeout: number;
5044
maxProducerRetries: number;
5145
producerRetryDelay: number;
5246
bufferInterval: number;
5347
bufferMax: number;
5448
bufferHardMax: number;
55-
maxRetries: number;
5649
chunkSize: number;
5750
flushTimeout: number;
5851
};
@@ -66,7 +59,6 @@ type ProducerDependencies = {
6659
export class EventProducer {
6760
private readonly config: RequiredProducerConfig;
6861
private readonly dependencies: ProducerDependencies;
69-
private readonly semaphore: Semaphore;
7062
private readonly stats: ProducerStats;
7163
private readonly buffer: BufferedEvent[] = [];
7264

@@ -83,21 +75,18 @@ export class EventProducer {
8375
constructor(config: ProducerConfig, dependencies: ProducerDependencies) {
8476
this.config = {
8577
selfHost: false,
86-
semaphoreLimit: 15,
8778
reconnectCooldown: 60_000,
8879
kafkaTimeout: 10_000,
8980
maxProducerRetries: 3,
9081
producerRetryDelay: 300,
9182
bufferInterval: 5000,
9283
bufferMax: 1000,
9384
bufferHardMax: 10_000,
94-
maxRetries: 3,
9585
chunkSize: 5000,
9686
flushTimeout: 30_000,
9787
...config,
9888
};
9989
this.dependencies = dependencies;
100-
this.semaphore = new Semaphore(this.config.semaphoreLimit);
10190
this.stats = {
10291
sent: 0,
10392
failed: 0,
@@ -150,7 +139,7 @@ export class EventProducer {
150139
maxRetryTime: 3000,
151140
},
152141
idempotent: true,
153-
maxInFlightRequests: 5,
142+
maxInFlightRequests: 15,
154143
});
155144
}
156145

@@ -198,30 +187,25 @@ export class EventProducer {
198187

199188
try {
200189
const grouped = items.reduce(
201-
(acc, { table, event, retries, timestamp }) => {
190+
(acc, { table, event }) => {
202191
if (!acc[table]) {
203192
acc[table] = [];
204193
}
205-
acc[table].push({ event, retries, timestamp });
194+
acc[table].push(event);
206195
return acc;
207196
},
208-
{} as Record<
209-
string,
210-
Array<{ event: unknown; retries: number; timestamp: number }>
211-
>
197+
{} as Record<string, unknown[]>
212198
);
213199

214200
const results = await Promise.allSettled(
215-
Object.entries(grouped).map(async ([table, items]) => {
201+
Object.entries(grouped).map(async ([table, events]) => {
216202
const controller = new AbortController();
217203
const timeout = setTimeout(
218204
() => controller.abort(),
219205
this.config.flushTimeout
220206
);
221207

222208
try {
223-
const events = items.map((i) => i.event);
224-
225209
for (let i = 0; i < events.length; i += this.config.chunkSize) {
226210
const chunk = events.slice(i, i + this.config.chunkSize);
227211
await this.dependencies.clickHouse.insert({
@@ -237,28 +221,17 @@ export class EventProducer {
237221
this.stats.errors += 1;
238222
captureError(error, { message: `Flush failed for ${table}` });
239223

240-
for (const { event, retries, timestamp } of items) {
241-
const age = Date.now() - timestamp;
242-
if (
243-
retries < this.config.maxRetries &&
244-
age < 300_000 &&
245-
this.buffer.length < this.config.bufferHardMax
246-
) {
247-
this.buffer.push({
248-
table,
249-
event,
250-
retries: retries + 1,
251-
timestamp,
252-
});
253-
} else {
254-
this.stats.dropped += 1;
255-
captureError(error, {
256-
message: `Dropped event (retries: ${retries}, age: ${age}ms, buffer: ${this.buffer.length})`,
257-
table,
258-
eventId:
259-
(event as { event_id?: string }).event_id || "unknown",
260-
});
224+
if (this.buffer.length + events.length <= this.config.bufferHardMax) {
225+
for (const event of events) {
226+
this.buffer.push({ table, event });
261227
}
228+
} else {
229+
this.stats.dropped += events.length;
230+
captureError(error, {
231+
message: `Dropped ${events.length} events - buffer full`,
232+
table,
233+
bufferSize: this.buffer.length,
234+
});
262235
}
263236
} finally {
264237
clearTimeout(timeout);
@@ -320,7 +293,7 @@ export class EventProducer {
320293
return;
321294
}
322295

323-
this.buffer.push({ table, event, retries: 0, timestamp: Date.now() });
296+
this.buffer.push({ table, event });
324297
this.stats.buffered += 1;
325298

326299
if (!this.timer) {
@@ -350,8 +323,6 @@ export class EventProducer {
350323
return;
351324
}
352325

353-
const [, release] = await this.semaphore.acquire();
354-
355326
try {
356327
if (
357328
this.isEnabled() &&
@@ -403,8 +374,6 @@ export class EventProducer {
403374
"kafka.error": true,
404375
"kafka.buffered": true,
405376
});
406-
} finally {
407-
release();
408377
}
409378
});
410379
}
@@ -446,8 +415,6 @@ export class EventProducer {
446415
return;
447416
}
448417

449-
const [, release] = await this.semaphore.acquire();
450-
451418
try {
452419
if (
453420
this.isEnabled() &&
@@ -502,8 +469,6 @@ export class EventProducer {
502469
"kafka.error": true,
503470
"kafka.buffered": true,
504471
});
505-
} finally {
506-
release();
507472
}
508473
});
509474
}
@@ -514,22 +479,15 @@ export class EventProducer {
514479
}
515480
this.shuttingDown = true;
516481

517-
let checks = 0;
518-
while (
519-
this.semaphore.getValue() < this.config.semaphoreLimit &&
520-
checks < 50
521-
) {
522-
checks += 1;
523-
await new Promise((r) => setTimeout(r, 100));
524-
}
482+
// Wait a bit for in-flight requests to complete
483+
await new Promise((r) => setTimeout(r, 1000));
525484

485+
// Flush remaining buffer
526486
await this.flush();
527487

528-
let finalFlushAttempts = 0;
529-
while (this.buffer.length > 0 && finalFlushAttempts < 3 && !this.flushing) {
530-
finalFlushAttempts += 1;
488+
// Try one more time if there's still items
489+
if (this.buffer.length > 0 && !this.flushing) {
531490
await this.flush();
532-
await new Promise((r) => setTimeout(r, 1000));
533491
}
534492

535493
if (this.timer) {

0 commit comments

Comments
 (0)