Skip to content

Commit dcd299f

Browse files
committed
Resurrect the fix provided by @ale7714 in #495
1 parent 574fbed commit dcd299f

File tree

3 files changed

+42
-7
lines changed

3 files changed

+42
-7
lines changed

lib/kafka/compressor.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,12 @@ def compress(message_set)
3232
return message_set if @codec.nil? || message_set.size < @threshold
3333

3434
compressed_data = compress_data(message_set)
35+
offset = message_set.messages.map(&:offset).max
3536

3637
wrapper_message = Protocol::Message.new(
3738
value: compressed_data,
3839
codec_id: @codec.codec_id,
40+
offset: offset
3941
)
4042

4143
Protocol::MessageSet.new(messages: [wrapper_message])

lib/kafka/protocol/message.rb

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,22 @@ def decompress
5656
# For some weird reason we need to cut out the first 20 bytes.
5757
data = codec.decompress(value)
5858
message_set_decoder = Decoder.from_string(data)
59-
MessageSet.decode(message_set_decoder)
59+
message_set = MessageSet.decode(message_set_decoder)
60+
61+
base_offset = offset - message_set.size + 1
62+
63+
# The contained messages need to have their offset corrected.
64+
messages = message_set.messages.each_with_index.map do |message, i|
65+
Message.new(
66+
offset: base_offset + i,
67+
value: message.value,
68+
key: message.key,
69+
create_time: message.create_time,
70+
codec_id: message.codec_id
71+
)
72+
end
73+
74+
MessageSet.new(messages: messages)
6075
end
6176

6277
def self.decode(decoder)

spec/compressor_spec.rb

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
it "encodes and decodes compressed messages" do
66
compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter)
77

8-
message1 = Kafka::Protocol::Message.new(value: "hello1", offset: -1)
9-
message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 0)
8+
message1 = Kafka::Protocol::Message.new(value: "hello1")
9+
message2 = Kafka::Protocol::Message.new(value: "hello2")
1010

1111
message_set = Kafka::Protocol::MessageSet.new(messages: [message1, message2])
1212
compressed_message_set = compressor.compress(message_set)
@@ -22,10 +22,28 @@
2222
# When decoding a compressed message, the offsets are calculated relative to that
2323
# of the container message. The broker will set the offset in normal operation,
2424
# but at the client-side we set it to -1.
25-
expect(messages.map(&:offset)).to eq [-1, 0]
25+
expect(messages.map(&:offset)).to eq [-2, -1]
2626
end
2727

28-
it "encodes and decodes compressed messages" do
28+
it "sets offsets correctly for compressed messages with relative offsets" do
29+
compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter)
30+
31+
message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 0)
32+
message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 1)
33+
message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 772043)
34+
35+
message_set = Kafka::Protocol::MessageSet.new(messages: [message1, message2, message3])
36+
compressed_message_set = compressor.compress(message_set)
37+
data = Kafka::Protocol::Encoder.encode_with(compressed_message_set)
38+
39+
decoder = Kafka::Protocol::Decoder.from_string(data)
40+
decoded_message_set = Kafka::Protocol::MessageSet.decode(decoder)
41+
messages = decoded_message_set.messages
42+
43+
expect(messages.map(&:offset)).to eq [772041, 772042, 772043]
44+
end
45+
46+
it "keeps the offsets for compressed messages with provided offsets" do
2947
compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter)
3048

3149
message1 = Kafka::Protocol::Message.new(
@@ -45,9 +63,9 @@
4563
compressed_message_set = compressor.compress(message_set)
4664

4765
data = Kafka::Protocol::Encoder.encode_with(compressed_message_set)
66+
4867
decoder = Kafka::Protocol::Decoder.from_string(data)
49-
decoded_message = Kafka::Protocol::Message.decode(decoder)
50-
decoded_message_set = decoded_message.decompress
68+
decoded_message_set = Kafka::Protocol::MessageSet.decode(decoder)
5169
messages = decoded_message_set.messages
5270

5371
expect(messages.map(&:offset)).to eq [772033, 772034, 772035, 772036, 772037, 772038, 772039, 772040, 772041, 772042, 772043]

0 commit comments

Comments
 (0)