File tree Expand file tree Collapse file tree 2 files changed +3
-16
lines changed Expand file tree Collapse file tree 2 files changed +3
-16
lines changed Original file line number Diff line number Diff line change @@ -56,20 +56,7 @@ def decompress
5656 # For some weird reason we need to cut out the first 20 bytes.
5757 data = codec . decompress ( value )
5858 message_set_decoder = Decoder . from_string ( data )
59- message_set = MessageSet . decode ( message_set_decoder )
60-
61- # The contained messages need to have their offset corrected.
62- messages = message_set . messages . each_with_index . map do |message , i |
63- Message . new (
64- offset : offset + i ,
65- value : message . value ,
66- key : message . key ,
67- create_time : message . create_time ,
68- codec_id : message . codec_id
69- )
70- end
71-
72- MessageSet . new ( messages : messages )
59+ MessageSet . decode ( message_set_decoder )
7360 end
7461
7562 def self . decode ( decoder )
Original file line number Diff line number Diff line change 55 it "encodes and decodes compressed messages" do
66 compressor = Kafka ::Compressor . new ( codec_name : :snappy , threshold : 1 , instrumenter : instrumenter )
77
8- message1 = Kafka ::Protocol ::Message . new ( value : "hello1" )
9- message2 = Kafka ::Protocol ::Message . new ( value : "hello2" )
8+ message1 = Kafka ::Protocol ::Message . new ( value : "hello1" , offset : - 1 )
9+ message2 = Kafka ::Protocol ::Message . new ( value : "hello2" , offset : 0 )
1010
1111 message_set = Kafka ::Protocol ::MessageSet . new ( messages : [ message1 , message2 ] )
1212 compressed_message_set = compressor . compress ( message_set )
You can’t perform that action at this time.
0 commit comments