Skip to content

Commit ceeeb4d

Browse files
committed
Move specs from compressor spec to message set spec
1 parent dd96dda commit ceeeb4d

File tree

2 files changed

+55
-54
lines changed

2 files changed

+55
-54
lines changed

spec/compressor_spec.rb

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -2,60 +2,6 @@
22
describe ".compress" do
33
let(:instrumenter) { Kafka::Instrumenter.new(client_id: "test") }
44

5-
it "sets offsets correctly for compressed messages with relative offsets" do
6-
compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter)
7-
8-
message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 0)
9-
message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 1)
10-
message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 2)
11-
12-
message_set = Kafka::Protocol::MessageSet.new(messages: [message1, message2, message3])
13-
compressed_message_set = compressor.compress(message_set, offset: 1000)
14-
data = Kafka::Protocol::Encoder.encode_with(compressed_message_set)
15-
16-
decoder = Kafka::Protocol::Decoder.from_string(data)
17-
decoded_message_set = Kafka::Protocol::MessageSet.decode(decoder)
18-
messages = decoded_message_set.messages
19-
20-
expect(messages.map(&:offset)).to eq [998, 999, 1000]
21-
end
22-
23-
it "sets offsets correctly for compressed messages with relative offsets on a compacted topic" do
24-
compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter)
25-
26-
message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 0)
27-
message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 2)
28-
message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 3)
29-
30-
message_set = Kafka::Protocol::MessageSet.new(messages: [message1, message2, message3])
31-
compressed_message_set = compressor.compress(message_set, offset: 1000)
32-
data = Kafka::Protocol::Encoder.encode_with(compressed_message_set)
33-
34-
decoder = Kafka::Protocol::Decoder.from_string(data)
35-
decoded_message_set = Kafka::Protocol::MessageSet.decode(decoder)
36-
messages = decoded_message_set.messages
37-
38-
expect(messages.map(&:offset)).to eq [997, 999, 1000]
39-
end
40-
41-
it "keeps the predefined offsets for messages delivered in 0.9 format" do
42-
compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter)
43-
44-
message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 997)
45-
message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 999)
46-
message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 1000)
47-
48-
message_set = Kafka::Protocol::MessageSet.new(messages: [message1, message2, message3])
49-
compressed_message_set = compressor.compress(message_set, offset: 1000)
50-
data = Kafka::Protocol::Encoder.encode_with(compressed_message_set)
51-
52-
decoder = Kafka::Protocol::Decoder.from_string(data)
53-
decoded_message_set = Kafka::Protocol::MessageSet.decode(decoder)
54-
messages = decoded_message_set.messages
55-
56-
expect(messages.map(&:offset)).to eq [997, 999, 1000]
57-
end
58-
595
it "only compresses the messages if there are at least the configured threshold" do
606
compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 3, instrumenter: instrumenter)
617

spec/protocol/message_set_spec.rb

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,59 @@
5959
Kafka::Protocol::MessageSet.decode(decoder)
6060
}.to raise_exception(Kafka::MessageTooLargeToRead)
6161
end
62+
63+
describe '.decode' do
64+
let(:instrumenter) { Kafka::Instrumenter.new(client_id: "test") }
65+
let(:compressor) { Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter) }
66+
67+
def encode(messages: [], wrapper_message_offset: -1)
68+
message_set = Kafka::Protocol::MessageSet.new(messages: messages)
69+
compressed_message_set = compressor.compress(message_set, offset: wrapper_message_offset)
70+
Kafka::Protocol::Encoder.encode_with(compressed_message_set)
71+
end
72+
73+
def decode(data)
74+
decoder = Kafka::Protocol::Decoder.from_string(data)
75+
Kafka::Protocol::MessageSet
76+
.decode(decoder)
77+
.messages
78+
end
79+
80+
it "sets offsets correctly for compressed messages with relative offsets" do
81+
compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter)
82+
83+
message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 0)
84+
message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 1)
85+
message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 2)
86+
87+
data = encode(messages: [message1, message2, message3], wrapper_message_offset: 1000)
88+
messages = decode(data)
89+
90+
expect(messages.map(&:offset)).to eq [998, 999, 1000]
91+
end
92+
93+
it "sets offsets correctly for compressed messages with relative offsets on a compacted topic" do
94+
message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 0)
95+
message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 2)
96+
message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 3)
97+
98+
data = encode(messages: [message1, message2, message3], wrapper_message_offset: 1000)
99+
messages = decode(data)
100+
101+
expect(messages.map(&:offset)).to eq [997, 999, 1000]
102+
end
103+
104+
it "keeps the predefined offsets for messages delivered in 0.9 format" do
105+
compressor = Kafka::Compressor.new(codec_name: :snappy, threshold: 1, instrumenter: instrumenter)
106+
107+
message1 = Kafka::Protocol::Message.new(value: "hello1", offset: 997)
108+
message2 = Kafka::Protocol::Message.new(value: "hello2", offset: 999)
109+
message3 = Kafka::Protocol::Message.new(value: "hello3", offset: 1000)
110+
111+
data = encode(messages: [message1, message2, message3], wrapper_message_offset: 1000)
112+
messages = decode(data)
113+
114+
expect(messages.map(&:offset)).to eq [997, 999, 1000]
115+
end
116+
end
62117
end

0 commit comments

Comments
 (0)