Skip to content

Commit 6ca6ea6

Browse files
committed
update system monitoring
1 parent c1d3768 commit 6ca6ea6

File tree

2 files changed

+331
-265
lines changed

2 files changed

+331
-265
lines changed
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
from testflows.core import *
2+
from testflows.asserts import error
3+
import json
4+
from .export_fields import (
5+
get_source_database,
6+
get_destination_table,
7+
get_partition_id,
8+
get_parts,
9+
get_parts_count,
10+
get_create_time,
11+
get_source_replica,
12+
get_transaction_id,
13+
get_parts_to_do,
14+
get_export_field,
15+
)
16+
17+
18+
@TestStep(Then)
def verify_export_fields_populated(self, source_table, s3_table_name):
    """Check that every export field in system.replicated_partition_exports
    holds a value for the given source table, and that the destination
    table matches the expected S3 table name."""

    def text(getter):
        # All field getters share the same call shape; normalize the output.
        return getter(source_table=source_table).output.strip()

    assert text(get_source_database) != "", error()
    assert text(get_destination_table) == s3_table_name, error()
    assert text(get_partition_id) != "", error()
    assert text(get_parts) != "", error()
    # parts_count is numeric and must be strictly positive.
    assert int(text(get_parts_count)) > 0, error()
    assert text(get_create_time) != "", error()
    assert text(get_source_replica) != "", error()
30+
31+
32+
@TestStep(Then)
def verify_export_status(self, source_table, status, expected_count_min=1):
    """Check that at least `expected_count_min` export rows for
    `source_table` are currently in the given `status`."""
    result = get_export_field(
        field_name="COUNT(*)",
        source_table=source_table,
        select_clause="COUNT(*)",
        where_clause=f"status = '{status}'",
    )
    matching_rows = int(result.output.strip())
    assert matching_rows >= expected_count_min, error()
42+
43+
44+
@TestStep(Then)
def verify_parts_to_do_decreases(self, source_table, timeout=60, delay=2):
    """Check that parts_to_do shrinks (or reaches zero) as the export
    progresses, polling until `timeout` with `delay` between attempts."""

    def remaining():
        return int(get_parts_to_do(source_table=source_table).output.strip())

    baseline = remaining()
    # There must be outstanding work to observe a decrease at all.
    assert baseline > 0, error()

    for attempt in retries(timeout=timeout, delay=delay):
        with attempt:
            current = remaining()
            # Either progress was made or the export already finished.
            assert current < baseline or current == 0, error()
56+
57+
58+
@TestStep(Then)
def verify_exports_appear_in_table(self, source_table):
    """Check that at least one export row for `source_table` appears in
    system.replicated_partition_exports with PENDING status."""
    pending = get_export_field(
        field_name="COUNT(*)",
        source_table=source_table,
        select_clause="COUNT(*)",
        where_clause="status = 'PENDING'",
    )
    assert int(pending.output.strip()) > 0, error()
68+
69+
70+
@TestStep(Then)
def verify_partition_ids_match(self, source_table, expected_partitions):
    """Check that the distinct partition_ids recorded for `source_table`
    are exactly the expected partitions (same count, full coverage)."""
    result = get_export_field(
        field_name="partition_id",
        source_table=source_table,
        select_clause="DISTINCT partition_id",
    )
    recorded = {
        line.strip()
        for line in result.output.strip().splitlines()
        if line.strip()
    }

    # Same cardinality and every expected partition present implies equality.
    assert len(recorded) == len(expected_partitions), error()
    missing = [p for p in expected_partitions if p not in recorded]
    assert not missing, error()
87+
88+
89+
@TestStep(Given)
def get_expected_parts_from_table(self, source_table, node=None):
    """Return the set of active part names for every partition of
    `source_table`, as reported by system.parts on `node` (defaults to
    the context node)."""
    if node is None:
        node = self.context.node

    # Local import — presumably avoids a circular import with
    # export_operations; TODO confirm before hoisting to module level.
    from .export_operations import get_partitions

    expected_parts = set()
    for partition_id in get_partitions(table_name=source_table, node=node):
        result = node.query(
            f"SELECT name FROM system.parts WHERE table = '{source_table}' "
            f"AND partition_id = '{partition_id}' AND active = 1",
            exitcode=0,
            steps=True,
        )
        expected_parts.update(
            name.strip() for name in result.output.strip().splitlines()
        )

    return expected_parts
