|
22 | 22 | import json |
23 | 23 | import logging |
24 | 24 | import os |
25 | | -from typing import Any, Union |
| 25 | +from typing import Any, Union, override |
26 | 26 | from absl import flags |
27 | 27 | from perfkitbenchmarker import data |
28 | 28 | from perfkitbenchmarker import edw_service |
@@ -319,6 +319,39 @@ def GetSnowflakeClientInterface( |
319 | 319 | class Snowflake(edw_service.EdwService): |
320 | 320 | """Object representing a Snowflake Data Warehouse Instance.""" |
321 | 321 |
|
# Resource location shared by every search-index SQL template below.
SEARCH_QUERY_TEMPLATE_LOCATION = 'edw/snowflake_aws/search_index'

# Search-index templates, each resolved under SEARCH_QUERY_TEMPLATE_LOCATION.
CREATE_INDEX_QUERY_TEMPLATE = (
    SEARCH_QUERY_TEMPLATE_LOCATION + '/create_index_query.sql.j2'
)
DELETE_INDEX_QUERY_TEMPLATE = (
    SEARCH_QUERY_TEMPLATE_LOCATION + '/delete_index_query.sql.j2'
)
GET_INDEX_STATUS_QUERY_TEMPLATE = (
    SEARCH_QUERY_TEMPLATE_LOCATION + '/index_status.sql.j2'
)
INITIALIZE_SEARCH_TABLE_QUERY_TEMPLATE = (
    SEARCH_QUERY_TEMPLATE_LOCATION + '/table_init.sql.j2'
)
LOAD_SEARCH_DATA_QUERY_TEMPLATE = (
    SEARCH_QUERY_TEMPLATE_LOCATION + '/ingestion_query.sql.j2'
)
INDEX_SEARCH_QUERY_TEMPLATE = (
    SEARCH_QUERY_TEMPLATE_LOCATION + '/search_query.sql.j2'
)
GET_ROW_COUNT_QUERY_TEMPLATE = (
    SEARCH_QUERY_TEMPLATE_LOCATION + '/get_row_count.sql.j2'
)

# Query-metadata templates; these sit in a separate metadata directory and are
# rendered by the metadata helpers of this class.
TIME_BOUND_QUERY_HISTORY_TEMPLATE = (
    'edw/snowflake_aws/metadata/time_bound_query_history.sql.j2'
)
INDIVIDUAL_QUERY_PLAN_TEMPLATE = (
    'edw/snowflake_aws/metadata/individual_query_plan.sql.j2'
)
INDIVIDUAL_QUERY_STATS_TEMPLATE = (
    'edw/snowflake_aws/metadata/individual_query_stats.sql.j2'
)
| 354 | + |
322 | 355 | CLOUD: str = None |
323 | 356 | SERVICE_TYPE = None |
324 | 357 |
|
@@ -515,30 +548,6 @@ def GetMetadata(self): |
515 | 548 | basic_data.update(self.client_interface.GetMetadata()) |
516 | 549 | return basic_data |
517 | 550 |
|
518 | | - SEARCH_QUERY_TEMPLATE_LOCATION = 'edw/snowflake_aws/search_index' |
519 | | - |
520 | | - CREATE_INDEX_QUERY_TEMPLATE = ( |
521 | | - f'{SEARCH_QUERY_TEMPLATE_LOCATION}/create_index_query.sql.j2' |
522 | | - ) |
523 | | - DELETE_INDEX_QUERY_TEMPLATE = ( |
524 | | - f'{SEARCH_QUERY_TEMPLATE_LOCATION}/delete_index_query.sql.j2' |
525 | | - ) |
526 | | - GET_INDEX_STATUS_QUERY_TEMPLATE = ( |
527 | | - f'{SEARCH_QUERY_TEMPLATE_LOCATION}/index_status.sql.j2' |
528 | | - ) |
529 | | - INITIALIZE_SEARCH_TABLE_QUERY_TEMPLATE = ( |
530 | | - f'{SEARCH_QUERY_TEMPLATE_LOCATION}/table_init.sql.j2' |
531 | | - ) |
532 | | - LOAD_SEARCH_DATA_QUERY_TEMPLATE = ( |
533 | | - f'{SEARCH_QUERY_TEMPLATE_LOCATION}/ingestion_query.sql.j2' |
534 | | - ) |
535 | | - INDEX_SEARCH_QUERY_TEMPLATE = ( |
536 | | - f'{SEARCH_QUERY_TEMPLATE_LOCATION}/search_query.sql.j2' |
537 | | - ) |
538 | | - GET_ROW_COUNT_QUERY_TEMPLATE = ( |
539 | | - f'{SEARCH_QUERY_TEMPLATE_LOCATION}/get_row_count.sql.j2' |
540 | | - ) |
541 | | - |
542 | 551 | def CreateSearchIndex( |
543 | 552 | self, table_path: str, index_name: str |
544 | 553 | ) -> tuple[float, dict[str, Any]]: |
@@ -667,3 +676,88 @@ def SetWarehouse(self, warehouse: str): |
667 | 676 | ) |
668 | 677 | self.warehouse = warehouse |
669 | 678 | self.client_interface.warehouse = warehouse |
| 679 | + |
def _RunMetadataQuery(
    self, query_template: str, query_name: str, context: dict[str, Any]
) -> dict[str, Any]:
  """Renders a metadata query template via the client VM and executes it.

  Args:
    query_template: Template resource name, resolved with data.ResourcePath.
    query_name: Name handed to RenderTemplate as the render target and to
      ExecuteQuery as the query to run.
    context: Context passed to RenderTemplate for the template.

  Returns:
    The 'query_results' entry of the details returned by ExecuteQuery.
  """
  client_vm = self.client_interface.client_vm
  client_vm.RenderTemplate(
      data.ResourcePath(query_template), query_name, context
  )
  # Echo the rendered file so the exact query text shows up in the logs.
  client_vm.RemoteCommand(f'cat {query_name}')
  _, query_details = self.client_interface.ExecuteQuery(
      query_name, print_results=True
  )
  return query_details['query_results']
