|
1 | 1 | describe "Consumer API", functional: true do |
2 | | - let!(:topic) { create_random_topic(num_partitions: 3) } |
3 | 2 | let(:offset_retention_time) { 30 } |
4 | 3 |
|
5 | 4 | example "consuming messages from the beginning of a topic" do |
|
52 | 51 | expect(received_messages.map(&:value).map(&:to_i)).to match_array messages |
53 | 52 | end |
54 | 53 |
|
55 | | - example "consuming messages a topic that's being written to" do |
56 | | - num_partitions = 15 |
57 | | - messages = (1..1000).to_a |
| 54 | + example "consuming messages from a topic that's being written to" do |
| 55 | + num_partitions = 3 |
| 56 | + topic = create_random_topic(num_partitions: num_partitions) |
| 57 | + messages = (1..100).to_a |
| 58 | + |
| 59 | + mutex = Mutex.new |
| 60 | + var = ConditionVariable.new |
58 | 61 |
|
59 | 62 | Thread.new do |
60 | 63 | kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test") |
61 | 64 | producer = kafka.producer |
62 | 65 |
|
63 | 66 | messages.each do |i| |
64 | | - producer.produce(i.to_s, topic: topic, partition_key: i.to_s) |
| 67 | + producer.produce(i.to_s, topic: topic, partition: i % 3) |
65 | 68 |
|
66 | 69 | if i % 100 == 0 |
67 | 70 | producer.deliver_messages |
68 | | - sleep 1 |
| 71 | + |
| 72 | + mutex.synchronize do |
| 73 | + var.wait(mutex) |
| 74 | + end |
69 | 75 | end |
70 | 76 | end |
71 | 77 |
|
|
78 | 84 | end |
79 | 85 |
|
80 | 86 | group_id = "test#{rand(1000)}" |
| 87 | + received_messages = [] |
81 | 88 |
|
82 | 89 | threads = 2.times.map do |thread_id| |
83 | 90 | t = Thread.new do |
84 | | - received_messages = [] |
85 | | - |
86 | 91 | kafka = Kafka.new(seed_brokers: kafka_brokers, client_id: "test", logger: logger) |
87 | 92 | consumer = kafka.consumer(group_id: group_id, offset_retention_time: offset_retention_time) |
88 | 93 | consumer.subscribe(topic) |
|
91 | 96 | if message.value.nil? |
92 | 97 | consumer.stop |
93 | 98 | else |
94 | | - received_messages << Integer(message.value) |
| 99 | + mutex.synchronize do |
| 100 | + received_messages << Integer(message.value) |
| 101 | + var.signal |
| 102 | + end |
95 | 103 | end |
96 | 104 | end |
97 | | - |
98 | | - received_messages |
99 | 105 | end |
100 | 106 |
|
101 | 107 | t.abort_on_exception = true |
102 | 108 |
|
103 | 109 | t |
104 | 110 | end |
105 | 111 |
|
106 | | - received_messages = threads.map(&:value).flatten |
| 112 | + threads.each(&:join) |
107 | 113 |
|
108 | | - expect(received_messages.sort).to match_array messages |
| 114 | + expect(received_messages).to match_array messages |
109 | 115 | end |
110 | 116 |
|
111 | 117 | example "consuming messages from the end of a topic" do |
|
0 commit comments