109+
110+
111+
@TestStep(Then)
def verify_parts_array_matches(self, source_table, expected_parts):
    """Check that the union of the `parts` arrays recorded in
    system.replicated_partition_exports covers every expected part.

    Each output line is expected to hold one array literal. NOTE(review):
    ClickHouse prints Array(String) with single quotes (e.g.
    ['all_1_1_0']), which is not valid JSON — the previous json-only
    parser silently dropped every such line, leaving the parsed set
    empty. We now fall back to ast.literal_eval for that form; confirm
    the actual output format against get_export_field.
    """
    import ast  # local: only needed for the non-JSON array fallback

    result = get_export_field(
        field_name="parts",
        source_table=source_table,
        select_clause="parts",
    )

    exported_parts = set()
    for raw_line in result.output.strip().splitlines():
        line = raw_line.strip()
        if not line:
            continue
        parsed = None
        try:
            # json.JSONDecodeError is a subclass of ValueError, so one
            # ValueError handler suffices (the old tuple was redundant).
            parsed = json.loads(line)
        except ValueError:
            try:
                # Single-quoted, Python-literal style array.
                parsed = ast.literal_eval(line)
            except (ValueError, SyntaxError):
                # Unparseable line: skip it; the asserts below still
                # fail loudly if coverage ends up incomplete.
                pass
        if isinstance(parsed, list):
            exported_parts.update(parsed)

    assert len(exported_parts) >= len(expected_parts), error()
    missing = [part for part in expected_parts if part not in exported_parts]
    assert not missing, error()
132+
133+
134+
@TestStep(Then)
def verify_table_structure_has_fields(
    self, table_name="system.replicated_partition_exports", node=None
):
    """Check that `table_name` exposes every required export column,
    accepting the documented alternative names for database/table."""
    if node is None:
        node = self.context.node

    described = node.query(
        f"DESCRIBE TABLE {table_name}",
        exitcode=0,
        steps=True,
    )
    # First tab-separated token of each DESCRIBE row is the column name.
    column_names = [
        row.split("\t")[0].strip()
        for row in described.output.strip().splitlines()
        if row.strip()
    ]

    required_fields = [
        "database",
        "table",
        "destination_database",
        "destination_table",
        "create_time",
        "partition_id",
        "transaction_id",
        "query_id",
        "source_replica",
        "parts",
        "parts_count",
        "parts_to_do",
        "status",
        "exception_replica",
        "last_exception",
        "exception_part",
        "exception_count",
    ]

    # Some deployments name these columns differently; either spelling counts.
    alternative_names = {
        "database": ["source_database"],
        "table": ["source_table"],
    }

    def is_present(field):
        if field in column_names:
            return True
        return any(
            alt in column_names for alt in alternative_names.get(field, ())
        )

    missing_fields = [f for f in required_fields if not is_present(f)]

    assert len(missing_fields) == 0, error(
        f"Missing required fields: {missing_fields}. Available columns: {column_names}"
    )
193+
194+
195+
@TestStep(Then)
def verify_all_fields_populated(self, source_table, node=None, timeout=30, delay=2):
    """Poll until a fully-populated export row for `source_table` can be
    selected from system.replicated_partition_exports, or `timeout`
    expires (checking every `delay` seconds)."""
    if node is None:
        node = self.context.node

    # Every column the export row is expected to carry.
    columns = (
        "source_database, source_table, destination_database, destination_table, "
        "create_time, partition_id, transaction_id, query_id, source_replica, "
        "parts, parts_count, parts_to_do, status, exception_replica, "
        "last_exception, exception_part, exception_count"
    )

    for attempt in retries(timeout=timeout, delay=delay):
        with attempt:
            row = node.query(
                f"SELECT {columns} "
                f"FROM system.replicated_partition_exports "
                f"WHERE source_table = '{source_table}' LIMIT 1",
                exitcode=0,
                steps=True,
            )
            assert row.output.strip() != "", error(
                "Fields should be populated after export"
            )
216+
217+
218+
@TestStep(Then)
def verify_transaction_id_populated(self, source_table):
    """Check that export rows for `source_table` carry a non-empty
    transaction_id."""
    transaction_id = get_transaction_id(source_table=source_table).output.strip()
    assert transaction_id != "", error()
222+
223+
224+
@TestStep(Then)
def verify_active_exports_limited(self, source_table, max_count):
    """Check that no more than `max_count` exports for `source_table`
    are counted as active (status PENDING or COMPLETED)."""
    result = get_export_field(
        field_name="COUNT(*)",
        source_table=source_table,
        select_clause="COUNT(*)",
        where_clause="status = 'PENDING' OR status = 'COMPLETED'",
    )
    assert int(result.output.strip()) <= max_count, error()

0 commit comments

Comments
 (0)