@@ -29,6 +29,12 @@ func TestMessageSetReader(t *testing.T) {
29
29
Offset : int64 (i + startOffset ),
30
30
Key : []byte (fmt .Sprintf ("key-%d" , i )),
31
31
Value : []byte (fmt .Sprintf ("val-%d" , i )),
32
+ Headers : []Header {
33
+ {
34
+ Key : fmt .Sprintf ("header-key-%d" , i ),
35
+ Value : []byte (fmt .Sprintf ("header-value-%d" , i )),
36
+ },
37
+ },
32
38
}
33
39
}
34
40
defaultHeader := fetchResponseHeader {
@@ -280,11 +286,23 @@ func TestMessageSetReader(t *testing.T) {
280
286
}
281
287
rh .offset = tc .builder .messages ()[0 ].Offset
282
288
rh .debug = tc .debug
283
- for _ , expected := range tc .builder .messages () {
284
- msg := rh .readMessage ()
285
- require .Equal (t , string (expected .Key ), string (msg .Key ))
286
- require .Equal (t , string (expected .Value ), string (msg .Value ))
287
- require .Equal (t , expected .Offset , msg .Offset )
289
+ for _ , messageSet := range tc .builder .msgSets {
290
+ for _ , expected := range messageSet .messages () {
291
+ msg := rh .readMessage ()
292
+ require .Equal (t , string (expected .Key ), string (msg .Key ))
293
+ require .Equal (t , string (expected .Value ), string (msg .Value ))
294
+ switch messageSet .(type ) {
295
+ case v0MessageSetBuilder , v1MessageSetBuilder :
296
+ // v0 and v1 message sets do not have headers
297
+ require .Len (t , msg .Headers , 0 )
298
+ case v2MessageSetBuilder :
299
+ // v2 message sets can have headers
300
+ require .EqualValues (t , expected .Headers , msg .Headers )
301
+ default :
302
+ t .Fatalf ("unknown builder: %T" , messageSet )
303
+ }
304
+ require .Equal (t , expected .Offset , msg .Offset )
305
+ }
288
306
}
289
307
// verify the reader stack is empty
290
308
require .EqualValues (t , 0 , rh .remain )
0 commit comments