Skip to content

Commit 92c7911

Browse files
author
Collin Van Dyck
committed
Fix message builder compression code.
Due to a bug in the builder code, each compressed message set was creating a compressed message set for each message. This fixes that behavior so that the outer message composes N inner messages. The offset behavior between the v0 and v1 message set types is also accounted for. See: https://kafka.apache.org/documentation/#messages
1 parent de5fea3 commit 92c7911

File tree

2 files changed

+37
-33
lines changed

2 files changed

+37
-33
lines changed

builder_test.go

Lines changed: 37 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (f v0MessageSetBuilder) messages() []Message {
8484
}
8585

8686
func (f v0MessageSetBuilder) bytes() []byte {
87-
return newWB().call(func(wb *kafkaWriteBuffer) {
87+
bs := newWB().call(func(wb *kafkaWriteBuffer) {
8888
for _, msg := range f.msgs {
8989
bs := newWB().call(func(wb *kafkaWriteBuffer) {
9090
wb.writeInt64(msg.Offset) // offset
@@ -96,22 +96,23 @@ func (f v0MessageSetBuilder) bytes() []byte {
9696
wb.writeBytes(msg.Value)
9797
}))
9898
})
99-
if f.codec != nil {
100-
bs = newWB().call(func(wb *kafkaWriteBuffer) {
101-
wb.writeInt64(msg.Offset) // offset
102-
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
103-
compressed := mustCompress(bs, f.codec)
104-
wb.writeInt32(-1) // crc, unused
105-
wb.writeInt8(0) // magic
106-
wb.writeInt8(f.codec.Code()) // attributes
107-
wb.writeBytes(nil) // key is always nil for compressed
108-
wb.writeBytes(compressed) // the value is the compressed message
109-
}))
110-
})
111-
}
11299
wb.Write(bs)
113100
}
114101
})
102+
if f.codec != nil {
103+
bs = newWB().call(func(wb *kafkaWriteBuffer) {
104+
wb.writeInt64(f.msgs[0].Offset) // offset
105+
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
106+
compressed := mustCompress(bs, f.codec)
107+
wb.writeInt32(-1) // crc, unused
108+
wb.writeInt8(0) // magic
109+
wb.writeInt8(f.codec.Code()) // attributes
110+
wb.writeBytes(nil) // key is always nil for compressed
111+
wb.writeBytes(compressed) // the value is the compressed message
112+
}))
113+
})
114+
}
115+
return bs
115116
}
116117

117118
type v1MessageSetBuilder struct {
@@ -124,10 +125,14 @@ func (f v1MessageSetBuilder) messages() []Message {
124125
}
125126

126127
func (f v1MessageSetBuilder) bytes() []byte {
127-
return newWB().call(func(wb *kafkaWriteBuffer) {
128-
for _, msg := range f.msgs {
128+
bs := newWB().call(func(wb *kafkaWriteBuffer) {
129+
for i, msg := range f.msgs {
129130
bs := newWB().call(func(wb *kafkaWriteBuffer) {
130-
wb.writeInt64(msg.Offset) // offset
131+
if f.codec != nil {
132+
wb.writeInt64(int64(i)) // compressed inner message offsets are relative
133+
} else {
134+
wb.writeInt64(msg.Offset) // offset
135+
}
131136
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
132137
wb.writeInt32(-1) // crc, unused
133138
wb.writeInt8(1) // magic
@@ -137,23 +142,24 @@ func (f v1MessageSetBuilder) bytes() []byte {
137142
wb.writeBytes(msg.Value)
138143
}))
139144
})
140-
if f.codec != nil {
141-
bs = newWB().call(func(wb *kafkaWriteBuffer) {
142-
wb.writeInt64(msg.Offset) // offset
143-
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
144-
bs := mustCompress(bs, f.codec)
145-
wb.writeInt32(-1) // crc, unused
146-
wb.writeInt8(1) // magic
147-
wb.writeInt8(f.codec.Code()) // attributes
148-
wb.writeInt64(msg.Time.UnixMilli()) // timestamp
149-
wb.writeBytes(nil) // key is always nil for compressed
150-
wb.writeBytes(bs) // the value is the compressed message
151-
}))
152-
})
153-
}
154145
wb.Write(bs)
155146
}
156147
})
148+
if f.codec != nil {
149+
bs = newWB().call(func(wb *kafkaWriteBuffer) {
150+
wb.writeInt64(f.msgs[len(f.msgs)-1].Offset) // offset of the wrapper message is the last offset of the inner messages
151+
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
152+
bs := mustCompress(bs, f.codec)
153+
wb.writeInt32(-1) // crc, unused
154+
wb.writeInt8(1) // magic
155+
wb.writeInt8(f.codec.Code()) // attributes
156+
wb.writeInt64(f.msgs[0].Time.UnixMilli()) // timestamp
157+
wb.writeBytes(nil) // key is always nil for compressed
158+
wb.writeBytes(bs) // the value is the compressed message
159+
}))
160+
})
161+
}
162+
return bs
157163
}
158164

159165
type v2MessageSetBuilder struct {

message_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,7 @@ func TestV1BatchOffsets(t *testing.T) {
224224
return res
225225
}
226226
for _, expected := range tc.expected {
227-
t.Logf("Want [%d] %s:%s", expected.Offset, expected.Key, expected.Value)
228227
msg := filter(r.readMessage())
229-
t.Logf("Read [%d] %s:%s", msg.Offset, msg.Key, msg.Value)
230228
require.EqualValues(t, expected, msg)
231229
}
232230
// finally, verify no more bytes remain

0 commit comments

Comments
 (0)