Skip to content

Commit e15b2ed

Browse files
fix: Avoid out of bounds error. (#125)
Co-authored-by: Simone Sanfratello <[email protected]> Signed-off-by: Paolo Insogna <[email protected]>
1 parent 2ccc98b commit e15b2ed

File tree

11 files changed

+481
-87
lines changed

11 files changed

+481
-87
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@ external/
33
node_modules
44
coverage
55
.eslintcache
6-
.env
6+
.env
7+
.vscode

src/apis/consumer/fetch-v15.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ResponseError } from '../../errors.ts'
1+
import { type GenericError, OutOfBoundsError, ResponseError } from '../../errors.ts'
22
import { Reader } from '../../protocol/reader.ts'
33
import { readRecordsBatch, type RecordsBatch } from '../../protocol/records.ts'
44
import { Writer } from '../../protocol/writer.ts'
@@ -174,19 +174,28 @@ export function parseResponse (
174174
preferredReadReplica: r.readInt32()
175175
}
176176

177-
let recordsSize = r.readUnsignedVarInt()
178-
179177
if (partition.errorCode !== 0) {
180178
errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode])
181179
}
182180

183-
if (recordsSize > 1) {
184-
recordsSize--
181+
// We need to reduce the size by one to follow the COMPACT_RECORDS specification.
182+
const recordsSize = r.readUnsignedVarInt() - 1
185183

184+
if (recordsSize > 0) {
186185
const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize))
187186
partition.records = []
188187
do {
189-
partition.records.push(readRecordsBatch(recordsBatchesReader))
188+
try {
189+
partition.records.push(readRecordsBatch(recordsBatchesReader))
190+
} catch (err) {
191+
// Contrary to other places in the protocol, records batches CAN BE truncated due to maxBytes argument.
192+
// In that case we just ignore the error.
193+
if ((err as GenericError).code === OutOfBoundsError.code) {
194+
break
195+
}
196+
/* c8 ignore next 3 - Hard to test */
197+
throw err
198+
}
190199
} while (recordsBatchesReader.position < recordsSize)
191200

192201
r.skip(recordsSize)

src/apis/consumer/fetch-v16.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ResponseError } from '../../errors.ts'
1+
import { type GenericError, OutOfBoundsError, ResponseError } from '../../errors.ts'
22
import { Reader } from '../../protocol/reader.ts'
33
import { readRecordsBatch, type RecordsBatch } from '../../protocol/records.ts'
44
import { Writer } from '../../protocol/writer.ts'
@@ -174,19 +174,28 @@ export function parseResponse (
174174
preferredReadReplica: r.readInt32()
175175
}
176176

177-
let recordsSize = r.readUnsignedVarInt()
178-
179177
if (partition.errorCode !== 0) {
180178
errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode])
181179
}
182180

183-
if (recordsSize > 1) {
184-
recordsSize--
181+
// We need to reduce the size by one to follow the COMPACT_RECORDS specification.
182+
const recordsSize = r.readUnsignedVarInt() - 1
185183

184+
if (recordsSize > 0) {
186185
const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize))
187186
partition.records = []
188187
do {
189-
partition.records.push(readRecordsBatch(recordsBatchesReader))
188+
try {
189+
partition.records.push(readRecordsBatch(recordsBatchesReader))
190+
} catch (err) {
191+
// Contrary to other places in the protocol, records batches CAN BE truncated due to maxBytes argument.
192+
// In that case we just ignore the error.
193+
if ((err as GenericError).code === OutOfBoundsError.code) {
194+
break
195+
}
196+
/* c8 ignore next 3 - Hard to test */
197+
throw err
198+
}
190199
} while (recordsBatchesReader.position < recordsSize)
191200

192201
r.skip(recordsSize)

src/apis/consumer/fetch-v17.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ResponseError } from '../../errors.ts'
1+
import { type GenericError, OutOfBoundsError, ResponseError } from '../../errors.ts'
22
import { Reader } from '../../protocol/reader.ts'
33
import { readRecordsBatch, type RecordsBatch } from '../../protocol/records.ts'
44
import { Writer } from '../../protocol/writer.ts'
@@ -178,14 +178,24 @@ export function parseResponse (
178178
errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode])
179179
}
180180

181-
// We need to reduce the size by one to follow the COMPACT_RECORDS specification
181+
// We need to reduce the size by one to follow the COMPACT_RECORDS specification.
182182
const recordsSize = r.readUnsignedVarInt() - 1
183183

