Skip to content

Commit 9abdb92

Browse files
authored
[service-utils] import LZ4 compression for usageV2 (#6123)
1 parent 6ed2fd1 commit 9abdb92

File tree

1 file changed

+11
-16
lines changed

1 file changed

+11
-16
lines changed

packages/service-utils/src/node/usageV2.ts

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,6 @@
11
import { randomUUID } from "node:crypto";
22
import { checkServerIdentity } from "node:tls";
3-
import {
4-
CompressionCodecs,
5-
CompressionTypes,
6-
Kafka,
7-
type Producer,
8-
type ProducerConfig,
9-
} from "kafkajs";
3+
import * as KafkaJS from "kafkajs";
104
import LZ4Codec from "kafkajs-lz4";
115
import type { ServiceName } from "../core/services.js";
126
import { type UsageV2Event, getTopicName } from "../core/usageV2.js";
@@ -25,10 +19,10 @@ import { type UsageV2Event, getTopicName } from "../core/usageV2.js";
2519
* ```
2620
*/
2721
export class UsageV2Producer {
28-
private kafka: Kafka;
29-
private producer: Producer | null = null;
22+
private kafka: KafkaJS.Kafka;
23+
private producer: KafkaJS.Producer | null = null;
3024
private topic: string;
31-
private compression: CompressionTypes;
25+
private compression: KafkaJS.CompressionTypes;
3226

3327
constructor(config: {
3428
/**
@@ -46,7 +40,7 @@ export class UsageV2Producer {
4640
/**
4741
* The compression algorithm to use.
4842
*/
49-
compression?: CompressionTypes;
43+
compression?: KafkaJS.CompressionTypes;
5044

5145
username: string;
5246
password: string;
@@ -55,12 +49,12 @@ export class UsageV2Producer {
5549
producerName,
5650
environment,
5751
productName,
58-
compression = CompressionTypes.LZ4,
52+
compression = KafkaJS.CompressionTypes.LZ4,
5953
username,
6054
password,
6155
} = config;
6256

63-
this.kafka = new Kafka({
57+
this.kafka = new KafkaJS.Kafka({
6458
clientId: `${producerName}-${environment}`,
6559
brokers:
6660
environment === "production"
@@ -86,9 +80,10 @@ export class UsageV2Producer {
8680
* Connect the producer.
8781
* This must be called before calling `sendEvents()`.
8882
*/
89-
async init(configOverrides?: ProducerConfig) {
90-
if (this.compression === CompressionTypes.LZ4) {
91-
CompressionCodecs[CompressionTypes.LZ4] = new LZ4Codec().codec;
83+
async init(configOverrides?: KafkaJS.ProducerConfig) {
84+
if (this.compression === KafkaJS.CompressionTypes.LZ4) {
85+
KafkaJS.CompressionCodecs[KafkaJS.CompressionTypes.LZ4] =
86+
new LZ4Codec().codec;
9287
}
9388

9489
this.producer = this.kafka.producer({

0 commit comments

Comments
 (0)