From d395f7c6ea6773b211157de9052d1287a396aa37 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Mon, 15 Sep 2025 23:58:19 +0300 Subject: [PATCH] chore(regtest): Optimize tests a little Signed-off-by: Vladislav Oleshko --- tests/dragonfly/conftest.py | 10 - tests/dragonfly/list_family_test.py | 22 -- tests/dragonfly/pymemcached_test.py | 323 +++++++++++++--------------- tests/dragonfly/replication_test.py | 261 +++++++++------------- tests/dragonfly/utility.py | 2 +- 5 files changed, 251 insertions(+), 367 deletions(-) delete mode 100644 tests/dragonfly/list_family_test.py diff --git a/tests/dragonfly/conftest.py b/tests/dragonfly/conftest.py index f8904c9e1d3a..811941897d97 100644 --- a/tests/dragonfly/conftest.py +++ b/tests/dragonfly/conftest.py @@ -17,7 +17,6 @@ from time import sleep from typing import Dict, List, Union -import pymemcache import pytest import pytest_asyncio import redis @@ -314,15 +313,6 @@ def port_picker(): yield PortPicker() -@pytest.fixture(scope="function") -def memcached_client(df_server: DflyInstance): - client = pymemcache.Client(f"127.0.0.1:{df_server.mc_port}", default_noreply=False) - - yield client - - client.flush_all() - - @pytest.fixture(scope="session") def with_tls_ca_cert_args(tmp_dir): ca_key = os.path.join(tmp_dir, "ca-key.pem") diff --git a/tests/dragonfly/list_family_test.py b/tests/dragonfly/list_family_test.py deleted file mode 100644 index 12e02b861ed1..000000000000 --- a/tests/dragonfly/list_family_test.py +++ /dev/null @@ -1,22 +0,0 @@ -import asyncio -from redis import asyncio as aioredis - -import pytest - - -@pytest.mark.parametrize("index", range(50)) -class TestBlPop: - async def async_blpop(client: aioredis.Redis): - return await client.blpop(["list1{t}", "list2{t}", "list2{t}", "list1{t}"], 0.5) - - async def blpop_mult_keys(async_client: aioredis.Redis, key: str, val: str): - task = asyncio.create_task(TestBlPop.async_blpop(async_client)) - await async_client.lpush(key, val) - result = await asyncio.wait_for(task, 3) - assert result[1] == val - watched = await async_client.execute_command("DEBUG WATCHED") - assert watched == ["awaked", [], "watched", []] - - async def test_blpop_multiple_keys(self, async_client: aioredis.Redis, index): - await TestBlPop.blpop_mult_keys(async_client, "list1{t}", "a") - await TestBlPop.blpop_mult_keys(async_client, "list2{t}", "b") diff --git a/tests/dragonfly/pymemcached_test.py b/tests/dragonfly/pymemcached_test.py index ddf72bb917b9..47e6f16244f8 100644 --- a/tests/dragonfly/pymemcached_test.py +++ b/tests/dragonfly/pymemcached_test.py @@ -3,180 +3,161 @@ import socket import ssl import time +import pytest +import pymemcache from pymemcache.client.base import Client as MCClient from . 
import dfly_args from .instance import DflyInstance -DEFAULT_ARGS = {"memcached_port": 11211, "proactor_threads": 4} - -# Generic basic tests - - -@dfly_args(DEFAULT_ARGS) -def test_basic(memcached_client: MCClient): - assert not memcached_client.default_noreply - - # set -> replace -> add -> get - assert memcached_client.set("key1", "value1") - assert memcached_client.replace("key1", "value2") - assert not memcached_client.add("key1", "value3") - assert memcached_client.get("key1") == b"value2" - - # add -> get - assert memcached_client.add("key2", "value1") - assert memcached_client.get("key2") == b"value1" - - # delete - assert memcached_client.delete("key1") - assert not memcached_client.delete("key3") - assert memcached_client.get("key1") is None - - # prepend append - assert memcached_client.set("key4", "B") - assert memcached_client.prepend("key4", "A") - assert memcached_client.append("key4", "C") - assert memcached_client.get("key4") == b"ABC" - - # incr - memcached_client.set("key5", 0) - assert memcached_client.incr("key5", 1) == 1 - assert memcached_client.incr("key5", 1) == 2 - assert memcached_client.decr("key5", 1) == 1 - - assert memcached_client.gets("key5") == (b"1", b"0") - - -# Noreply (and pipeline) tests - - -@dfly_args(DEFAULT_ARGS) -async def test_noreply_pipeline(df_server: DflyInstance, memcached_client: MCClient): - """ - With the noreply option the python client doesn't wait for replies, - so all the commands are pipelined. Assert pipelines work correctly and the - succeeding regular command receives a reply (it should join the pipeline as last). - """ - - client = df_server.client() - for attempts in range(2): - keys = [f"k{i}" for i in range(1000)] - values = [f"d{i}" for i in range(len(keys))] - - for k, v in zip(keys, values): - memcached_client.set(k, v, noreply=True) - - # quick follow up before the pipeline finishes - assert memcached_client.get("k10") == b"d10" - # check all commands were executed - assert memcached_client.get_many(keys) == {k: v.encode() for k, v in zip(keys, values)} - - info = await client.info() - if info["total_pipelined_commands"] > 100: - return - logging.warning( - f"Have not identified pipelining at attempt {attempts} Info: \n" + str(info) - ) - await client.flushall() - - assert False, "Pipelining not detected" - - -@dfly_args(DEFAULT_ARGS) -def test_noreply_alternating(memcached_client: MCClient): - """ - Assert alternating noreply works correctly, will cause many dispatch queue emptyings. 
- """ - for i in range(200): - if i % 2 == 0: - memcached_client.set(f"k{i}", "D1", noreply=True) - memcached_client.set(f"k{i}", "D2", noreply=True) - memcached_client.set(f"k{i}", "D3", noreply=True) - assert memcached_client.add(f"k{i}", "DX", noreply=False) == (i % 2 != 0) - - -# Raw connection tests - - -@dfly_args(DEFAULT_ARGS) -def test_length_in_set_command(df_server: DflyInstance, memcached_client: MCClient): - """ - Test parser correctly reads value based on length and complains about bad chunks - """ - client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - client.connect(("127.0.0.1", int(df_server["memcached_port"]))) - - cases = [b"NOTFOUR", b"FOUR", b"F4\r\n", b"\r\n\r\n"] - - # TODO: \r\n hangs - - for case in cases: - print("case", case) - client.sendall(b"set foo 0 0 4\r\n" + case + b"\r\n") - response = client.recv(256) - if len(case) == 4: - assert response == b"STORED\r\n" - else: - assert response == b"CLIENT_ERROR bad data chunk\r\n" - - client.close() - - -# Auxiliary tests - - -@dfly_args(DEFAULT_ARGS) -def test_large_request(memcached_client): - assert memcached_client.set(b"key1", b"d" * 4096, noreply=False) - assert memcached_client.set(b"key2", b"d" * 4096 * 2, noreply=False) - - -@dfly_args(DEFAULT_ARGS) -def test_version(memcached_client: MCClient): - """ - php-memcached client expects version to be in the format of "n.n.n", so we return 1.5.0 emulating an old memcached server. - Our real version is being returned in the stats command. - Also verified manually that php client parses correctly the version string that ends with "DF". - """ - assert b"1.6.0 DF" == memcached_client.version() - stats = memcached_client.stats() - version = stats[b"version"].decode("utf-8") - assert version.startswith("v") or version == "dev" - - -@dfly_args(DEFAULT_ARGS) -def test_flags(memcached_client: MCClient): - for i in range(1, 20): - flags = random.randrange(50, 1000) - memcached_client.set("a", "real-value", flags=flags, noreply=True) - - res = memcached_client.raw_command("get a", "END\r\n").split() - # workaround sometimes memcached_client.raw_command returns empty str - if len(res) > 0: - assert res[2].decode() == str(flags) - - -@dfly_args(DEFAULT_ARGS) -def test_expiration(memcached_client: MCClient): - assert not memcached_client.default_noreply - - assert memcached_client.set("key1", "value1", 2) - assert memcached_client.set("key2", "value2", int(time.time()) + 2) - assert memcached_client.set("key3", "value3", int(time.time()) + 200) - assert memcached_client.get("key1") == b"value1" - assert memcached_client.get("key2") == b"value2" - assert memcached_client.get("key3") == b"value3" - assert memcached_client.set("key3", "value3", int(time.time()) - 200) - assert memcached_client.get("key3") == None - time.sleep(2) - assert memcached_client.get("key1") == None - assert memcached_client.get("key2") == None - assert memcached_client.get("key3") == None - - -@dfly_args(DEFAULT_ARGS) +@dfly_args({"memcached_port": 11211, "proactor_threads": 4}) +class TestBasic: + @pytest.fixture(scope="function") + def memcached_client(self, df_server: DflyInstance): + client = pymemcache.Client(f"127.0.0.1:{df_server.mc_port}", default_noreply=False) + yield client + client.flush_all() + + def test_getset(self, memcached_client: MCClient): + assert not memcached_client.default_noreply + + # set -> replace -> add -> get + assert memcached_client.set("key1", "value1") + assert memcached_client.replace("key1", "value2") + assert not memcached_client.add("key1", "value3") + assert 
memcached_client.get("key1") == b"value2" + + # add -> get + assert memcached_client.add("key2", "value1") + assert memcached_client.get("key2") == b"value1" + + # delete + assert memcached_client.delete("key1") + assert not memcached_client.delete("key3") + assert memcached_client.get("key1") is None + + # prepend append + assert memcached_client.set("key4", "B") + assert memcached_client.prepend("key4", "A") + assert memcached_client.append("key4", "C") + assert memcached_client.get("key4") == b"ABC" + + # incr + memcached_client.set("key5", 0) + assert memcached_client.incr("key5", 1) == 1 + assert memcached_client.incr("key5", 1) == 2 + assert memcached_client.decr("key5", 1) == 1 + + assert memcached_client.gets("key5") == (b"1", b"0") + + async def test_noreply_pipeline(self, df_server: DflyInstance, memcached_client: MCClient): + """ + With the noreply option the python client doesn't wait for replies, + so all the commands are pipelined. Assert pipelines work correctly and the + succeeding regular command receives a reply (it should join the pipeline as last). + """ + + client = df_server.client() + for attempts in range(2): + keys = [f"k{i}" for i in range(1000)] + values = [f"d{i}" for i in range(len(keys))] + + for k, v in zip(keys, values): + memcached_client.set(k, v, noreply=True) + + # quick follow up before the pipeline finishes + assert memcached_client.get("k10") == b"d10" + # check all commands were executed + assert memcached_client.get_many(keys) == {k: v.encode() for k, v in zip(keys, values)} + + info = await client.info() + if info["total_pipelined_commands"] > 100: + return + logging.warning( + f"Have not identified pipelining at attempt {attempts} Info: \n" + str(info) + ) + await client.flushall() + + assert False, "Pipelining not detected" + + def test_noreply_alternating(self, memcached_client: MCClient): + """ + Assert alternating noreply works correctly, will cause many dispatch queue emptyings. + """ + for i in range(200): + if i % 2 == 0: + memcached_client.set(f"k{i}", "D1", noreply=True) + memcached_client.set(f"k{i}", "D2", noreply=True) + memcached_client.set(f"k{i}", "D3", noreply=True) + assert memcached_client.add(f"k{i}", "DX", noreply=False) == (i % 2 != 0) + + def test_length_in_set_command(self, df_server: DflyInstance, memcached_client: MCClient): + """ + Test parser correctly reads value based on length and complains about bad chunks + """ + client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client.connect(("127.0.0.1", int(df_server["memcached_port"]))) + + cases = [b"NOTFOUR", b"FOUR", b"F4\r\n", b"\r\n\r\n"] + + # TODO: \r\n hangs + + for case in cases: + print("case", case) + client.sendall(b"set foo 0 0 4\r\n" + case + b"\r\n") + response = client.recv(256) + if len(case) == 4: + assert response == b"STORED\r\n" + else: + assert response == b"CLIENT_ERROR bad data chunk\r\n" + + client.close() + + def test_large_request(self, memcached_client): + assert memcached_client.set(b"key1", b"d" * 4096, noreply=False) + assert memcached_client.set(b"key2", b"d" * 4096 * 2, noreply=False) + + def test_version(self, memcached_client: MCClient): + """ + php-memcached client expects version to be in the format of "n.n.n", so we return 1.5.0 emulating an old memcached server. + Our real version is being returned in the stats command. + Also verified manually that php client parses correctly the version string that ends with "DF". 
+ """ + assert b"1.6.0 DF" == memcached_client.version() + stats = memcached_client.stats() + version = stats[b"version"].decode("utf-8") + assert version.startswith("v") or version == "dev" + + def test_flags(self, memcached_client: MCClient): + for i in range(1, 20): + flags = random.randrange(50, 1000) + memcached_client.set("a", "real-value", flags=flags, noreply=True) + + res = memcached_client.raw_command("get a", "END\r\n").split() + # workaround sometimes memcached_client.raw_command returns empty str + if len(res) > 0: + assert res[2].decode() == str(flags) + + def test_expiration(self, memcached_client: MCClient): + assert not memcached_client.default_noreply + + assert memcached_client.set("key1", "value1", 2) + assert memcached_client.set("key2", "value2", int(time.time()) + 2) + assert memcached_client.set("key3", "value3", int(time.time()) + 200) + assert memcached_client.get("key1") == b"value1" + assert memcached_client.get("key2") == b"value2" + assert memcached_client.get("key3") == b"value3" + assert memcached_client.set("key3", "value3", int(time.time()) - 200) + assert memcached_client.get("key3") == None + time.sleep(2) + assert memcached_client.get("key1") == None + assert memcached_client.get("key2") == None + assert memcached_client.get("key3") == None + + +@dfly_args({"memcached_port": 11211, "proactor_threads": 4}) def test_memcached_tls_no_requirepass(df_factory, with_tls_server_args, with_tls_ca_cert_args): """ Test for issue #5084: ability to use TLS for Memcached without requirepass. @@ -185,16 +166,10 @@ def test_memcached_tls_no_requirepass(df_factory, with_tls_server_args, with_tls does not support password authentication. This test verifies that we can start the server with TLS enabled but without specifying requirepass and with the Memcached port. """ - # Create arguments for TLS without specifying requirepass - server_args = {**DEFAULT_ARGS, **with_tls_server_args, "requirepass": "test_password"} - # Create and start the server - it should not crash - server = df_factory.create(**server_args) + server = df_factory.create(**{**with_tls_server_args, "requirepass": "test_password"}) server.start() - # Give the server time to start - time.sleep(1) - # Create SSL context for client ssl_context = ssl.create_default_context() ssl_context.load_verify_locations(with_tls_ca_cert_args["ca_cert"]) diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 01d4c149bc1b..fe917b2e9259 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -1207,43 +1207,6 @@ async def test_flushall_in_full_sync(df_factory): assert new_syncid != syncid -""" -Test read-only scripts work with replication. EVAL_RO and the 'no-writes' flags are currently not supported. 
-""" - -READONLY_SCRIPT = """ -redis.call('GET', 'A') -redis.call('EXISTS', 'B') -return redis.call('GET', 'WORKS') -""" - -WRITE_SCRIPT = """ -redis.call('SET', 'A', 'ErrroR') -""" - - -async def test_readonly_script(df_factory): - master = df_factory.create(proactor_threads=2) - replica = df_factory.create(proactor_threads=2) - - df_factory.start_all([master, replica]) - - c_master = master.client() - c_replica = replica.client() - - await c_master.set("WORKS", "YES") - - await c_replica.execute_command(f"REPLICAOF localhost {master.port}") - await wait_available_async(c_replica) - - await c_replica.eval(READONLY_SCRIPT, 3, "A", "B", "WORKS") == "YES" - - with pytest.raises(aioredis.ResponseError) as roe: - await c_replica.eval(WRITE_SCRIPT, 1, "A") - - assert "READONLY " in str(roe) - - take_over_cases = [ [2, 2], [2, 4], @@ -1644,6 +1607,13 @@ async def test_replicaof_flag(df_factory): val = await c_replica.get("KEY") assert "VALUE" == val + # Test disconnect + + await c_replica.replicaof("no", "one") # disconnect + + role = await c_replica.role() + assert role[0] == "master" + async def test_replicaof_flag_replication_waits(df_factory): # tests --replicaof works when we launch replication before the master @@ -1688,72 +1658,6 @@ async def test_replicaof_flag_replication_waits(df_factory): assert "VALUE" == val -async def test_replicaof_flag_disconnect(df_factory): - # test stopping replication when started using --replicaof - master = df_factory.create( - proactor_threads=2, - ) - - # set up master - master.start() - c_master = master.client() - await wait_available_async(c_master) - - await c_master.set("KEY", "VALUE") - db_size = await c_master.dbsize() - assert 1 == db_size - - replica = df_factory.create( - proactor_threads=2, - replicaof=f"localhost:{master.port}", # start to replicate master - ) - - # set up replica. 
check that it is replicating - replica.start() - - c_replica = replica.client() - await wait_available_async(c_replica) - await check_all_replicas_finished([c_replica], c_master) - - dbsize = await c_replica.dbsize() - assert 1 == dbsize - - val = await c_replica.get("KEY") - assert "VALUE" == val - - await c_replica.replicaof("no", "one") # disconnect - - role = await c_replica.role() - assert role[0] == "master" - - -async def test_df_crash_on_memcached_error(df_factory): - master = df_factory.create( - memcached_port=11211, - proactor_threads=2, - ) - - replica = df_factory.create( - memcached_port=master.mc_port + 1, - proactor_threads=2, - ) - - master.start() - replica.start() - - c_master = master.client() - await wait_available_async(c_master) - - c_replica = replica.client() - await c_replica.execute_command(f"REPLICAOF localhost {master.port}") - await wait_available_async(c_replica) - - memcached_client = pymemcache.Client(f"127.0.0.1:{replica.mc_port}") - - with pytest.raises(pymemcache.exceptions.MemcacheServerError): - memcached_client.set("key", "data", noreply=False) - - async def test_df_crash_on_replicaof_flag(df_factory): master = df_factory.create( proactor_threads=2, @@ -2626,43 +2530,6 @@ async def check_if_empty(): assert await c_replica.execute_command(f"dbsize") == 0 -async def test_replicating_mc_flags(df_factory): - master = df_factory.create(memcached_port=11211, proactor_threads=1) - replica = df_factory.create( - memcached_port=11212, proactor_threads=1, dbfilename=f"dump_{tmp_file_name()}" - ) - df_factory.start_all([master, replica]) - - c_mc_master = pymemcache.Client(f"127.0.0.1:{master.mc_port}", default_noreply=False) - - c_replica = replica.client() - - assert c_mc_master.set("key1", "value0", noreply=True) - assert c_mc_master.set("key2", "value2", noreply=True, expire=3600, flags=123456) - assert c_mc_master.replace("key1", "value1", expire=4000, flags=2, noreply=True) - - c_master = master.client() - for i in range(3, 100): - await c_master.set(f"key{i}", f"value{i}") - - await c_replica.execute_command(f"REPLICAOF localhost {master.port}") - await wait_available_async(c_replica) - - c_mc_replica = pymemcache.Client(f"127.0.0.1:{replica.mc_port}", default_noreply=False) - - async def check_flag(key, flag): - res = c_mc_replica.raw_command("get " + key, "END\r\n").split() - # workaround sometimes memcached_client.raw_command returns empty str - if len(res) > 2: - assert res[2].decode() == str(flag) - - await check_flag("key1", 2) - await check_flag("key2", 123456) - - for i in range(1, 100): - assert c_mc_replica.get(f"key{i}") == str.encode(f"value{i}") - - async def test_double_take_over(df_factory, df_seeder_factory): master = df_factory.create(proactor_threads=4, dbfilename="", admin_port=ADMIN_PORT) replica = df_factory.create(proactor_threads=4, dbfilename="", admin_port=ADMIN_PORT + 1) @@ -3225,26 +3092,6 @@ async def test_partial_replication_on_same_source_master_with_replica_lsn_inc(df assert len(lines) == 1 -async def test_replicate_hset_with_expiry(df_factory: DflyInstanceFactory): - master = df_factory.create(proactor_threads=2) - replica = df_factory.create(proactor_threads=2) - - master.start() - replica.start() - - cm = master.client() - await cm.execute_command("HSETEX key 86400 name 1234") - - cr = replica.client() - await cr.execute_command(f"REPLICAOF localhost {master.port}") - await wait_available_async(cr) - - result = await cr.hgetall("key") - - assert "name" in result - assert result["name"] == "1234" - - async def 
test_bug_5221(df_factory): master = df_factory.create( proactor_threads=1, @@ -3514,3 +3361,97 @@ async def get_memory(client, field): line = lines[0] peak_bytes = extract_int_after_prefix("Serialization peak bytes: ", line) assert peak_bytes < value_size + + +@dfly_args({"proactor_threads": 2}) +class Test1To1Integrity: + """ + Test class for tests that use a single replica-master connection without any specific options. + Fits best for testing integrity of simple values and behaviour. + """ + + @pytest.fixture(scope="class") + def master(self, df_factory): + return df_factory.create(memcached_port=11211) + + @pytest.fixture(scope="class") + def replica(self, df_factory): + return df_factory.create(memcached_port=11212) + + @pytest.fixture(scope="class") + def instances(self, df_factory, master, replica): + df_factory.start_all([master, replica]) + + @pytest.fixture(scope="function") + async def do_replicate(self, instances, replica, master): + c_replica = replica.client() + + async def connect(): + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_replica) + + yield connect + + await c_replica.execute_command("REPLICAOF NO ONE") + await asyncio.gather(*(master.client().flushall(), c_replica.flushall())) + + async def test_readonly_script(self, master, replica, do_replicate): + READONLY_SCRIPT = """ + redis.call('GET', 'A') + redis.call('EXISTS', 'B') + return redis.call('GET', 'WORKS') + """ + + WRITE_SCRIPT = """ + redis.call('SET', 'A', 'ErrroR') + """ + c_replica = replica.client() + await master.client().set("WORKS", "YES") + + await do_replicate() + + assert await c_replica.eval(READONLY_SCRIPT, 3, "A", "B", "WORKS") == "YES" + with pytest.raises(aioredis.ResponseError) as roe: + await c_replica.eval(WRITE_SCRIPT, 1, "A") + assert "READONLY " in str(roe) + + async def test_hset_with_expiry(self, master, replica, do_replicate): + await master.client().execute_command("HSETEX key 86400 name 1234") + + await do_replicate() + + assert await replica.client().hgetall("key") == {"name": "1234"} + + async def test_df_crash_on_memcached_error(self, master, replica, do_replicate): + await do_replicate() + + memcached_client = pymemcache.Client(f"127.0.0.1:{replica.mc_port}") + with pytest.raises(pymemcache.exceptions.MemcacheServerError): + memcached_client.set("key", "data", noreply=False) + + async def test_replicating_mc_flags(self, master, replica, do_replicate): + c_mc_master = pymemcache.Client(f"127.0.0.1:{master.mc_port}", default_noreply=False) + + assert c_mc_master.set("key1", "value0", noreply=True) + assert c_mc_master.set("key2", "value2", noreply=True, expire=3600, flags=123456) + assert c_mc_master.replace("key1", "value1", expire=4000, flags=2, noreply=True) + + c_master = master.client() + for i in range(3, 100): + await c_master.set(f"key{i}", f"value{i}") + + await do_replicate() + + c_mc_replica = pymemcache.Client(f"127.0.0.1:{replica.mc_port}", default_noreply=False) + + async def check_flag(key, flag): + res = c_mc_replica.raw_command("get " + key, "END\r\n").split() + # workaround sometimes memcached_client.raw_command returns empty str + if len(res) > 2: + assert res[2].decode() == str(flag) + + await check_flag("key1", 2) + await check_flag("key2", 123456) + + for i in range(1, 100): + assert c_mc_replica.get(f"key{i}") == str.encode(f"value{i}") diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py index a7e80e43ce7e..01ffaf71ed5e 100644 --- a/tests/dragonfly/utility.py +++ 
b/tests/dragonfly/utility.py
@@ -118,7 +118,7 @@ async def wait_available_async(
         raise TimeoutError("Timed out!")
 
     # Secondly for replicas, we make sure they reached stable state replication
-    async for info, breaker in info_tick_timer(clients, "REPLICATION", timeout=timeout):
+    async for info, breaker in info_tick_timer(clients, "REPLICATION", timeout=timeout, step=0.05):
         with breaker:
             assert info["role"] == "master" or "slave_repl_offset" in info, info
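
For reference, a minimal sketch of how a new test could plug into the Test1To1Integrity fixtures introduced above: the class-scoped master, replica and instances fixtures start both servers once, while the function-scoped do_replicate fixture hands each test a connect() coroutine and flushes both instances afterwards. The method name, key and value below are illustrative placeholders, not part of this patch:

# Hypothetical method, written as if added inside Test1To1Integrity:
async def test_simple_value_integrity(self, master, replica, do_replicate):
    # Write through the master before the replica is attached;
    # the value is carried over by the initial full sync.
    await master.client().set("sample-key", "sample-value")

    # Issues REPLICAOF localhost <master.port> and waits for the replica to become available.
    await do_replicate()

    # Clients in these tests return decoded strings, as in the other asserts.
    assert await replica.client().get("sample-key") == "sample-value"

Keeping the server instances class-scoped and only flushing data between tests matches the commit's goal of trimming per-test setup cost.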