184184
if (recordsSize > 0) {
185185
const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize))
186186
partition.records = []
187187
do {
188-
partition.records.push(readRecordsBatch(recordsBatchesReader))
188+
try {
189+
partition.records.push(readRecordsBatch(recordsBatchesReader))
190+
} catch (err) {
191+
// Contrary to other places in the protocol, records batches CAN BE truncated due to maxBytes argument.
192+
// In that case we just ignore the error.
193+
if ((err as GenericError).code === OutOfBoundsError.code) {
194+
break
195+
}
196+
/* c8 ignore next 3 - Hard to test */
197+
throw err
198+
}
189199
} while (recordsBatchesReader.position < recordsSize)
190200

191201
r.skip(recordsSize)

src/errors.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export const errorCodes = [
1111
'PLT_KFK_AUTHENTICATION',
1212
'PLT_KFK_MULTIPLE',
1313
'PLT_KFK_NETWORK',
14+
'PLT_KFK_OUT_OF_BOUNDS',
1415
'PLT_KFK_PROTOCOL',
1516
'PLT_KFK_RESPONSE',
1617
'PLT_KFK_TIMEOUT',
@@ -163,6 +164,14 @@ export class ProtocolError extends GenericError {
163164
}
164165
}
165166

167+
export class OutOfBoundsError extends GenericError {
168+
static code: ErrorCode = 'PLT_KFK_OUT_OF_BOUNDS'
169+
170+
constructor (message: string, properties: ErrorProperties = {}) {
171+
super(OutOfBoundsError.code, message, { isOut: true, ...properties })
172+
}
173+
}
174+
166175
export class ResponseError extends MultipleErrors {
167176
static code: ErrorCode = 'PLT_KFK_RESPONSE'
168177

src/protocol/dynamic-buffer.ts

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { UserError } from '../errors.ts'
1+
import { OutOfBoundsError } from '../errors.ts'
22
import { EMPTY_BUFFER, INT16_SIZE, INT32_SIZE, INT64_SIZE, INT8_SIZE } from './definitions.ts'
33
import {
44
BITS_8PLUS_MASK,
@@ -91,7 +91,7 @@ export class DynamicBuffer {
9191
}
9292

9393
if (start < 0 || start > this.length || end > this.length) {
94-
throw new UserError('Out of bounds.')
94+
throw new OutOfBoundsError('Out of bounds.')
9595
}
9696

9797
if (this.buffers.length === 0) {
@@ -128,13 +128,13 @@ export class DynamicBuffer {
128128
}
129129

130130
if (start < 0 || start > this.length || end > this.length) {
131-
throw new UserError('Out of bounds.')
131+
throw new OutOfBoundsError('Out of bounds.')
132132
}
133133

134134
if (this.buffers.length === 0) {
135135
return EMPTY_BUFFER
136136
} else if (this.buffers.length === 1) {
137-
return this.buffers[0].slice(start, end)
137+
return this.buffers[0].subarray(start, end)
138138
}
139139

140140
let position = 0
@@ -176,7 +176,7 @@ export class DynamicBuffer {
176176

177177
consume (offset: number): this {
178178
if (offset < 0 || offset > this.length) {
179-
throw new UserError('Out of bounds.')
179+
throw new OutOfBoundsError('Out of bounds.')
180180
}
181181

182182
if (offset === 0) {
@@ -210,7 +210,7 @@ export class DynamicBuffer {
210210

211211
get (offset: number): number {
212212
if (offset < 0 || offset >= this.length) {
213-
throw new UserError('Out of bounds.')
213+
throw new OutOfBoundsError('Out of bounds.')
214214
}
215215

216216
const [finalIndex, current] = this.#findInitialBuffer(offset)
@@ -219,7 +219,7 @@ export class DynamicBuffer {
219219

220220
readUInt8 (offset: number = 0): number {
221221
if (offset < 0 || offset >= this.length) {
222-
throw new UserError('Out of bounds.')
222+
throw new OutOfBoundsError('Out of bounds.')
223223
}
224224

225225
const [finalIndex, current] = this.#findInitialBuffer(offset)
@@ -265,7 +265,7 @@ export class DynamicBuffer {
265265
let read = 0
266266

267267
if (offset < 0 || offset >= this.length) {
268-
throw new UserError('Out of bounds.')
268+
throw new OutOfBoundsError('Out of bounds.')
269269
}
270270

271271
// Find the initial buffer
@@ -294,7 +294,7 @@ export class DynamicBuffer {
294294
let read = 0
295295

296296
if (offset < 0 || offset >= this.length) {
297-
throw new UserError('Out of bounds.')
297+
throw new OutOfBoundsError('Out of bounds.')
298298
}
299299

300300
// Find the initial buffer
@@ -318,7 +318,7 @@ export class DynamicBuffer {
318318

319319
readInt8 (offset: number = 0): number {
320320
if (offset < 0 || offset >= this.length) {
321-
throw new UserError('Out of bounds.')
321+
throw new OutOfBoundsError('Out of bounds.')
322322
}
323323

324324
const [finalIndex, current] = this.#findInitialBuffer(offset)
@@ -681,7 +681,7 @@ export class DynamicBuffer {
681681

682682
#readMultiple (index: number, length: number) {
683683
if (index < 0 || index + length > this.length) {
684-
throw new UserError('Out of bounds.')
684+
throw new OutOfBoundsError('Out of bounds.')
685685
}
686686

687687
let [startOffset, current] = this.#findInitialBuffer(index)

test/apis/consumer/fetch-v15.test.ts

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -825,3 +825,106 @@ test('parseResponse parses record data', () => {
825825
// Verify value is a Buffer
826826
ok(Buffer.isBuffer(record.value))
827827
})
828+
829+
test('parseResponse handles truncated records', () => {
830+
// Create a response with records data
831+
// First create a record batch
832+
const timestamp = BigInt(Date.now())
833+
const recordsBatch = Writer.create()
834+
// Record batch structure
835+
.appendInt64(0n) // firstOffset
836+
.appendInt32(60) // length - this would be dynamically computed in real usage
837+
.appendInt32(0) // partitionLeaderEpoch
838+
.appendInt8(2) // magic (record format version)
839+
.appendUnsignedInt32(0) // crc - would be computed properly in real code
840+
.appendInt16(0) // attributes
841+
.appendInt32(0) // lastOffsetDelta
842+
.appendInt64(timestamp) // firstTimestamp
843+
.appendInt64(timestamp) // maxTimestamp
844+
.appendInt64(-1n) // producerId - not specified
845+
.appendInt16(0) // producerEpoch
846+
.appendInt32(0) // firstSequence
847+
.appendInt32(1) // number of records
848+
// Single record
849+
.appendVarInt(8) // length of the record
850+
.appendInt8(0) // attributes
851+
.appendVarInt64(0n) // timestampDelta
852+
.appendVarInt(0) // offsetDelta
853+
.appendVarIntBytes(null) // key
854+
.appendVarIntBytes(Buffer.from('test-value')) // value
855+
.appendVarIntArray([], () => {}) // No headers
856+
// Truncated batch
857+
.appendInt64(0n) // firstOffset
858+
.appendInt32(60) // length
859+
860+
// Now create the full response
861+
const writer = Writer.create()
862+
.appendInt32(0) // throttleTimeMs
863+
.appendInt16(0) // errorCode (success)
864+
.appendInt32(123) // sessionId
865+
// Responses array - using tagged fields format
866+
.appendArray(
867+
[
868+
{
869+
topicId: '12345678-1234-1234-1234-123456789abc',
870+
partitions: [
871+
{
872+
partitionIndex: 0,
873+
errorCode: 0,
874+
highWatermark: 100n,
875+
lastStableOffset: 100n,
876+
logStartOffset: 0n,
877+
abortedTransactions: [],
878+
preferredReadReplica: -1,
879+
recordsBatch
880+
}
881+
]
882+
}
883+
],
884+
(w, topic) => {
885+
w.appendUUID(topic.topicId)
886+
// Partitions array
887+
.appendArray(topic.partitions, (w, partition) => {
888+
w.appendInt32(partition.partitionIndex)
889+
.appendInt16(partition.errorCode)
890+
.appendInt64(partition.highWatermark)
891+
.appendInt64(partition.lastStableOffset)
892+
.appendInt64(partition.logStartOffset)
893+
// Aborted transactions array (empty)
894+
.appendArray(partition.abortedTransactions, () => {})
895+
.appendInt32(partition.preferredReadReplica)
896+
897+
// Add records batch
898+
.appendUnsignedVarInt(partition.recordsBatch.length + 1)
899+
.appendFrom(partition.recordsBatch)
900+
})
901+
}
902+
)
903+
.appendInt8(0) // Root tagged fields
904+
905+
const response = parseResponse(1, 1, 17, Reader.from(writer))
906+
907+
// Verify the records were parsed correctly
908+
ok(response.responses[0].partitions[0].records, 'Records should be defined')
909+
910+
const batch = response.responses[0].partitions[0].records[0]!
911+
const record = batch.records[0]
912+
913+
deepStrictEqual(
914+
{
915+
firstOffset: batch.firstOffset,
916+
recordsLength: batch.records.length,
917+
offsetDelta: record.offsetDelta,
918+
valueString: record.value.toString()
919+
},
920+
{
921+
firstOffset: 0n,
922+
recordsLength: 1,
923+
offsetDelta: 0,
924+
valueString: 'test-value'
925+
}
926+
)
927+
928+
// Verify value is a Buffer
929+
ok(Buffer.isBuffer(record.value))
930+
})

0 commit comments

Comments
 (0)