Skip to content

Commit 465cc1c

Browse files
authored
Add test case to verify zmq reconnect behaviour (#84) (#1052)
This PR is a partial duplicate of sonic-net/sonic-dash-ha#84 Since crates/swss-common will be moved in this repo. why During investigating issue #75, I created test case to verify zmq behaviour in handling no connection and connection loss. It is found that with the current swss-common zmq implementation with below attributes, we don't need to handle reconnect explicitly. PUSH/PULL model ZMQ_IMMEDIATE = 0 (default) ZMQ_SNDHWM = 10000 what this PR does add unit test for zmq late connect and reconnect for regression remove unneeded code for falling back to ProducerStateTable in test environment since zmq client won't fail if zmq server is not connected. update test_utils/README.md with missing argument to start redis in test environment
1 parent 93af927 commit 465cc1c

File tree

1 file changed

+41
-0
lines changed

1 file changed

+41
-0
lines changed

crates/swss-common/tests/sync.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,47 @@ fn zmq_consumer_producer_state_tables_sync_api_basic_test() -> Result<(), Except
190190
Ok(())
191191
}
192192

193+
// Below test covers 2 scenarios:
194+
// 1. late connect when zmq server is started after client sending messages.
195+
// 2. reconnect when zmq server is stopped and restarted. messages from client during
196+
// the time should be queued by client and resent when server is restarted.
197+
#[test]
198+
fn zmq_consumer_producer_state_tables_sync_api_connect_late_reconnect() -> Result<(), Exception> {
199+
use SelectResult::*;
200+
enum TestPhase {
201+
LateConnect,
202+
Reconnect,
203+
}
204+
let (endpoint, _delete) = random_zmq_endpoint();
205+
206+
let zmqc = ZmqClient::new(&endpoint)?;
207+
let redis = Redis::start();
208+
let zpst = ZmqProducerStateTable::new(redis.db_connector(), "table_a", zmqc, false)?;
209+
210+
for _ in [TestPhase::LateConnect, TestPhase::Reconnect] {
211+
let kfvs = random_kfvs();
212+
for kfv in &kfvs {
213+
match kfv.operation {
214+
KeyOperation::Set => zpst.set(&kfv.key, kfv.field_values.clone())?,
215+
KeyOperation::Del => zpst.del(&kfv.key)?,
216+
}
217+
}
218+
219+
let mut zmqs = ZmqServer::new(&endpoint)?;
220+
let zcst = ZmqConsumerStateTable::new(redis.db_connector(), "table_a", &mut zmqs, None, None)?;
221+
let mut kfvs_seen = Vec::new();
222+
while kfvs_seen.len() != kfvs.len() {
223+
assert_eq!(zcst.read_data(Duration::from_millis(2000), true)?, Data);
224+
kfvs_seen.extend(zcst.pops()?);
225+
}
226+
assert_eq!(kfvs, kfvs_seen);
227+
drop(zcst);
228+
drop(zmqs);
229+
}
230+
231+
Ok(())
232+
}
233+
193234
#[test]
194235
fn table_sync_api_basic_test() -> Result<(), Exception> {
195236
let redis = Redis::start();

0 commit comments

Comments
 (0)