@@ -2,11 +2,303 @@ package kafka
2
2
3
3
import (
4
4
"bufio"
5
+ "bytes"
6
+ "encoding/hex"
7
+ "fmt"
8
+ "io"
5
9
"math/rand"
10
+ "os"
6
11
"testing"
7
12
"time"
13
+
14
+ "github.com/segmentio/kafka-go/compress/gzip"
15
+ "github.com/segmentio/kafka-go/compress/lz4"
16
+ "github.com/segmentio/kafka-go/compress/snappy"
17
+ "github.com/segmentio/kafka-go/compress/zstd"
18
+ "github.com/stretchr/testify/require"
8
19
)
9
20
21
+ func TestMessageSetReader (t * testing.T ) {
22
+ const startOffset = 1000
23
+ const highWatermark = 5000
24
+ const topic = "test-topic"
25
+ msgs := make ([]Message , 100 )
26
+ for i := 0 ; i < 100 ; i ++ {
27
+ msgs [i ] = Message {
28
+ Time : time .Now (),
29
+ Offset : int64 (i + startOffset ),
30
+ Key : []byte (fmt .Sprintf ("key-%d" , i )),
31
+ Value : []byte (fmt .Sprintf ("val-%d" , i )),
32
+ }
33
+ }
34
+ defaultHeader := fetchResponseHeader {
35
+ highWatermarkOffset : highWatermark ,
36
+ lastStableOffset : highWatermark ,
37
+ topic : topic ,
38
+ }
39
+ for _ , tc := range []struct {
40
+ name string
41
+ builder fetchResponseBuilder
42
+ err error
43
+ debug bool
44
+ }{
45
+ {
46
+ name : "empty" ,
47
+ builder : fetchResponseBuilder {
48
+ header : defaultHeader ,
49
+ },
50
+ err : errShortRead ,
51
+ },
52
+ {
53
+ name : "v0" ,
54
+ builder : fetchResponseBuilder {
55
+ header : defaultHeader ,
56
+ msgSets : []messageSetBuilder {
57
+ v0MessageSetBuilder {
58
+ Message : msgs [0 ],
59
+ },
60
+ },
61
+ },
62
+ },
63
+ {
64
+ name : "v0 compressed" ,
65
+ builder : fetchResponseBuilder {
66
+ header : defaultHeader ,
67
+ msgSets : []messageSetBuilder {
68
+ v0MessageSetBuilder {
69
+ codec : new (gzip.Codec ),
70
+ Message : msgs [0 ],
71
+ },
72
+ },
73
+ },
74
+ },
75
+ {
76
+ name : "v1" ,
77
+ builder : fetchResponseBuilder {
78
+ header : defaultHeader ,
79
+ msgSets : []messageSetBuilder {
80
+ v1MessageSetBuilder {
81
+ Message : msgs [0 ],
82
+ },
83
+ },
84
+ },
85
+ },
86
+ {
87
+ name : "v1 compressed" ,
88
+ builder : fetchResponseBuilder {
89
+ header : defaultHeader ,
90
+ msgSets : []messageSetBuilder {
91
+ v1MessageSetBuilder {
92
+ codec : new (gzip.Codec ),
93
+ Message : msgs [0 ],
94
+ },
95
+ },
96
+ },
97
+ },
98
+ {
99
+ name : "v2" ,
100
+ builder : fetchResponseBuilder {
101
+ header : defaultHeader ,
102
+ msgSets : []messageSetBuilder {
103
+ v2MessageSetBuilder {
104
+ msgs : []Message {msgs [0 ]},
105
+ },
106
+ },
107
+ },
108
+ },
109
+ {
110
+ name : "v2 compressed" ,
111
+ builder : fetchResponseBuilder {
112
+ header : defaultHeader ,
113
+ msgSets : []messageSetBuilder {
114
+ v2MessageSetBuilder {
115
+ codec : new (zstd.Codec ),
116
+ msgs : []Message {msgs [0 ]},
117
+ },
118
+ },
119
+ },
120
+ },
121
+ {
122
+ name : "v2 multiple messages" ,
123
+ builder : fetchResponseBuilder {
124
+ header : defaultHeader ,
125
+ msgSets : []messageSetBuilder {
126
+ v2MessageSetBuilder {
127
+ msgs : []Message {msgs [0 ], msgs [1 ], msgs [2 ], msgs [3 ], msgs [4 ]},
128
+ },
129
+ },
130
+ },
131
+ },
132
+ {
133
+ name : "v2 multiple messages compressed" ,
134
+ builder : fetchResponseBuilder {
135
+ header : defaultHeader ,
136
+ msgSets : []messageSetBuilder {
137
+ v2MessageSetBuilder {
138
+ codec : new (snappy.Codec ),
139
+ msgs : []Message {msgs [0 ], msgs [1 ], msgs [2 ], msgs [3 ], msgs [4 ]},
140
+ },
141
+ },
142
+ },
143
+ },
144
+ {
145
+ name : "v2 mix of compressed and uncompressed message sets" ,
146
+ builder : fetchResponseBuilder {
147
+ header : defaultHeader ,
148
+ msgSets : []messageSetBuilder {
149
+ v2MessageSetBuilder {
150
+ codec : new (snappy.Codec ),
151
+ msgs : []Message {msgs [0 ], msgs [1 ], msgs [2 ], msgs [3 ], msgs [4 ]},
152
+ },
153
+ v2MessageSetBuilder {
154
+ msgs : []Message {msgs [5 ], msgs [6 ], msgs [7 ], msgs [8 ], msgs [9 ]},
155
+ },
156
+ v2MessageSetBuilder {
157
+ codec : new (snappy.Codec ),
158
+ msgs : []Message {msgs [10 ], msgs [11 ], msgs [12 ], msgs [13 ], msgs [14 ]},
159
+ },
160
+ v2MessageSetBuilder {
161
+ msgs : []Message {msgs [15 ], msgs [16 ], msgs [17 ], msgs [18 ], msgs [19 ]},
162
+ },
163
+ },
164
+ },
165
+ },
166
+ {
167
+ name : "v0 v2 v1 v2 v1 v1 v0 v2" ,
168
+ builder : fetchResponseBuilder {
169
+ header : defaultHeader ,
170
+ msgSets : []messageSetBuilder {
171
+ v0MessageSetBuilder {
172
+ Message : msgs [0 ],
173
+ },
174
+ v2MessageSetBuilder {
175
+ msgs : []Message {msgs [1 ], msgs [2 ]},
176
+ },
177
+ v1MessageSetBuilder {
178
+ Message : msgs [3 ],
179
+ },
180
+ v2MessageSetBuilder {
181
+ msgs : []Message {msgs [4 ], msgs [5 ]},
182
+ },
183
+ v1MessageSetBuilder {
184
+ Message : msgs [6 ],
185
+ },
186
+ v1MessageSetBuilder {
187
+ Message : msgs [7 ],
188
+ },
189
+ v0MessageSetBuilder {
190
+ Message : msgs [8 ],
191
+ },
192
+ v2MessageSetBuilder {
193
+ msgs : []Message {msgs [9 ], msgs [10 ]},
194
+ },
195
+ },
196
+ },
197
+ },
198
+ {
199
+ name : "v0 v2 v1 v2 v1 v1 v0 v2 mixed compression" ,
200
+ builder : fetchResponseBuilder {
201
+ header : defaultHeader ,
202
+ msgSets : []messageSetBuilder {
203
+ v0MessageSetBuilder {
204
+ codec : new (gzip.Codec ),
205
+ Message : msgs [0 ],
206
+ },
207
+ v2MessageSetBuilder {
208
+ codec : new (zstd.Codec ),
209
+ msgs : []Message {msgs [1 ], msgs [2 ]},
210
+ },
211
+ v1MessageSetBuilder {
212
+ codec : new (snappy.Codec ),
213
+ Message : msgs [3 ],
214
+ },
215
+ v2MessageSetBuilder {
216
+ codec : new (lz4.Codec ),
217
+ msgs : []Message {msgs [4 ], msgs [5 ]},
218
+ },
219
+ v1MessageSetBuilder {
220
+ codec : new (gzip.Codec ),
221
+ Message : msgs [6 ],
222
+ },
223
+ v1MessageSetBuilder {
224
+ codec : new (zstd.Codec ),
225
+ Message : msgs [7 ],
226
+ },
227
+ v0MessageSetBuilder {
228
+ codec : new (snappy.Codec ),
229
+ Message : msgs [8 ],
230
+ },
231
+ v2MessageSetBuilder {
232
+ codec : new (lz4.Codec ),
233
+ msgs : []Message {msgs [9 ], msgs [10 ]},
234
+ },
235
+ },
236
+ },
237
+ },
238
+ {
239
+ name : "v0 v2 v1 v2 v1 v1 v0 v2 mixed compression with non-compressed" ,
240
+ builder : fetchResponseBuilder {
241
+ header : defaultHeader ,
242
+ msgSets : []messageSetBuilder {
243
+ v0MessageSetBuilder {
244
+ codec : new (gzip.Codec ),
245
+ Message : msgs [0 ],
246
+ },
247
+ v2MessageSetBuilder {
248
+ msgs : []Message {msgs [1 ], msgs [2 ]},
249
+ },
250
+ v1MessageSetBuilder {
251
+ codec : new (snappy.Codec ),
252
+ Message : msgs [3 ],
253
+ },
254
+ v2MessageSetBuilder {
255
+ msgs : []Message {msgs [4 ], msgs [5 ]},
256
+ },
257
+ v1MessageSetBuilder {
258
+ Message : msgs [6 ],
259
+ },
260
+ v1MessageSetBuilder {
261
+ codec : new (zstd.Codec ),
262
+ Message : msgs [7 ],
263
+ },
264
+ v0MessageSetBuilder {
265
+ Message : msgs [8 ],
266
+ },
267
+ v2MessageSetBuilder {
268
+ codec : new (lz4.Codec ),
269
+ msgs : []Message {msgs [9 ], msgs [10 ]},
270
+ },
271
+ },
272
+ },
273
+ },
274
+ } {
275
+ t .Run (tc .name , func (t * testing.T ) {
276
+ rh , err := newReaderHelper (t , tc .builder .bytes ())
277
+ require .Equal (t , tc .err , err )
278
+ if tc .err != nil {
279
+ return
280
+ }
281
+ rh .offset = tc .builder .messages ()[0 ].Offset
282
+ rh .debug = tc .debug
283
+ for _ , expected := range tc .builder .messages () {
284
+ msg := rh .readMessage ()
285
+ require .Equal (t , string (expected .Key ), string (msg .Key ))
286
+ require .Equal (t , string (expected .Value ), string (msg .Value ))
287
+ require .Equal (t , expected .Offset , msg .Offset )
288
+ }
289
+ // verify the reader stack is empty
290
+ require .EqualValues (t , 0 , rh .remain )
291
+ require .EqualValues (t , 0 , rh .count )
292
+ require .EqualValues (t , 0 , rh .remaining ())
293
+ require .Nil (t , rh .readerStack .parent )
294
+ // any further message is a short read
295
+ _ , err = rh .readMessageErr ()
296
+ require .EqualError (t , err , errShortRead .Error ())
297
+ })
298
+ }
299
+
300
+ }
301
+
10
302
func TestMessageSetReaderEmpty (t * testing.T ) {
11
303
m := messageSetReader {empty : true }
12
304
@@ -65,3 +357,53 @@ func randate() time.Time {
65
357
sec := rand .Int63n (delta ) + min
66
358
return time .Unix (sec , 0 )
67
359
}
360
+
361
+ // readerHelper composes a messageSetReader to provide convenience methods to read
362
+ // messages.
363
+ type readerHelper struct {
364
+ t * testing.T
365
+ * messageSetReader
366
+ offset int64
367
+ }
368
+
369
+ func newReaderHelper (t * testing.T , bs []byte ) (r * readerHelper , err error ) {
370
+ bufReader := bufio .NewReader (bytes .NewReader (bs ))
371
+ _ , _ , remain , err := readFetchResponseHeaderV10 (bufReader , len (bs ))
372
+ require .NoError (t , err )
373
+ var msgs * messageSetReader
374
+ msgs , err = newMessageSetReader (bufReader , remain )
375
+ if err != nil {
376
+ return
377
+ }
378
+ r = & readerHelper {t : t , messageSetReader : msgs }
379
+ require .Truef (t , msgs .remaining () > 0 , "remaining should be > 0 but was %d" , msgs .remaining ())
380
+ return
381
+ }
382
+
383
+ func (r * readerHelper ) readMessageErr () (msg Message , err error ) {
384
+ keyFunc := func (r * bufio.Reader , size int , nbytes int ) (remain int , err error ) {
385
+ msg .Key , remain , err = readNewBytes (r , size , nbytes )
386
+ return
387
+ }
388
+ valueFunc := func (r * bufio.Reader , size int , nbytes int ) (remain int , err error ) {
389
+ msg .Value , remain , err = readNewBytes (r , size , nbytes )
390
+ return
391
+ }
392
+ var timestamp int64
393
+ var headers []Header
394
+ r .offset , timestamp , headers , err = r .messageSetReader .readMessage (r .offset , keyFunc , valueFunc )
395
+ if err != nil {
396
+ return
397
+ }
398
+ msg .Offset = r .offset
399
+ msg .Time = time .UnixMilli (timestamp )
400
+ msg .Headers = headers
401
+ return
402
+ }
403
+
404
+ func (r * readerHelper ) readMessage () (msg Message ) {
405
+ var err error
406
+ msg , err = r .readMessageErr ()
407
+ require .NoError (r .t , err )
408
+ return
409
+ }
0 commit comments