|
| 1 | +from testflows.asserts import error |
1 | 2 | from helpers.common import getuid |
2 | 3 | import helpers.config.config_d as config_d |
3 | 4 | from alter.stress.tests.tc_netem import * |
4 | | -from helpers.common import getuid |
5 | 5 | from s3.requirements.export_partition import * |
6 | 6 | from s3.tests.export_part.steps import ( |
7 | 7 | default_columns, |
|
16 | 16 | wait_for_export_to_complete, |
17 | 17 | get_export_field, |
18 | 18 | get_source_database, |
| 19 | + get_destination_database, |
19 | 20 | get_destination_table, |
20 | 21 | get_create_time, |
21 | 22 | get_partition_id, |
|
24 | 25 | get_parts, |
25 | 26 | get_parts_count, |
26 | 27 | get_parts_to_do, |
| 28 | + get_exception_replica, |
| 29 | + get_last_exception, |
| 30 | + get_exception_part, |
| 31 | + get_exception_count, |
27 | 32 | ) |
28 | 33 |
|
29 | 34 |
|
@@ -433,20 +438,118 @@ def concurrent_exports_limit(self, background_move_pool_size): |
433 | 438 | assert active_count <= background_move_pool_size, error() |
434 | 439 |
|
435 | 440 |
|
@TestScenario
@Requirements(RQ_ClickHouse_ExportPartition_SystemTables_Exports("1.0"))
def all_required_fields_present(self):
    """Check that system.replicated_partition_exports contains all required fields after export."""

    source_table = f"source_{getuid()}"

    with Given("create source and S3 tables"):
        partitioned_replicated_merge_tree_table(
            table_name=source_table,
            partition_by="p",
            columns=default_columns(),
            stop_merges=True,
            cluster="replicated_cluster",
        )
        s3_table_name = create_s3_table(table_name="s3", create_new_bucket=True)

    with When("export partitions"):
        export_partitions(
            source_table=source_table,
            destination_table=s3_table_name,
            node=self.context.node,
        )

    with Then("verify all required fields exist in table structure"):
        # DESCRIBE output is tab-separated; the column name is the first field
        # of each non-empty line.
        describe_result = self.context.node.query(
            "DESCRIBE TABLE system.replicated_partition_exports",
            exitcode=0,
            steps=True,
        )
        column_names = []
        for raw_line in describe_result.output.strip().splitlines():
            if raw_line.strip():
                column_names.append(raw_line.split("\t")[0].strip())

        required_fields = [
            "database",
            "table",
            "destination_database",
            "destination_table",
            "create_time",
            "partition_id",
            "transaction_id",
            "query_id",
            "source_replica",
            "parts",
            "parts_count",
            "parts_to_do",
            "status",
            "exception_replica",
            "last_exception",
            "exception_part",
            "exception_count",
        ]

        # A couple of fields may be exposed under a "source_"-prefixed
        # alternative name; either spelling satisfies the requirement.
        alternative_names = {
            "database": ["source_database"],
            "table": ["source_table"],
        }

        missing_fields = [
            field
            for field in required_fields
            if field not in column_names
            and not any(
                alt in column_names for alt in alternative_names.get(field, [])
            )
        ]

        assert not missing_fields, error(
            f"Missing required fields: {missing_fields}. Available columns: {column_names}"
        )

    with And("verify fields are populated after export"):
        # The export record may take a moment to appear; retry until a row
        # for our source table shows up.
        for retry in retries(timeout=30, delay=2):
            with retry:
                result = self.context.node.query(
                    f"SELECT source_database, source_table, destination_database, destination_table, "
                    f"create_time, partition_id, transaction_id, query_id, source_replica, "
                    f"parts, parts_count, parts_to_do, status, exception_replica, "
                    f"last_exception, exception_part, exception_count "
                    f"FROM system.replicated_partition_exports "
                    f"WHERE source_table = '{source_table}' LIMIT 1",
                    exitcode=0,
                    steps=True,
                )
                assert result.output.strip() != "", error(
                    "Fields should be populated after export"
                )
@TestFeature
@Name("system monitoring")
@Requirements(RQ_ClickHouse_ExportPartition_SystemTables_Exports("1.0"))
def feature(self):
    """Check system monitoring of export partition operations via system.replicated_partition_exports table."""

    # All scenarios must run; the commented-out list was a debug leftover
    # from developing all_required_fields_present and would silently skip
    # the rest of the suite.
    Scenario(run=export_appears_in_table)
    Scenario(run=export_fields_populated)
    Scenario(run=export_status_transitions)
    Scenario(run=parts_to_do_decreases)
    Scenario(run=concurrent_exports_tracking)
    Scenario(run=partition_id_matches_exported)
    Scenario(run=parts_array_matches_table_parts)
    Scenario(run=transaction_id_populated)
    Scenario(test=concurrent_exports_limit)(background_move_pool_size=1)
    Scenario(test=concurrent_exports_limit)(background_move_pool_size=4)
    Scenario(test=concurrent_exports_limit)(background_move_pool_size=8)
    Scenario(run=all_required_fields_present)
0 commit comments