Skip to content

Commit 027f394

Browse files
authored
Merge pull request #510 from zendesk/dasch/refactor-decompress
Refactor decompression code
2 parents 726d8dc + 89e6133 commit 027f394

File tree

2 files changed

+7
-9
lines changed

2 files changed

+7
-9
lines changed

lib/kafka/protocol/message.rb

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def decompress
5858
message_set_decoder = Decoder.from_string(data)
5959
message_set = MessageSet.decode(message_set_decoder)
6060

61-
correct_offsets(message_set)
61+
correct_offsets(message_set.messages)
6262
end
6363

6464
def self.decode(decoder)
@@ -111,15 +111,16 @@ def self.decode(decoder)
111111
# All other cases, compressed inner messages should have relative offset, with below attributes:
112112
# - The container message should have the 'real' offset
113113
# - The container message's offset should be the 'real' offset of the last message in the compressed batch
114-
def correct_offsets(message_set)
115-
max_relative_offset = message_set.messages.last.offset
114+
def correct_offsets(messages)
115+
max_relative_offset = messages.last.offset
116116

117117
# The offsets are already correct, do nothing.
118-
return message_set if max_relative_offset == offset
118+
return messages if max_relative_offset == offset
119119

120120
# The contained messages have relative offsets, and needs to be corrected.
121121
base_offset = offset - max_relative_offset
122-
messages = message_set.messages.map do |message|
122+
123+
messages.map do |message|
123124
Message.new(
124125
offset: message.offset + base_offset,
125126
value: message.value,
@@ -128,8 +129,6 @@ def correct_offsets(message_set)
128129
codec_id: message.codec_id
129130
)
130131
end
131-
132-
MessageSet.new(messages: messages)
133132
end
134133

135134
def encode_with_crc

lib/kafka/protocol/message_set.rb

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ def self.decode(decoder)
3131
message = Message.decode(decoder)
3232

3333
if message.compressed?
34-
wrapped_message_set = message.decompress
35-
fetched_messages.concat(wrapped_message_set.messages)
34+
fetched_messages.concat(message.decompress)
3635
else
3736
fetched_messages << message
3837
end

0 commit comments

Comments
 (0)