|
114 | 114 | expect(received_messages).to match_array messages |
115 | 115 | end |
116 | 116 |
|
117 | | - example "consuming messages from the end of a topic" do |
118 | | - sent_messages = 1_000 |
119 | | - |
120 | | - num_partitions = 1 |
121 | | - topic = create_random_topic(num_partitions: num_partitions) |
122 | | - group_id = "test#{rand(1000)}" |
123 | | - |
124 | | - consumer_thread = Thread.new do |
125 | | - received_messages = 0 |
126 | | - |
127 | | - kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test", logger: logger) |
128 | | - consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time) |
129 | | - consumer.subscribe(topic, start_from_beginning: false) |
130 | | - |
131 | | - consumer.each_message do |message| |
132 | | - if message.value.nil? |
133 | | - consumer.stop |
134 | | - else |
135 | | - received_messages += 1 |
136 | | - end |
137 | | - end |
138 | | - |
139 | | - received_messages |
140 | | - end |
141 | | - |
142 | | - consumer_thread.abort_on_exception = true |
143 | | - |
144 | | - sleep 1 |
145 | | - |
146 | | - Thread.new do |
147 | | - kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test") |
148 | | - producer = kafka.producer |
149 | | - |
150 | | - 1.upto(sent_messages) do |i| |
151 | | - producer.produce("hello", topic: topic, partition_key: i.to_s) |
152 | | - |
153 | | - if i % 100 == 0 |
154 | | - producer.deliver_messages |
155 | | - sleep 1 |
156 | | - end |
157 | | - end |
158 | | - |
159 | | - (0...num_partitions).each do |i| |
160 | | - # Send a tombstone to each partition. |
161 | | - producer.produce(nil, topic: topic, partition: i) |
162 | | - end |
163 | | - |
164 | | - producer.deliver_messages |
165 | | - end |
166 | | - |
167 | | - received_messages = consumer_thread.value |
168 | | - |
169 | | - expect(received_messages).to eq sent_messages |
170 | | - end |
171 | | - |
172 | 117 | example "stopping and restarting a consumer group" do |
173 | 118 | topic = create_random_topic(num_partitions: 1) |
174 | 119 | num_messages = 10 |
|
0 commit comments