| 695 | + |
def _GetIndividualQueryMetadata(self, query_id: str) -> list[dict[str, Any]]:
  """Collects plan and statistics metadata samples for a single query.

  Runs the individual query plan and query stats templates for `query_id` and
  converts their column-oriented results to rows with ColsToRows.

  Args:
    query_id: Identifier of the query; rendered into both templates and used
      in the generated query file names.

  Returns:
    A list of sample dicts with 'metric', 'value', 'unit' and 'metadata' keys:
    an 'edw_sf_query_plan' sample built from the first plan row (omitted when
    the plan query returns no rows), followed by an 'edw_sf_query_stats'
    sample whose metadata holds every stats row serialized as a JSON string.
  """
  context = {'query_id': query_id}
  query_plan_rows = self.ColsToRows(
      self._RunMetadataQuery(
          self.INDIVIDUAL_QUERY_PLAN_TEMPLATE,
          f'individual_query_plan_{query_id}.sql',
          context,
      )
  )
  query_stats_rows = self.ColsToRows(
      self._RunMetadataQuery(
          self.INDIVIDUAL_QUERY_STATS_TEMPLATE,
          f'individual_query_stats_{query_id}.sql',
          context,
      )
  )
  results = []
  # A query without any plan rows must not abort metadata collection for the
  # remaining queries, so only emit the plan sample when a row exists.
  if query_plan_rows:
    results.append({
        'metric': 'edw_sf_query_plan',
        'value': 1,
        'unit': 'metadata',
        # Only the first plan row is reported.
        'metadata': {
            f'sf_{key}': value for key, value in query_plan_rows[0].items()
        },
    })
  results.append({
      'metric': 'edw_sf_query_stats',
      'value': 1,
      'unit': 'metadata',
      # default=str stringifies values json cannot serialize natively.
      'metadata': {
          'sf_query_stats': json.dumps(query_stats_rows, default=str)
      },
  })
  return results
| 735 | + |
| 736 | + @override |
| 737 | + def GetTimeBoundAuxiliaryMetrics( |
| 738 | + self, start_timestamp: float, end_timestamp: float |
| 739 | + ) -> list[dict[str, Any]]: |
| 740 | + """Returns the auxiliary metrics for the given run.""" |
| 741 | + query_file_name = f'metadata_query_{start_timestamp}.sql' |
| 742 | + context = { |
| 743 | + 'start_timestamp': start_timestamp, |
| 744 | + 'end_timestamp': end_timestamp, |
| 745 | + 'warehouse': self.warehouse, |
| 746 | + } |
| 747 | + col_res = self._RunMetadataQuery( |
| 748 | + self.TIME_BOUND_QUERY_HISTORY_TEMPLATE, |
| 749 | + query_file_name, |
| 750 | + context, |
| 751 | + ) |
| 752 | + row_res = self.ColsToRows(col_res) |
| 753 | + history_results = [] |
| 754 | + for row in row_res: |
| 755 | + history_results.append({ |
| 756 | + 'metric': 'sf_query_metadata', |
| 757 | + 'value': 1, |
| 758 | + 'unit': 'metadata', |
| 759 | + 'metadata': {f'sf_{key}': value for key, value in row.items()}, |
| 760 | + }) |
| 761 | + for qid in col_res['QUERY_ID']: |
| 762 | + history_results.extend(self._GetIndividualQueryMetadata(qid)) |
| 763 | + return history_results |
0 commit comments