Skip to content

Commit 5d063eb

Browse files
author
Collin Van Dyck
authored
Merge pull request #753 from segmentio/mixed-mode-reader
Allow different batch versions in fetch response.
2 parents d8dd9b0 + ab97659 commit 5d063eb

27 files changed

+1498
-455
lines changed

.gitattributes

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
fixtures/*.hex binary

builder_test.go

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
package kafka
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"io"
7+
"time"
8+
9+
"github.com/segmentio/kafka-go/compress"
10+
)
11+
12+
// This file defines builders to assist in creating kafka payloads for unit testing.
13+
14+
// fetchResponseBuilder builds v10 fetch responses. The version of the v10 fetch
15+
// responses are not as important as the message sets contained within, as this
16+
// type is ultimately used to unit test the message set reader that consumes the
17+
// rest of the response once the header has been parsed.
18+
type fetchResponseBuilder struct {
19+
header fetchResponseHeader
20+
msgSets []messageSetBuilder
21+
rendered []byte
22+
}
23+
24+
type fetchResponseHeader struct {
25+
throttle int32
26+
errorCode int16
27+
sessionID int32
28+
topic string
29+
partition int32
30+
partitionErrorCode int16
31+
highWatermarkOffset int64
32+
lastStableOffset int64
33+
logStartOffset int64
34+
}
35+
36+
func (b *fetchResponseBuilder) messages() (res []Message) {
37+
for _, set := range b.msgSets {
38+
res = append(res, set.messages()...)
39+
}
40+
return
41+
}
42+
43+
func (b *fetchResponseBuilder) bytes() []byte {
44+
if b.rendered == nil {
45+
b.rendered = newWB().call(func(wb *kafkaWriteBuffer) {
46+
wb.writeInt32(b.header.throttle)
47+
wb.writeInt16(b.header.errorCode)
48+
wb.writeInt32(b.header.sessionID)
49+
wb.writeInt32(1) // num topics
50+
wb.writeString(b.header.topic)
51+
wb.writeInt32(1) // how many partitions
52+
wb.writeInt32(b.header.partition)
53+
wb.writeInt16(b.header.partitionErrorCode)
54+
wb.writeInt64(b.header.highWatermarkOffset)
55+
wb.writeInt64(b.header.lastStableOffset)
56+
wb.writeInt64(b.header.logStartOffset)
57+
wb.writeInt32(-1) // num aborted tx
58+
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
59+
for _, msgSet := range b.msgSets {
60+
wb.Write(msgSet.bytes())
61+
}
62+
}))
63+
})
64+
}
65+
return b.rendered
66+
}
67+
68+
func (b *fetchResponseBuilder) Len() int {
69+
return len(b.bytes())
70+
}
71+
72+
type messageSetBuilder interface {
73+
bytes() []byte
74+
messages() []Message
75+
}
76+
77+
type v0MessageSetBuilder struct {
78+
msgs []Message
79+
codec CompressionCodec
80+
}
81+
82+
func (f v0MessageSetBuilder) messages() []Message {
83+
return f.msgs
84+
}
85+
86+
func (f v0MessageSetBuilder) bytes() []byte {
87+
bs := newWB().call(func(wb *kafkaWriteBuffer) {
88+
for _, msg := range f.msgs {
89+
bs := newWB().call(func(wb *kafkaWriteBuffer) {
90+
wb.writeInt64(msg.Offset) // offset
91+
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
92+
wb.writeInt32(-1) // crc, unused
93+
wb.writeInt8(0) // magic
94+
wb.writeInt8(0) // attributes -- zero, no compression for the inner message
95+
wb.writeBytes(msg.Key)
96+
wb.writeBytes(msg.Value)
97+
}))
98+
})
99+
wb.Write(bs)
100+
}
101+
})
102+
if f.codec != nil {
103+
bs = newWB().call(func(wb *kafkaWriteBuffer) {
104+
wb.writeInt64(f.msgs[0].Offset) // offset
105+
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
106+
compressed := mustCompress(bs, f.codec)
107+
wb.writeInt32(-1) // crc, unused
108+
wb.writeInt8(0) // magic
109+
wb.writeInt8(f.codec.Code()) // attributes
110+
wb.writeBytes(nil) // key is always nil for compressed
111+
wb.writeBytes(compressed) // the value is the compressed message
112+
}))
113+
})
114+
}
115+
return bs
116+
}
117+
118+
type v1MessageSetBuilder struct {
119+
msgs []Message
120+
codec CompressionCodec
121+
}
122+
123+
func (f v1MessageSetBuilder) messages() []Message {
124+
return f.msgs
125+
}
126+
127+
func (f v1MessageSetBuilder) bytes() []byte {
128+
bs := newWB().call(func(wb *kafkaWriteBuffer) {
129+
for i, msg := range f.msgs {
130+
bs := newWB().call(func(wb *kafkaWriteBuffer) {
131+
if f.codec != nil {
132+
wb.writeInt64(int64(i)) // compressed inner message offsets are relative
133+
} else {
134+
wb.writeInt64(msg.Offset) // offset
135+
}
136+
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
137+
wb.writeInt32(-1) // crc, unused
138+
wb.writeInt8(1) // magic
139+
wb.writeInt8(0) // attributes -- zero, no compression for the inner message
140+
wb.writeInt64(msg.Time.UnixMilli()) // timestamp
141+
wb.writeBytes(msg.Key)
142+
wb.writeBytes(msg.Value)
143+
}))
144+
})
145+
wb.Write(bs)
146+
}
147+
})
148+
if f.codec != nil {
149+
bs = newWB().call(func(wb *kafkaWriteBuffer) {
150+
wb.writeInt64(f.msgs[len(f.msgs)-1].Offset) // offset of the wrapper message is the last offset of the inner messages
151+
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
152+
bs := mustCompress(bs, f.codec)
153+
wb.writeInt32(-1) // crc, unused
154+
wb.writeInt8(1) // magic
155+
wb.writeInt8(f.codec.Code()) // attributes
156+
wb.writeInt64(f.msgs[0].Time.UnixMilli()) // timestamp
157+
wb.writeBytes(nil) // key is always nil for compressed
158+
wb.writeBytes(bs) // the value is the compressed message
159+
}))
160+
})
161+
}
162+
return bs
163+
}
164+
165+
type v2MessageSetBuilder struct {
166+
msgs []Message
167+
codec CompressionCodec
168+
}
169+
170+
func (f v2MessageSetBuilder) messages() []Message {
171+
return f.msgs
172+
}
173+
174+
func (f v2MessageSetBuilder) bytes() []byte {
175+
attributes := int16(0)
176+
if f.codec != nil {
177+
attributes = int16(f.codec.Code()) // set codec code on attributes
178+
}
179+
return newWB().call(func(wb *kafkaWriteBuffer) {
180+
wb.writeInt64(f.msgs[0].Offset)
181+
wb.writeBytes(newWB().call(func(wb *kafkaWriteBuffer) {
182+
wb.writeInt32(0) // leader epoch
183+
wb.writeInt8(2) // magic = 2
184+
wb.writeInt32(0) // crc, unused
185+
wb.writeInt16(attributes) // record set attributes
186+
wb.writeInt32(0) // record set last offset delta
187+
wb.writeInt64(f.msgs[0].Time.UnixMilli()) // record set first timestamp
188+
wb.writeInt64(f.msgs[0].Time.UnixMilli()) // record set last timestamp
189+
wb.writeInt64(0) // record set producer id
190+
wb.writeInt16(0) // record set producer epoch
191+
wb.writeInt32(0) // record set base sequence
192+
wb.writeInt32(int32(len(f.msgs))) // record set count
193+
bs := newWB().call(func(wb *kafkaWriteBuffer) {
194+
for i, msg := range f.msgs {
195+
wb.Write(newWB().call(func(wb *kafkaWriteBuffer) {
196+
bs := newWB().call(func(wb *kafkaWriteBuffer) {
197+
wb.writeInt8(0) // record attributes, not used here
198+
wb.writeVarInt(time.Now().UnixMilli() - msg.Time.UnixMilli()) // timestamp
199+
wb.writeVarInt(int64(i)) // offset delta
200+
wb.writeVarInt(int64(len(msg.Key))) // key len
201+
wb.Write(msg.Key) // key bytes
202+
wb.writeVarInt(int64(len(msg.Value))) // value len
203+
wb.Write(msg.Value) // value bytes
204+
wb.writeVarInt(int64(len(msg.Headers))) // number of headers
205+
for _, header := range msg.Headers {
206+
wb.writeVarInt(int64(len(header.Key)))
207+
wb.Write([]byte(header.Key))
208+
wb.writeVarInt(int64(len(header.Value)))
209+
wb.Write(header.Value)
210+
}
211+
})
212+
wb.writeVarInt(int64(len(bs)))
213+
wb.Write(bs)
214+
}))
215+
}
216+
})
217+
if f.codec != nil {
218+
bs = mustCompress(bs, f.codec)
219+
}
220+
wb.Write(bs)
221+
}))
222+
})
223+
}
224+
225+
// kafkaWriteBuffer is a write buffer that helps writing fetch responses
226+
type kafkaWriteBuffer struct {
227+
writeBuffer
228+
buf bytes.Buffer
229+
}
230+
231+
func newWB() *kafkaWriteBuffer {
232+
res := kafkaWriteBuffer{}
233+
res.writeBuffer.w = &res.buf
234+
return &res
235+
}
236+
237+
func (f *kafkaWriteBuffer) Bytes() []byte {
238+
return f.buf.Bytes()
239+
}
240+
241+
// call is a convenience method that allows the kafkaWriteBuffer to be used
242+
// in a functional manner. This is helpful when building
243+
// nested structures, as the return value can be fed into
244+
// other fwWB APIs.
245+
func (f *kafkaWriteBuffer) call(cb func(wb *kafkaWriteBuffer)) []byte {
246+
cb(f)
247+
bs := f.Bytes()
248+
if bs == nil {
249+
bs = []byte{}
250+
}
251+
return bs
252+
}
253+
254+
func mustCompress(bs []byte, codec compress.Codec) (res []byte) {
255+
buf := bytes.Buffer{}
256+
codecWriter := codec.NewWriter(&buf)
257+
_, err := io.Copy(codecWriter, bytes.NewReader(bs))
258+
if err != nil {
259+
panic(fmt.Errorf("compress: %w", err))
260+
}
261+
err = codecWriter.Close()
262+
if err != nil {
263+
panic(fmt.Errorf("close codec writer: %w", err))
264+
}
265+
res = buf.Bytes()
266+
return
267+
}

fixtures/v1-v1.hex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
000001660000000a00000000000015c79861000000010009746573742d6564677900000001000000000000000000000000000400000000000000040000000000000000ffffffff0000011f00000000000000000000003ca293717501000000017c4f08dc7f00000005616c706861000000217b22636f756e74223a302c2266696c6c6572223a2261616161616161616161227d00000000000000010000003b3d4abab001000000017c4f08dc970000000462657461000000217b22636f756e74223a302c2266696c6c6572223a2262626262626262626262227d00000000000000020000003cbcad5cde01000000017c4f09b16d0000000567616d6d61000000217b22636f756e74223a302c2266696c6c6572223a2263636363636363636363227d00000000000000030000003c8585230b01000000017c4f09b6b20000000564656c7461000000217b22636f756e74223a302c2266696c6c6572223a2264646464646464646464227d

fixtures/v1-v1.pcapng

13.2 KB
Binary file not shown.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
000006b40000000a00000000000021f08796000000010007746573742d383800000001000000000000000000000000001400000000000000140000000000000000ffffffff0000066f00000000000000000000003c42f0d0f101000000017c477ab6a500000005616c706861000000217b22636f756e74223a302c2266696c6c6572223a2261616161616161616161227d00000000000000010000003bf4f7a99e01000000017c477abb610000000462657461000000217b22636f756e74223a302c2266696c6c6572223a2262626262626262626262227d00000000000000020000005fd3cf85ff01010000017c477b3bcbffffffff000000491f8b0800000000000000636080039bba2d51db18810cc61af76aebd340066b624e41462290a158ad949c5f9a57a26465a0a394969993935aa464a59408074ab5001b5f3ee14800000000000000000000030000005e5d1733a801010000017c477b408fffffffff000000481f8b080000000000000063608003eb95673d5f3002198c35eed50efd40064b526a49229056ac564ace2fcd2b51b232d0514acbccc9492d52b2524a8203a55a002737831e4700000000000000000000040000005e00000000020ab23c660000000000000000017c477d995f0000017c477d995fffffffffffffffffffffffffffff00000001580000000a67616d6d61427b22636f756e74223a302c2266696c6c6572223a2263636363636363636363227d0000000000000000050000005e000000000238c0553f0000000000000000017c477d9ec80000017c477d9ec8ffffffffffffffffffffffffffff00000001580000000a64656c7461427b22636f756e74223a302c2266696c6c6572223a2264646464646464646464227d0000000000000000060000006a0000000002188627120001000000000000017c477dd0b70000017c477dd0b7ffffffffffffffffffffffffffff000000011f8b08000000000000008b606060e04a4fcccd4d74aa564ace2fcd2b51b232d0514acbccc9492d52b2524a8603a55a0600788108a12d00000000000000000000070000006a0000000002b08e2b720001000000000000017c477dd7ef0000017c477dd7efffffffffffffffffffffffffffff000000011f8b08000000000000008b606060e04a49cd294974aa564ace2fcd2b51b232d0514acbccc9492d52b2524a8103a55a0600496dfe822d00000000000000000000080000008d00000000023cc016270000000000010000017c4784fe490000017c47850044ffffffffffffffffffffffffffff000000025c0000000e657073696c6f6e427b22636f756e74223a302c2266696c6c6572223a2265656565656565656565227d005800f60702087a657461427b22636f756e74223a302c2266696c6c6572223a2266666666666666666666227d00000000000000000a0000007d00000000026e844d550001000000010000017c4785514b0000017c47855423ffffffffffffffffffffffffffff000000021f8b08000000000000008b616060e04b2d28ceccc9cf73aa564ace2fcd2b51b232d0514acbccc9492d52b2524a8503a55a8608860ddc4c1c55a92589d815a7c1015031003ebb53a15c000000000000000000000c0000008a0000000002e5dfd9e20000000000010000017c4c8f1e1c0000017c4c8f20e8ffffffffffffffffffffffffffff000000025400000006657461427b22636f756e74223a302c2266696c6c6572223a2267676767676767676767227d005a00980b020a7468657461427b22636f756e74223a302c2266696c6c6572223a2268686868686868686868227d00000000000000000e0000007700000000020f80521f0001000000010000017c4c8f4da50000017c4c8f4fb8ffffffffffffffffffffffffffff000000021f8b08000000000000000b616060604b2d4974aa564ace2fcd2b51b232d0514acbccc9492d52b2524a8703a55a862886651c4c5c2519385567c001503500b01e3aa95900000000000000000000100000003a3b6d4cf601000000017c4cadbd7300000003657461000000217b22636f756e74223a302c2266696c6c6572223a2267676767676767676767227d00000000000000110000003c857f5cd501000000017c4cadbd99000000057468657461000000217b22636f756e74223a302c2266696c6c6572223a2268686868686868686868227d000000000000001300000076dbf0a20a01010000017c4cadf305ffffffff000000601f8b080000000000000063608003ab0d4959758c4006638dcfda8fcf810ce6d4924420a558ad949c5f9a57a26465a0a394969993935aa464a5940e074ab55013409a6d9412cf95c14cf9cc0a64b09664e03327030e946a01e34da7538e000000
Binary file not shown.

fixtures/v1c-v1-v1c.hex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
000002350000000a0000000000003d15acfe00000001000b746573742d627265657a7900000001000000000000000000000000000600000000000000060000000000000000ffffffff000001ec000000000000000100000079779afa8b01010000017c4f11cdc9ffffffff000000631f8b0800000000000000636080039bf9617b7418810cc61a7fc1b32b810cd6c49c828c442043b15a2939bf34af44c9ca4047292d332727b548c94a29110e946aa16680b45b5b967f780937e72490c192945a82db98243850aa05001ea2107b8f00000000000000000000020000003cda0e410e01000000017c4f1212630000000567616d6d61000000217b22636f756e74223a302c2266696c6c6572223a2263636363636363636363227d00000000000000030000003c0470399301000000017c4f12154e0000000564656c7461000000217b22636f756e74223a302c2266696c6c6572223a2264646464646464646464227d0000000000000004000000613b0e4db601010000017c4f124947ffffffff0000004b1f8b080000000000000063608003bb67b39e743302198c35fe429eee40067b6a4171664e7e1e90a958ad949c5f9a57a26465a0a394969993935aa464a5940a074ab5007d95b7894a00000000000000000000050000005edb50180901010000017c4f124fd0ffffffff000000481f8b080000000000000063608003ebfbf2b32c18810cc61a7f21ff0b40064b556a49229056ac564ace2fcd2b51b232d0514acbccc9492d52b2524a8303a55a005ec594df47000000

fixtures/v1c-v1-v1c.pcapng

13.5 KB
Binary file not shown.

fixtures/v1c-v1c.hex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
000001a20000000a0000000000001abffa5700000001000a746573742d677574737900000001000000000000000000000000000400000000000000040000000000000000ffffffff0000015a0000000000000001000000789125e5e201010000017c4f0ee474ffffffff000000621f8b0800000000000000636080039bfcfd51598c4006638d3fdf131f20833531a7202311c850ac564ace2fcd2b51b232d0514acbccc9492d52b2524a8403a55aa81920edd67a221c2e70734a800c96a4d412dcc624c181522d001d8564f48f00000000000000000000020000005f66e75d9b01010000017c4f0f55f5ffffffff000000491f8b0800000000000000636080039bfd2566fe8c4006638d3f7fe8572083353d31373711c850ac564ace2fcd2b51b232d0514acbccc9492d52b2524a8603a55a008ef7186d4800000000000000000000030000005f3cff26a901010000017c4f0f5d5cffffffff000000491f8b0800000000000000636080031b6f8db3d18c4006638d3f7f6c0c90c19a929a5392086428562b25e797e695285919e828a565e6e4a416295929a5c081522d00dd1f6ff148000000

fixtures/v1c-v1c.pcapng

13.3 KB
Binary file not shown.

0 commit comments

Comments
 (0)