Skip to content

Commit 0a40727

Browse files
committed
Add spec for extract client
--- Changelog: test
1 parent c939433 commit 0a40727

File tree

1 file changed

+144
-0
lines changed

1 file changed

+144
-0
lines changed
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
require "spec_helper"
2+
3+
describe Rdkafka::Producer::Client do
4+
let(:native) { double }
5+
let(:closing) { false }
6+
let(:thread) { double(Thread) }
7+
8+
subject(:client) { described_class.new(native) }
9+
10+
before do
11+
allow(Rdkafka::Bindings).to receive(:rd_kafka_poll).with(native, 250)
12+
allow(Rdkafka::Bindings).to receive(:rd_kafka_outq_len).with(native).and_return(0)
13+
allow(Rdkafka::Bindings).to receive(:rd_kafka_destroy)
14+
allow(Thread).to receive(:new).and_return(thread)
15+
16+
allow(thread).to receive(:[]=).with(:closing, anything)
17+
allow(thread).to receive(:join)
18+
allow(thread).to receive(:abort_on_exception=).with(anything)
19+
end
20+
21+
context "defaults" do
22+
it "sets the thread to abort on exception" do
23+
expect(thread).to receive(:abort_on_exception=).with(true)
24+
25+
client
26+
end
27+
28+
it "sets the thread `closing` flag to false" do
29+
expect(thread).to receive(:[]=).with(:closing, false)
30+
31+
client
32+
end
33+
end
34+
35+
context "the polling thread" do
36+
it "is created" do
37+
expect(Thread).to receive(:new)
38+
39+
client
40+
end
41+
42+
it "polls the native with default 250ms timeout" do
43+
polling_loop_expects do
44+
expect(Rdkafka::Bindings).to receive(:rd_kafka_poll).with(native, 250)
45+
end
46+
end
47+
48+
it "check the out queue of native client" do
49+
polling_loop_expects do
50+
expect(Rdkafka::Bindings).to receive(:rd_kafka_outq_len).with(native)
51+
end
52+
end
53+
end
54+
55+
def polling_loop_expects(&block)
56+
Thread.current[:closing] = true # this forces the loop break with line #12
57+
58+
allow(Thread).to receive(:new).and_yield do |_|
59+
block.call
60+
end.and_return(thread)
61+
62+
client
63+
end
64+
65+
it "exposes `native` client" do
66+
expect(client.native).to eq(native)
67+
end
68+
69+
context "when client was not yet closed (`nil`)" do
70+
it "is not closed" do
71+
expect(client.closed?).to eq(false)
72+
end
73+
74+
context "and attempt to close" do
75+
it "calls the `destroy` binding" do
76+
expect(Rdkafka::Bindings).to receive(:rd_kafka_destroy).with(native)
77+
78+
client.close
79+
end
80+
81+
it "indicates to the polling thread that it is closing" do
82+
expect(thread).to receive(:[]=).with(:closing, true)
83+
84+
client.close
85+
end
86+
87+
it "joins the polling thread" do
88+
expect(thread).to receive(:join)
89+
90+
client.close
91+
end
92+
93+
it "closes and unassign the native client" do
94+
client.close
95+
96+
expect(client.native).to eq(nil)
97+
expect(client.closed?).to eq(true)
98+
end
99+
end
100+
end
101+
102+
context "when client was already closed" do
103+
before { client.close }
104+
105+
it "is closed" do
106+
expect(client.closed?).to eq(true)
107+
end
108+
109+
context "and attempt to close again" do
110+
it "does not call the `destroy` binding" do
111+
expect(Rdkafka::Bindings).not_to receive(:rd_kafka_destroy)
112+
113+
client.close
114+
end
115+
116+
it "does not indicate to the polling thread that it is closing" do
117+
expect(thread).not_to receive(:[]=).with(:closing, true)
118+
119+
client.close
120+
end
121+
122+
it "does not join the polling thread" do
123+
expect(thread).not_to receive(:join)
124+
125+
client.close
126+
end
127+
128+
it "does not close and unassign the native client again" do
129+
client.close
130+
131+
expect(client.native).to eq(nil)
132+
expect(client.closed?).to eq(true)
133+
end
134+
end
135+
end
136+
137+
it "provide a finalizer Proc that closes the `native` client" do
138+
expect(client.closed?).to eq(false)
139+
140+
client.finalizer.call("some-ignored-object-id")
141+
142+
expect(client.closed?).to eq(true)
143+
end
144+
end

0 commit comments

Comments
 (0)