
Commit 0939a8f

fix: Fixed decompression with multiple batches. (#123)
Signed-off-by: Paolo Insogna <[email protected]>
1 parent: 7b1bcc2

File tree

3 files changed: +38 -19 lines changed


src/apis/consumer/fetch-v17.ts

Lines changed: 3 additions & 4 deletions
@@ -174,15 +174,14 @@ export function parseResponse (
         preferredReadReplica: r.readInt32()
       }

-      let recordsSize = r.readUnsignedVarInt()
-
       if (partition.errorCode !== 0) {
         errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode])
       }

-      if (recordsSize > 1) {
-        recordsSize--
+      // We need to reduce the size by one to follow the COMPACT_RECORDS specification
+      const recordsSize = r.readUnsignedVarInt() - 1

+      if (recordsSize > 0) {
         const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize))
         partition.records = []
         do {
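
For context on the `readUnsignedVarInt() - 1` above: in Kafka's flexible protocol versions, COMPACT_RECORDS (like the other COMPACT_* types) is prefixed with an unsigned varint holding the payload length plus one, with 0 denoting a null field. Below is a minimal sketch of that convention, using an assumed reader shape rather than the library's own API:

// Minimal sketch of Kafka's compact length convention.
// The reader interface here is an assumption for illustration, not the library's API.
interface VarIntReader {
  readUnsignedVarInt (): number
  readBytes (length: number): Buffer
}

function readCompactBytes (r: VarIntReader): Buffer | null {
  // The wire value is the real length plus one; zero marks a null field.
  const encoded = r.readUnsignedVarInt()

  if (encoded === 0) {
    return null
  }

  return r.readBytes(encoded - 1)
}

Subtracting one therefore yields the real byte count, and the `recordsSize > 0` guard skips both null and empty record sets.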

src/protocol/records.ts

Lines changed: 8 additions & 4 deletions
@@ -7,7 +7,7 @@ import {
   compressionsAlgorithmsByBitmask
 } from './compression.ts'
 import { crc32c } from './crc32c.ts'
-import { type NullableString } from './definitions.ts'
+import { INT32_SIZE, INT64_SIZE, type NullableString } from './definitions.ts'
 import { DynamicBuffer } from './dynamic-buffer.ts'
 import { Reader } from './reader.ts'
 import { Writer } from './writer.ts'

@@ -253,6 +253,7 @@ export function createRecordsBatch (
 }

 export function readRecordsBatch (reader: Reader): RecordsBatch {
+  const initialPosition = reader.position
   const batch = {
     firstOffset: reader.readInt64(),
     length: reader.readInt32(),

@@ -279,10 +280,13 @@ export function readRecordsBatch (reader: Reader): RecordsBatch {
     throw new UnsupportedCompressionError(`Unsupported compression algorithm with bitmask ${compression}`)
   }

-  const buffer = algorithm.decompressSync(reader.buffer.slice(reader.position, reader.buffer.length))
+  // The length of all headers immediately following Length up to the length of the Records array
+  const headersLength = reader.position - initialPosition - INT32_SIZE - INT64_SIZE
+  const compressedDataLen = batch.length - headersLength
+  const buffer = algorithm.decompressSync(reader.buffer.slice(reader.position, reader.position + compressedDataLen))

-  // Move the original reader to the end
-  reader.skip(reader.buffer.length - reader.position)
+  // Move the original reader to the end of the compressed data
+  reader.skip(compressedDataLen)

   // Replace the reader with the decompressed buffer
   reader = Reader.from(buffer)
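
The arithmetic above relies on the record batch layout: the batch's `length` field counts every byte after the `firstOffset` (int64) and `length` (int32) fields themselves, so subtracting the header bytes already consumed leaves exactly the size of the compressed records payload, rather than reading to the end of the buffer as before (which only worked when the response carried a single batch). A worked example with illustrative numbers, not values taken from the commit:

// Worked example of the size arithmetic (illustrative values only).
const INT64_SIZE = 8
const INT32_SIZE = 4

const initialPosition = 0 // where this batch started in the reader
const position = 61       // after firstOffset (8) + length (4) + the remaining 49 bytes of batch headers
const batchLength = 200   // value of the batch's `length` field

// Header bytes counted by `length` that were already consumed: 61 - 0 - 4 - 8 = 49
const headersLength = position - initialPosition - INT32_SIZE - INT64_SIZE

// Whatever `length` still covers is the compressed records payload: 200 - 49 = 151
const compressedDataLen = batchLength - headersLength

console.log(headersLength, compressedDataLen) // 49 151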

test/clients/consumer/consumer.test.ts

Lines changed: 27 additions & 11 deletions
@@ -1262,16 +1262,29 @@ test('fetch should retrieve messages from multiple batches', async t => {
   }
 })

-for (const compression of Object.values(CompressionAlgorithms)) {
-  test(`fetch should retrieve messages from multiple batches (compressed ${compression})`,
-    { skip: compression === 'snappy' || (compression === 'zstd' && !('zstdCompressSync' in zlib)) }, async t => {
+for (const [name, compression] of Object.entries(CompressionAlgorithms)) {
+  test(
+    `fetch should retrieve messages from multiple batches (compressed with CompressionAlgorithms.${name})`,
+    { skip: compression === 'zstd' && !('zstdCompressSync' in zlib) },
+    async t => {
       const topic = await createTopic(t, true)
       const producer = await createProducer(t)

-      const msg: MessageToProduce = { key: Buffer.from('test'), value: Buffer.from('test'), topic }
-      await producer.send({ acks: ProduceAcks.NO_RESPONSE, compression, messages: [msg] })
-      await producer.send({ acks: ProduceAcks.NO_RESPONSE, compression, messages: [msg] })
-      await producer.send({ acks: ProduceAcks.NO_RESPONSE, compression, messages: [msg] })
+      await producer.send({
+        acks: ProduceAcks.NO_RESPONSE,
+        compression,
+        messages: [{ key: Buffer.from('test1'), value: Buffer.from('test1'), topic }]
+      })
+      await producer.send({
+        acks: ProduceAcks.NO_RESPONSE,
+        compression,
+        messages: [{ key: Buffer.from('test2'), value: Buffer.from('test2'), topic }]
+      })
+      await producer.send({
+        acks: ProduceAcks.NO_RESPONSE,
+        compression,
+        messages: [{ key: Buffer.from('test3'), value: Buffer.from('test3'), topic }]
+      })

       const consumer = createConsumer(t, {})

@@ -1287,15 +1300,18 @@ for (const compression of Object.values(CompressionAlgorithms)) {

       let i = 0
       for await (const message of stream) {
+        i++
+
         strictEqual(message.topic, topic)
-        strictEqual(message.key.toString(), 'test')
-        strictEqual(message.value.toString(), 'test')
+        strictEqual(message.key.toString(), 'test' + i)
+        strictEqual(message.value.toString(), 'test' + i)

-        if (++i === 3) {
+        if (i === 3) {
           break
         }
       }
-    })
+    }
+  )
 }

 test('commit should commit offsets to Kafka and support diagnostic channels', async t => {
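
Each `producer.send` call in the updated test produces its own record batch, so the follow-up fetch returns three compressed batches back to back; reading the distinct keys `test1` through `test3` in order is what verifies the batch boundaries are now respected. A rough sketch of the control flow this depends on, reusing names from the diffs purely as an illustration rather than the library's actual code:

// Illustrative sketch only: walk a RECORDS blob that may contain several consecutive batches.
function readAllBatches (reader: Reader): RecordsBatch[] {
  const batches: RecordsBatch[] = []

  // Each call must consume exactly one batch (headers plus its compressed records),
  // which is why readRecordsBatch now bounds the slice it decompresses instead of
  // reading to the end of the buffer.
  while (reader.position < reader.buffer.length) {
    batches.push(readRecordsBatch(reader))
  }

  return batches
}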
