
Commit 029de2e

fix: lz4 compression (#122)
* fix: lz4 compression
* fix: filter test node v20 zstd
1 parent 3d6ac55 · commit 029de2e

File tree (3 files changed: +42, −36 lines):

- docs/producer.md
- src/protocol/compression.ts
- test/clients/consumer/consumer.test.ts

docs/producer.md

Lines changed: 3 additions & 1 deletion

```diff
@@ -20,13 +20,15 @@ Options:
 | `producerEpoch` | `number` | Producer epoch. |
 | `idempotent` | `boolean` | Idempotency of the producer. |
 | `acks` | `number` | Acknowledgement to wait before returning.<br/><br/>Valid values are defined in the `ProduceAcks` enumeration. |
-| `compression` | `string` | Compression algorithm to use before sending messages to the broker.<br/><br/>Valid values are exported in the `CompressionAlgorithms` enumeration. |
+| `compression` | `string` | Compression algorithm to use before sending messages to the broker.<br/><br/>Valid values are: `snappy`, `lz4`, `gzip`, `zstd` |
 | `partitioner` | `(message: MessageToProduce<Key, Value, HeaderKey, HeaderValue>) => number` | Partitioner to use to assign a partition to messages that lack it.<br/><br/>It is a function that receives a message and should return the partition number. |
 | `repeatOnStaleMetadata` | `boolean` | Whether to retry a produce operation when the system detects outdated topic or broker information.<br/><br/>Default is `true`. |
 | `serializers` | `Serializers<Key, Value, HeaderKey, HeaderValue>` | Object that specifies which serialisers to use.<br/><br/>The object should only contain one or more of the `key`, `value`, `headerKey` and `headerValue` properties. |

 It also supports all the constructor options of `Base`.

+Notes: `zstd` is not available in node `v20`
+
 ## Basic Methods

 ### `send<Key, Value, HeaderKey, HeaderValue>(options[, callback])`
```
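Since `compression` is a per-call option of `send`, a usage sketch may help. Only `ProduceAcks`, `CompressionAlgorithms`, `MessageToProduce` and the shape of the `send` options are taken from this commit; the package name, the `Producer` constructor and its options, and `close()` are assumptions for illustration:

```ts
// Hypothetical import path and client setup; send() mirrors the options table above.
import { Producer, ProduceAcks, CompressionAlgorithms, type MessageToProduce } from '@platformatic/kafka'

const producer = new Producer({
  clientId: 'docs-example',            // assumed constructor option (inherited from Base)
  bootstrapBrokers: ['localhost:9092'] // assumed constructor option (inherited from Base)
})

const message: MessageToProduce = { key: Buffer.from('k'), value: Buffer.from('v'), topic: 'events' }

// Messages are compressed with the chosen algorithm before reaching the broker.
await producer.send({
  acks: ProduceAcks.NO_RESPONSE,
  compression: CompressionAlgorithms.LZ4, // equivalently, the string 'lz4'
  messages: [message]
})

await producer.close() // assumed cleanup method
```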

src/protocol/compression.ts

Lines changed: 8 additions & 8 deletions

```diff
@@ -38,8 +38,8 @@ function ensureBuffer (data: Buffer | DynamicBuffer): Buffer {

 let snappyCompressSync: CompressionOperation | undefined
 let snappyDecompressSync: CompressionOperation | undefined
-let lz4CompressSync: CompressionOperation | undefined
-let lz4DecompressSync: CompressionOperation | undefined
+let lz4CompressFrameSync: CompressionOperation | undefined
+let lz4DecompressFrameSync: CompressionOperation | undefined

 function loadSnappy () {
   try {
@@ -57,8 +57,8 @@ function loadSnappy () {
 function loadLZ4 () {
   try {
     const lz4 = require('lz4-napi')
-    lz4CompressSync = lz4.compressSync
-    lz4DecompressSync = lz4.uncompressSync
+    lz4CompressFrameSync = lz4.compressFrameSync
+    lz4DecompressFrameSync = lz4.decompressFrameSync
     /* c8 ignore next 5 - In tests lz4-napi is always available */
   } catch (e) {
     throw new UnsupportedCompressionError(
@@ -112,19 +112,19 @@ export const compressionsAlgorithms = {
   lz4: {
     compressSync (data: Buffer | DynamicBuffer): Buffer {
       /* c8 ignore next 4 - In tests lz4-napi is always available */
-      if (!lz4CompressSync) {
+      if (!lz4CompressFrameSync) {
         loadLZ4()
       }

-      return lz4CompressSync!(ensureBuffer(data))
+      return lz4CompressFrameSync!(ensureBuffer(data))
     },
     decompressSync (data: Buffer | DynamicBuffer): Buffer {
       /* c8 ignore next 4 - In tests lz4-napi is always available */
-      if (!lz4DecompressSync) {
+      if (!lz4DecompressFrameSync) {
         loadLZ4()
       }

-      return lz4DecompressSync!(ensureBuffer(data))
+      return lz4DecompressFrameSync!(ensureBuffer(data))
     },
     bitmask: 3,
     available: true
```
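The rename is the substance of the fix: `lz4-napi`'s `compressSync`/`uncompressSync` implement the raw LZ4 block format, while Kafka record batches use the LZ4 frame format, so the bindings now point at the frame-format functions. A minimal round-trip sketch with the exports the diff adopts (Buffer in, Buffer out, matching the `CompressionOperation` signature); the payload is illustrative:

```ts
// Sketch: the frame-format round trip the fixed bindings perform.
const lz4 = require('lz4-napi')

const payload = Buffer.from('a record batch body') // illustrative payload
const framed = lz4.compressFrameSync(payload)      // LZ4 frame format, as Kafka expects
const restored = lz4.decompressFrameSync(framed)   // inverse operation

console.assert(restored.equals(payload))
```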

test/clients/consumer/consumer.test.ts

Lines changed: 31 additions & 27 deletions

```diff
@@ -2,6 +2,7 @@ import { deepStrictEqual, ok, strictEqual } from 'node:assert'
 import { randomUUID } from 'node:crypto'
 import { once } from 'node:events'
 import { test, type TestContext } from 'node:test'
+import zlib from 'node:zlib'
 import * as Prometheus from 'prom-client'
 import { type FetchResponse } from '../../../src/apis/consumer/fetch-v17.ts'
 import { kConnections, kFetchConnections, kOptions } from '../../../src/clients/base/base.ts'
@@ -1261,38 +1262,41 @@ test('fetch should retrieve messages from multiple batches', async t => {
   }
 })

-test.only('fetch should retrieve messages from multiple batches (compressed)', async t => {
-  const topic = await createTopic(t, true)
-  const producer = await createProducer(t)
+for (const compression of Object.values(CompressionAlgorithms)) {
+  test(`fetch should retrieve messages from multiple batches (compressed ${compression})`,
+    { skip: compression === 'snappy' || (compression === 'zstd' && !('zstdCompressSync' in zlib)) }, async t => {
+    const topic = await createTopic(t, true)
+    const producer = await createProducer(t)

-  const msg: MessageToProduce = { key: Buffer.from('test'), value: Buffer.from('test'), topic }
-  await producer.send({ acks: ProduceAcks.NO_RESPONSE, compression: CompressionAlgorithms.GZIP, messages: [msg] })
-  await producer.send({ acks: ProduceAcks.NO_RESPONSE, compression: CompressionAlgorithms.GZIP, messages: [msg] })
-  await producer.send({ acks: ProduceAcks.NO_RESPONSE, compression: CompressionAlgorithms.GZIP, messages: [msg] })
+    const msg: MessageToProduce = { key: Buffer.from('test'), value: Buffer.from('test'), topic }
+    await producer.send({ acks: ProduceAcks.NO_RESPONSE, compression, messages: [msg] })
+    await producer.send({ acks: ProduceAcks.NO_RESPONSE, compression, messages: [msg] })
+    await producer.send({ acks: ProduceAcks.NO_RESPONSE, compression, messages: [msg] })

-  const consumer = createConsumer(t, {})
+    const consumer = createConsumer(t, {})

-  const stream = await consumer.consume({
-    autocommit: true,
-    topics: [topic],
-    mode: MessagesStreamModes.EARLIEST,
-    fallbackMode: MessagesStreamFallbackModes.EARLIEST,
-    minBytes: 1024 * 1024,
-    maxBytes: 1024 * 1024,
-    maxWaitTime: 100
-  })
+    const stream = await consumer.consume({
+      autocommit: true,
+      topics: [topic],
+      mode: MessagesStreamModes.EARLIEST,
+      fallbackMode: MessagesStreamFallbackModes.EARLIEST,
+      minBytes: 1024 * 1024,
+      maxBytes: 1024 * 1024,
+      maxWaitTime: 100
+    })

-  let i = 0
-  for await (const message of stream) {
-    strictEqual(message.topic, topic)
-    strictEqual(message.key.toString(), 'test')
-    strictEqual(message.value.toString(), 'test')
+    let i = 0
+    for await (const message of stream) {
+      strictEqual(message.topic, topic)
+      strictEqual(message.key.toString(), 'test')
+      strictEqual(message.value.toString(), 'test')

-    if (++i === 3) {
-      break
-    }
-  }
-})
+      if (++i === 3) {
+        break
+      }
+    }
+  })
+}

 test('commit should commit offsets to Kafka and support diagnostic channels', async t => {
   const consumer = createConsumer(t)
```
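The new `skip` option is runtime feature detection rather than a version check: Node v20's `node:zlib` ships no zstd bindings, so the loop probes for `zstdCompressSync` before enabling the zstd case (`snappy` is skipped unconditionally here). A standalone sketch of the same probe; the payload and logging are illustrative:

```ts
import zlib from 'node:zlib'

// zstd support arrived in node:zlib after v20, so probe the module
// at runtime instead of parsing process.version.
const hasZstd = 'zstdCompressSync' in zlib

if (hasZstd) {
  // The cast is only needed when the installed @types/node predates the zstd declarations.
  const compressed = (zlib as any).zstdCompressSync(Buffer.from('payload'))
  console.log(`zstd available: compressed to ${compressed.length} bytes`)
} else {
  console.log('zstd unavailable on this Node.js version')
}
```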
