Skip to content

Commit bba1d98

Browse files
committed
Add test using the ZMQ_CONFLATE option receiver-side
1 parent f4ad565 commit bba1d98

File tree

1 file changed

+36
-0
lines changed

1 file changed

+36
-0
lines changed

tests/test.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,42 @@ test!(test_raw_roundtrip, {
117117
let _ = unsafe { Socket::from_raw(raw) };
118118
});
119119

120+
// The `conflate` option limits the buffer size to one; let's see if we can get
121+
// messages (unreliably) across the connection.
122+
test!(test_conflating_receiver, {
123+
use std::sync::{
124+
atomic::{AtomicBool, Ordering},
125+
Arc,
126+
};
127+
128+
let ctx = zmq::Context::new();
129+
let receiver = ctx.socket(zmq::PULL).unwrap();
130+
receiver.bind("tcp://127.0.0.1:*").unwrap();
131+
let receiver_endpoint = receiver.get_last_endpoint().unwrap().unwrap();
132+
133+
let stop = Arc::new(AtomicBool::new(false));
134+
let sender_thread = {
135+
let stop = Arc::clone(&stop);
136+
std::thread::spawn(move || {
137+
let sender = ctx.socket(zmq::PUSH).unwrap();
138+
sender.connect(&receiver_endpoint).unwrap();
139+
while !stop.load(Ordering::SeqCst) {
140+
sender.send("bar", 0).expect("send failed");
141+
}
142+
})
143+
};
144+
145+
receiver
146+
.set_conflate(true)
147+
.expect("could not set conflate option");
148+
for _ in 0..100 {
149+
let msg = receiver.recv_bytes(0).unwrap();
150+
assert_eq!(&msg[..], b"bar");
151+
}
152+
stop.store(true, Ordering::SeqCst);
153+
sender_thread.join().expect("could not join sender thread");
154+
});
155+
120156
test!(test_version, {
121157
let (major, _, _) = version();
122158
assert!(major == 3 || major == 4);

0 commit comments

Comments
 (0)