Skip to content

Commit 899c7c9

Browse files
committed
Optimize fetched messages
Avoid copying data from Message to FetchedMessage.
1 parent 1dec9d1 commit 899c7c9

File tree

3 files changed

+26
-25
lines changed

3 files changed

+26
-25
lines changed

lib/kafka/fetch_operation.rb

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,9 @@ def execute
9191

9292
messages = fetched_partition.messages.map {|message|
9393
FetchedMessage.new(
94-
value: message.value,
95-
key: message.key,
94+
message: message,
9695
topic: fetched_topic.name,
9796
partition: fetched_partition.partition,
98-
offset: message.offset,
99-
create_time: message.create_time,
10097
)
10198
}
10299

lib/kafka/fetched_message.rb

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,35 @@
11
module Kafka
22
class FetchedMessage
3-
4-
# @return [String] the value of the message.
5-
attr_reader :value
6-
7-
# @return [String] the key of the message.
8-
attr_reader :key
9-
103
# @return [String] the name of the topic that the message was written to.
114
attr_reader :topic
125

136
# @return [Integer] the partition number that the message was written to.
147
attr_reader :partition
158

9+
def initialize(message:, topic:, partition:)
10+
@message = message
11+
@topic = topic
12+
@partition = partition
13+
end
14+
15+
# @return [String] the value of the message.
16+
def value
17+
@message.value
18+
end
19+
20+
# @return [String] the key of the message.
21+
def key
22+
@message.key
23+
end
24+
1625
# @return [Integer] the offset of the message in the partition.
17-
attr_reader :offset
26+
def offset
27+
@message.offset
28+
end
1829

1930
# @return [Time] the timestamp of the message.
20-
attr_reader :create_time
21-
22-
def initialize(value: nil, key: nil, topic:, partition:, offset:, create_time: nil)
23-
@value = value
24-
@key = key
25-
@topic = topic
26-
@partition = partition
27-
@offset = offset
28-
@create_time = create_time
31+
def create_time
32+
@message.create_time
2933
end
3034
end
3135
end

spec/consumer_spec.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,14 @@
5353
describe "#each_message" do
5454
let(:messages) {
5555
[
56-
Kafka::FetchedMessage.new(
56+
double(:message, {
5757
value: "hello",
5858
key: nil,
5959
topic: "greetings",
6060
partition: 0,
6161
offset: 13,
6262
create_time: Time.now,
63-
)
63+
})
6464
]
6565
}
6666

@@ -190,14 +190,14 @@
190190
describe "#each_batch" do
191191
let(:messages) {
192192
[
193-
Kafka::FetchedMessage.new(
193+
double(:message, {
194194
value: "hello",
195195
key: nil,
196196
topic: "greetings",
197197
partition: 0,
198198
offset: 13,
199199
create_time: Time.now,
200-
)
200+
})
201201
]
202202
}
203203

0 commit comments

Comments
 (0)