Skip to content

Commit ffe95fa

Browse files
committed
fix Kafka record size calculation
1 parent 925aae2 commit ffe95fa

File tree

2 files changed

+20
-2
lines changed

2 files changed

+20
-2
lines changed

kafka/record.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ func (r *Record) Size(baseOffSet int64, base time.Time) int {
8181
if r.Value != nil {
8282
valueLength = r.Value.Size()
8383
}
84+
8485
size := 1 + // attribute
8586
sizeVarInt(deltaTimestamp) +
8687
sizeVarInt(deltaOffset) +
@@ -229,8 +230,8 @@ func sizeVarInt(x int64) int {
229230
ux = ^ux
230231
}
231232
i := 0
232-
for x >= 0x80 {
233-
x >>= 7
233+
for ux >= 0x80 {
234+
ux >>= 7
234235
i++
235236
}
236237
return i + 1

kafka/record_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bufio"
55
"bytes"
66
"github.com/stretchr/testify/require"
7+
"strings"
78
"testing"
89
"time"
910
)
@@ -346,3 +347,19 @@ func TestRecordBatch_WriteTo_Bytes_Compare(t *testing.T) {
346347

347348
require.Equal(t, expected, b)
348349
}
350+
351+
func TestSize_LongValue(t *testing.T) {
352+
// 80 because it must be lower than 128 but the value * 2 must be >= 128 (x<<1 is equal to *2)
353+
val := []byte(strings.Repeat("b", 80))
354+
records := RecordBatch{Records: []*Record{
355+
{
356+
Offset: 0,
357+
Time: ToTime(1657010762684),
358+
Key: NewBytes([]byte("foo")),
359+
Value: NewBytes(val),
360+
Headers: nil,
361+
},
362+
}}
363+
364+
require.Equal(t, 150, records.Size())
365+
}

0 commit comments

Comments
 (0)