Skip to content

Commit 0fb586d

Browse files
authored
[service-utils] fix: use lz4js for usageV2 (#6130)
1 parent f8ebbe5 commit 0fb586d

File tree

3 files changed

+97
-240
lines changed

3 files changed

+97
-240
lines changed

packages/service-utils/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,12 @@
4747
"dependencies": {
4848
"aws4fetch": "1.0.20",
4949
"kafkajs": "2.2.4",
50-
"kafkajs-lz4": "2.0.0-beta.0",
50+
"lz4js": "^0.2.0",
5151
"zod": "3.24.1"
5252
},
5353
"devDependencies": {
5454
"@cloudflare/workers-types": "4.20250129.0",
55+
"@types/lz4js": "^0.2.1",
5556
"@types/node": "22.10.10",
5657
"typescript": "5.7.3",
5758
"vitest": "3.0.4"

packages/service-utils/src/node/usageV2.ts

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
type Producer,
77
type ProducerConfig,
88
} from "kafkajs";
9-
import LZ4Codec from "kafkajs-lz4";
9+
import { compress, decompress } from "lz4js";
1010
import type { ServiceName } from "../core/services.js";
1111
import { type UsageV2Event, getTopicName } from "../core/usageV2.js";
1212

@@ -91,7 +91,20 @@ export class UsageV2Producer {
9191
*/
9292
async init(configOverrides?: ProducerConfig) {
9393
if (this.compression === CompressionTypes.LZ4) {
94-
CompressionCodecs[CompressionTypes.LZ4] = new LZ4Codec().codec;
94+
CompressionCodecs[CompressionTypes.LZ4] = () => ({
95+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
96+
compress: (encoder: { buffer: Buffer }) => {
97+
const compressed = compress(encoder.buffer);
98+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
99+
return Buffer.from(compressed);
100+
},
101+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
102+
decompress: (buffer: Buffer) => {
103+
const decompressed = decompress(buffer);
104+
// biome-ignore lint/style/noRestrictedGlobals: kafkajs expects a Buffer
105+
return Buffer.from(decompressed);
106+
},
107+
});
95108
}
96109

97110
this.producer = this.kafka.producer({

0 commit comments

Comments
 (0)