Skip to content

Commit 01cda4f

Browse files
authored
feat: add flag pause_wait_timeout (#5328)
* feat: add flag pause_wait_timeout
1 parent f2a5613 commit 01cda4f

File tree

2 files changed

+8
-7
lines changed

2 files changed

+8
-7
lines changed

src/server/server_family.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,10 @@ ABSL_FLAG(int32_t, slowlog_log_slower_than, 10000,
112112
"microseconds and if it's negative - disables the slowlog.");
113113
ABSL_FLAG(uint32_t, slowlog_max_len, 20, "Slow log maximum length.");
114114

115+
ABSL_FLAG(uint32_t, pause_wait_timeout, 1,
116+
"Timeout in seconds, to set up the pause for all connections for CLIENT PAUSE command "
117+
"and cluster slot migration finalization procedure.");
118+
115119
ABSL_FLAG(string, s3_endpoint, "", "endpoint for s3 snapshots, default uses aws regional endpoint");
116120
ABSL_FLAG(bool, s3_use_https, true, "whether to use https for s3 endpoints");
117121
// Disable EC2 metadata by default, or if a users credentials are invalid the
@@ -751,7 +755,7 @@ std::optional<fb2::Fiber> Pause(std::vector<facade::Listener*> listeners, Namesp
751755

752756
// Wait for all busy commands to finish running before replying to guarantee
753757
// that no more (write) operations will occur.
754-
const absl::Duration kDispatchTimeout = absl::Seconds(1);
758+
const absl::Duration kDispatchTimeout = absl::Seconds(absl::GetFlag(FLAGS_pause_wait_timeout));
755759
if (!tracker.Wait(kDispatchTimeout)) {
756760
LOG(WARNING) << "Couldn't wait for commands to finish dispatching in " << kDispatchTimeout;
757761
shard_set->pool()->AwaitBrief([pause_state](unsigned, util::ProactorBase*) {

tests/dragonfly/cluster_test.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1810,8 +1810,7 @@ async def test_cluster_replication_migration(
18101810
assert await seeder.compare(fake_capture, r1_node.instance.port)
18111811

18121812

1813-
@pytest.mark.skip("Flaky test")
1814-
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
1813+
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "pause_wait_timeout": 10})
18151814
async def test_start_replication_during_migration(
18161815
df_factory: DflyInstanceFactory, df_seeder_factory: DflySeederFactory
18171816
):
@@ -2561,8 +2560,7 @@ async def test_replicate_redis_cluster(redis_cluster, df_factory, df_seeder_fact
25612560
assert await seeder.compare(capture, replica.port)
25622561

25632562

2564-
@pytest.mark.skip("Flaky test")
2565-
@dfly_args({"proactor_threads": 4})
2563+
@dfly_args({"proactor_threads": 4, "pause_wait_timeout": 10})
25662564
async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_seeder_factory):
25672565
"""
25682566
Create redis cluster of 3 nodes.
@@ -2859,11 +2857,10 @@ async def test_migration_one_after_another(df_factory: DflyInstanceFactory, df_s
28592857
"""
28602858

28612859

2862-
@pytest.mark.skip("Flaky test")
28632860
@pytest.mark.slow
28642861
@pytest.mark.exclude_epoll
28652862
@pytest.mark.asyncio
2866-
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
2863+
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "pause_wait_timeout": 10})
28672864
async def test_migration_rebalance_node(df_factory: DflyInstanceFactory, df_seeder_factory):
28682865
# 1. Create cluster of 3 nodes with all slots allocated to first node.
28692866
instances = [

0 commit comments

Comments
 (0)