Skip to content

Commit c0c178f

Browse files
committed
Updated with comments
1 parent ceeeb4d commit c0c178f

File tree

2 files changed

+33
-16
lines changed

2 files changed

+33
-16
lines changed

lib/kafka/compressor.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ def initialize(codec_name: nil, threshold: 1, instrumenter:)
2727
end
2828

2929
# @param message_set [Protocol::MessageSet]
30+
# @param offset [Integer] used to simulate broker behaviour in tests
3031
# @return [Protocol::MessageSet]
3132
def compress(message_set, offset: -1)
3233
return message_set if @codec.nil? || message_set.size < @threshold

lib/kafka/protocol/message.rb

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -58,22 +58,7 @@ def decompress
5858
message_set_decoder = Decoder.from_string(data)
5959
message_set = MessageSet.decode(message_set_decoder)
6060

61-
max_relative_offset = message_set.messages.last.offset
62-
return message_set if max_relative_offset == offset
63-
64-
# The contained messages need to have their offset corrected.
65-
base_offset = offset - max_relative_offset
66-
messages = message_set.messages.each_with_index.map do |message, i|
67-
Message.new(
68-
offset: message.offset + base_offset,
69-
value: message.value,
70-
key: message.key,
71-
create_time: message.create_time,
72-
codec_id: message.codec_id
73-
)
74-
end
75-
76-
MessageSet.new(messages: messages)
61+
correct_offsets(message_set)
7762
end
7863

7964
def self.decode(decoder)
@@ -117,6 +102,37 @@ def self.decode(decoder)
117102

118103
private
119104

105+
# Offsets may be relative with regards to wrapped message offset, but there are special cases.
106+
#
107+
# Cases when client will receive corrected offsets:
108+
# - When fetch request is version 0, kafka will correct relative offset on broker side before replying fetch response
109+
# - When messages is stored in 0.9 format on disk (broker configured to do so).
110+
#
111+
# All other cases, compressed inner messages should have relative offset, with below attributes:
112+
# - The container message should have the 'real' offset
113+
# - The container message's offset should be the 'real' offset of the last message in the compressed batch
114+
# - The first inner message should always have offset = 0
115+
def correct_offsets(message_set)
116+
max_relative_offset = message_set.messages.last.offset
117+
118+
# The offsets are already correct, do nothing.
119+
return message_set if max_relative_offset == offset
120+
121+
# The contained messages have relative offsets, and needs to be corrected.
122+
base_offset = offset - max_relative_offset
123+
messages = message_set.messages.map do |message|
124+
Message.new(
125+
offset: message.offset + base_offset,
126+
value: message.value,
127+
key: message.key,
128+
create_time: message.create_time,
129+
codec_id: message.codec_id
130+
)
131+
end
132+
133+
MessageSet.new(messages: messages)
134+
end
135+
120136
def encode_with_crc
121137
buffer = StringIO.new
122138
encoder = Encoder.new(buffer)

0 commit comments

Comments
 (0)