Skip to content

Commit 76a97ec

Browse files
authored
Merge pull request #506 from klippx/fix-compressed-offset-bug
Fix compressed offset bug
2 parents ef7633e + 02ad21c commit 76a97ec

File tree

4 files changed

+85
-36
lines changed

4 files changed

+85
-36
lines changed

lib/kafka/compressor.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,17 @@ def initialize(codec_name: nil, threshold: 1, instrumenter:)
2929
end
3030

3131
# @param message_set [Protocol::MessageSet]
32+
# @param offset [Integer] used to simulate broker behaviour in tests
3233
# @return [Protocol::MessageSet]
33-
def compress(message_set)
34+
def compress(message_set, offset: -1)
3435
return message_set if @codec.nil? || message_set.size < @threshold
3536

3637
compressed_data = compress_data(message_set)
3738

3839
wrapper_message = Protocol::Message.new(
3940
value: compressed_data,
4041
codec_id: @codec.codec_id,
42+
offset: offset
4143
)
4244

4345
Protocol::MessageSet.new(messages: [wrapper_message])

lib/kafka/protocol/message.rb

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,7 @@ def decompress
5858
message_set_decoder = Decoder.from_string(data)
5959
message_set = MessageSet.decode(message_set_decoder)
6060

61-
# The contained messages need to have their offset corrected.
62-
messages = message_set.messages.each_with_index.map do |message, i|
63-
Message.new(
64-
offset: offset + i,
65-
value: message.value,
66-
key: message.key,
67-
create_time: message.create_time,
68-
codec_id: message.codec_id
69-
)
70-
end
71-
72-
MessageSet.new(messages: messages)
61+
correct_offsets(message_set)
7362
end
7463

7564
def self.decode(decoder)
@@ -113,6 +102,36 @@ def self.decode(decoder)
113102

114103
private
115104

105+
# Offsets may be relative with regards to wrapped message offset, but there are special cases.
106+
#
107+
# Cases when client will receive corrected offsets:
108+
# - When the fetch request is version 0, Kafka will correct the relative offset on the broker side before replying with the fetch response
109+
# - When messages are stored in the 0.9 format on disk (broker configured to do so).
110+
#
111+
# In all other cases, compressed inner messages should have relative offsets, with the below attributes:
112+
# - The container message should have the 'real' offset
113+
# - The container message's offset should be the 'real' offset of the last message in the compressed batch
114+
def correct_offsets(message_set)
115+
max_relative_offset = message_set.messages.last.offset
116+
117+
# The offsets are already correct, do nothing.
118+
return message_set if max_relative_offset == offset
119+
120+
# The contained messages have relative offsets, and needs to be corrected.
121+
base_offset = offset - max_relative_offset
122+
messages = message_set.messages.map do |message|
123+
Message.new(
124+
offset: message.offset + base_offset,
125+
value: message.value,
126+
key: message.key,
127+
create_time: message.create_time,
128+
codec_id: message.codec_id
129+
)
130+
end
131+
132+
MessageSet.new(messages: messages)
133+
end
134+
116135
def encode_with_crc
117136
buffer = StringIO.new
118137
encoder = Encoder.new(buffer)

spec/compressor_spec.rb

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,6 @@
22
describe ".compress" do
33
let(:instrumenter) { Kafka::Instrumenter.new(client_id: "test") }
44

5-
it "encodes and decodes compressed messages" do
6-
compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter)
7-
8-
message1 = Kafka::Protocol::Message.new(value: "hello1")
9-
message2 = Kafka::Protocol::Message.new(value: "hello2")
10-
11-
message_set = Kafka::Protocol::MessageSet.new(messages: [message1, message2])
12-
compressed_message_set = compressor.compress(message_set)
13-
14-
data = Kafka::Protocol::Encoder.encode_with(compressed_message_set)
15-
decoder = Kafka::Protocol::Decoder.from_string(data)
16-
decoded_message = Kafka::Protocol::Message.decode(decoder)
17-
decoded_message_set = decoded_message.decompress
18-
messages = decoded_message_set.messages
19-
20-
expect(messages.map(&:value)).to eq ["hello1", "hello2"]
21-
22-
# When decoding a compressed message, the offsets are calculated relative to that
23-
# of the container message. The broker will set the offset in normal operation,
24-
# but at the client-side we set it to -1.
25-
expect(messages.map(&:offset)).to eq [-1, 0]
26-
end
27-
285
it "only compresses the messages if there are at least the configured threshold" do
296
compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 3, instrumenter: instrumenter)
307

spec/protocol/message_set_spec.rb

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,55 @@
5959
Kafka::Protocol::MessageSet.decode(decoder)
6060
}.to raise_exception(Kafka::MessageTooLargeToRead)
6161
end
62+
63+
describe '.decode' do
64+
let(:instrumenter) { Kafka::Instrumenter.new(client_id: "test") }
65+
let(:compressor) { Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter) }
66+
67+
def encode(messages: [], wrapper_message_offset: -1)
68+
message_set = Kafka::Protocol::MessageSet.new(messages: messages)
69+
compressed_message_set = compressor.compress(message_set, offset: wrapper_message_offset)
70+
Kafka::Protocol::Encoder.encode_with(compressed_message_set)
71+
end
72+
73+
def decode(data)
74+
decoder = Kafka::Protocol::Decoder.from_string(data)
75+
Kafka::Protocol::MessageSet
76+
.decode(decoder)
77+
.messages
78+
end
79+
80+
it "sets offsets correctly for compressed messages with relative offsets" do
81+
message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 0)
82+
message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 1)
83+
message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 2)
84+
85+
data = encode(messages: [message1, message2, message3], wrapper_message_offset: 1000)
86+
messages = decode(data)
87+
88+
expect(messages.map(&:offset)).to eq [998, 999, 1000]
89+
end
90+
91+
it "sets offsets correctly for compressed messages with relative offsets on a compacted topic" do
92+
message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 0)
93+
message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 2)
94+
message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 3)
95+
96+
data = encode(messages: [message1, message2, message3], wrapper_message_offset: 1000)
97+
messages = decode(data)
98+
99+
expect(messages.map(&:offset)).to eq [997, 999, 1000]
100+
end
101+
102+
it "keeps the predefined offsets for messages delivered in 0.9 format" do
103+
message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 997)
104+
message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 999)
105+
message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 1000)
106+
107+
data = encode(messages: [message1, message2, message3], wrapper_message_offset: 1000)
108+
messages = decode(data)
109+
110+
expect(messages.map(&:offset)).to eq [997, 999, 1000]
111+
end
112+
end
62113
end

0 commit comments

Comments
 (0)