Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ filterwarnings = [
# Ignore a coverage warning when COVERAGE_CORE=sysmon for Pythons < 3.12.
"ignore:sys.monitoring isn't available:coverage.exceptions.CoverageWarning",
]
log_cli_level = "INFO"
log_cli_date_format = "%H:%M:%S:%f"
log_cli = false
log_cli_format = "%(asctime)s %(levelname)s %(threadName)s: %(message)s"
log_level = "INFO"
capture = "yes"

[tool.ruff]
target-version = "py39"
Expand Down
43 changes: 29 additions & 14 deletions tests/test_scenario/maint_notifications_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ def find_target_node_and_empty_node(

# Get all node IDs from CLUSTER NODES section
all_nodes = set()
nodes_with_shards = set()
master_nodes = set()
nodes_with_any_shards = set() # Nodes with shards from ANY database
nodes_with_target_db_shards = set() # Nodes with shards from target database
master_nodes = set() # Master nodes for target database only

for line in lines:
line = line.strip()
Expand Down Expand Up @@ -146,31 +147,45 @@ def find_target_node_and_empty_node(
# Parse shard line: db:1 m-standard redis:1 node:2 master 0-8191 1.4MB OK
parts = line.split()
if len(parts) >= 5:
db_id = parts[0] # db:1, db:2, etc.
node_id = parts[3] # node:2
shard_role = parts[4] # master/slave - this is what matters

nodes_with_shards.add(node_id)
if shard_role == "master":
master_nodes.add(node_id)
# Track ALL nodes with shards (for finding truly empty nodes)
nodes_with_any_shards.add(node_id)

# Only track master nodes for the specific database we're testing
bdb_id = endpoint_config.get("bdb_id")
if db_id == f"db:{bdb_id}":
nodes_with_target_db_shards.add(node_id)
if shard_role == "master":
master_nodes.add(node_id)
elif line.startswith("ENDPOINTS:") or not line:
shards_section_started = False

# Find empty node (node with no shards)
empty_nodes = all_nodes - nodes_with_shards
    # Find nodes that hold no shards from the target database
nodes_with_no_shards_target_bdb = all_nodes - nodes_with_target_db_shards

logging.debug(f"All nodes: {all_nodes}")
logging.debug(f"Nodes with shards: {nodes_with_shards}")
logging.debug(f"Master nodes: {master_nodes}")
logging.debug(f"Empty nodes: {empty_nodes}")
logging.debug(f"Nodes with shards from any database: {nodes_with_any_shards}")
logging.debug(
f"Nodes with target database shards: {nodes_with_target_db_shards}"
)
logging.debug(f"Master nodes (target database only): {master_nodes}")
logging.debug(
f"Nodes with no shards from target database: {nodes_with_no_shards_target_bdb}"
)

if not empty_nodes:
raise ValueError("No empty nodes (nodes without shards) found")
if not nodes_with_no_shards_target_bdb:
raise ValueError("All nodes have shards from target database")

if not master_nodes:
raise ValueError("No nodes with master shards found")
raise ValueError("No nodes with master shards from target database found")

# Return the first available empty node and master node (numeric part only)
empty_node = next(iter(empty_nodes)).split(":")[1] # node:1 -> 1
empty_node = next(iter(nodes_with_no_shards_target_bdb)).split(":")[
1
] # node:1 -> 1
target_node = next(iter(master_nodes)).split(":")[1] # node:2 -> 2

return target_node, empty_node
Expand Down
33 changes: 9 additions & 24 deletions tests/test_scenario/test_maint_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
datefmt="%Y-%m-%d %H:%M:%S:%f",
)

BIND_TIMEOUT = 30
BIND_TIMEOUT = 60
MIGRATE_TIMEOUT = 60
FAILOVER_TIMEOUT = 15

Expand Down Expand Up @@ -109,29 +109,6 @@ def setup_and_cleanup(
except Exception as e:
logging.error(f"Failed to revert failover: {e}")

if self._migration_executed:
try:
if self.target_node and self.empty_node:
self._execute_migration(
fault_injector_client=fault_injector_client,
endpoints_config=endpoints_config,
target_node=self.empty_node,
empty_node=self.target_node,
)
logging.info("Migration cleanup completed")
except Exception as e:
logging.error(f"Failed to revert migration: {e}")

if self._bind_executed:
try:
if self.endpoint_id:
self._execute_bind(
fault_injector_client, endpoints_config, self.endpoint_id
)
logging.info("Bind cleanup completed")
except Exception as e:
logging.error(f"Failed to revert bind endpoint: {e}")

logging.info("Cleanup finished")

def _execute_failover(
Expand Down Expand Up @@ -916,7 +893,15 @@ def test_new_connections_receive_migrating(
)

migrate_thread.join()
logging.info("Executing rladmin bind endpoint command for cleanup...")

bind_thread = Thread(
target=self._execute_bind,
name="bind_thread",
args=(fault_injector_client, endpoints_config, self.endpoint_id),
)
bind_thread.start()
bind_thread.join()
client_maint_notifications.connection_pool.release(first_conn)
client_maint_notifications.connection_pool.release(second_connection)

Expand Down