Skip to content

Commit 041b553

Browse files
committed
Use low level lz4js library
1 parent 20e71be commit 041b553

File tree

3 files changed

+42
-87
lines changed

3 files changed

+42
-87
lines changed

packages/service-utils/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,10 @@
4545
],
4646
"sideEffects": false,
4747
"dependencies": {
48+
"@types/lz4js": "^0.2.1",
4849
"aws4fetch": "1.0.20",
49-
"kafka-lz4-lite": "^1.0.5",
5050
"kafkajs": "2.2.4",
51+
"lz4js": "^0.2.0",
5152
"zod": "3.24.1"
5253
},
5354
"devDependencies": {

packages/service-utils/src/node/usageV2.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { randomUUID } from "node:crypto";
22
import { checkServerIdentity } from "node:tls";
3-
import { codec as lz4Codec } from "kafka-lz4-lite";
43
import {
54
CompressionTypes,
65
Kafka,
76
type Producer,
87
type ProducerConfig,
98
} from "kafkajs";
9+
import { compress, decompress } from "lz4js";
1010
import type { ServiceName } from "../core/services.js";
1111
import { type UsageV2Event, getTopicName } from "../core/usageV2.js";
1212

@@ -91,7 +91,20 @@ export class UsageV2Producer {
9191
*/
9292
async init(configOverrides?: ProducerConfig) {
9393
if (this.compression === CompressionTypes.LZ4) {
94-
CompressionCodecs[CompressionTypes.LZ4] = lz4Codec;
94+
CompressionCodecs[CompressionTypes.LZ4] = () => ({
95+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
96+
compress: (encoder: { buffer: Buffer }) => {
97+
const compressed = compress(encoder.buffer);
98+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
99+
return Buffer.from(compressed);
100+
},
101+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
102+
decompress: (buffer: Buffer) => {
103+
const decompressed = decompress(buffer);
104+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
105+
return Buffer.from(decompressed);
106+
},
107+
});
95108
}
96109

97110
this.producer = this.kafka.producer({

pnpm-lock.yaml

Lines changed: 25 additions & 84 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)