11import os
2- import csv
32from typing import Union
4- import glob
5-
6- from exceptions .exceptions import ConfigError
7- from utils .dbt import get_model_paths_from_dbt_project , get_target_database_name , \
8- extract_credentials_and_data_from_profiles
93from utils .ordered_yaml import OrderedYaml
104
# Shared YAML loader for the whole module — preserves key order on load/dump.
ordered_yaml = OrderedYaml()
@@ -17,12 +11,9 @@ class Config(object):
1711 WORKFLOWS = 'workflows'
1812 CONFIG_FILE_NAME = 'config.yml'
1913
20- def __init__ (self , config_dir : str , profiles_dir : str , profile_name : str ) -> None :
14+ def __init__ (self , config_dir : str , profiles_dir : str ) -> None :
2115 self .config_dir = config_dir
2216 self .profiles_dir = profiles_dir
23- self .profile_name = profile_name
24- self .credentials , self .profiles_data = extract_credentials_and_data_from_profiles (profiles_dir ,
25- profile_name )
2617 self .config_dict = self ._load_configuration ()
2718
2819 def _load_configuration (self ) -> dict :
@@ -35,14 +26,6 @@ def _load_configuration(self) -> dict:
3526
3627 return ordered_yaml .load (config_file_path )
3728
38- @property
39- def query_history_source (self ):
40- return self .profiles_data .get ('query_history_source' )
41-
42- @property
43- def platform (self ):
44- return self .profiles_data .get ('type' , 'unknown' )
45-
4629 @property
4730 def anonymous_tracking_enabled (self ) -> bool :
4831 return self .config_dict .get ('anonymous_usage_tracking' , True )
@@ -70,106 +53,3 @@ def target_dir(self) -> str:
7053 return os .getcwd ()
7154 return target_path
7255
73- @staticmethod
74- def _find_schema_yml_files_in_dbt_project (dbt_project_models_path : str ) -> list :
75- return glob .glob (os .path .join (dbt_project_models_path , '*.yml' ), recursive = True )
76-
77- def _get_sources_from_all_dbt_projects (self ) -> list :
78- config_dict = self ._load_configuration ()
79- dbt_projects = config_dict .get ('dbt_projects' , [])
80- sources = []
81- for dbt_project_path in dbt_projects :
82- try :
83- dbt_project_target_database = get_target_database_name (self .profiles_dir , dbt_project_path )
84- model_paths = get_model_paths_from_dbt_project (dbt_project_path )
85- for model_path in model_paths :
86- dbt_project_models_path = os .path .join (dbt_project_path , model_path )
87- schema_yml_files = self ._find_schema_yml_files_in_dbt_project (dbt_project_models_path )
88- for schema_yml_file in schema_yml_files :
89- schema_dict = ordered_yaml .load (schema_yml_file )
90- schema_sources = schema_dict .get ('sources' )
91- if schema_sources is not None :
92- dbt_project_sources = {'sources' : schema_sources ,
93- 'dbt_project_target_database' : dbt_project_target_database }
94- sources .append (dbt_project_sources )
95- except FileNotFoundError as exc :
96- raise ConfigError (f'No such file - { exc .filename } , please configure a valid dbt project path' )
97- return sources
98-
99- @staticmethod
100- def _alert_on_schema_changes (source_dict : dict ) -> Union [bool , None ]:
101- metadata = source_dict .get ('meta' , {})
102- edr_config = metadata .get ('edr' , {})
103- alert_on_schema_changes = edr_config .get ('schema_changes' )
104-
105- # Normalize alert_on_schema_changes to handle both booleans and strings
106- alert_on_schema_changes_str = str (alert_on_schema_changes ).lower ()
107- if alert_on_schema_changes_str == 'false' :
108- return False
109- elif alert_on_schema_changes_str == 'true' :
110- return True
111- else :
112- return None
113-
114- def monitoring_configuration_in_dbt_sources_to_csv (self , target_csv_path : str ) -> int :
115- row_count = 0
116- with open (target_csv_path , 'w' ) as target_csv :
117- target_csv_writer = csv .DictWriter (target_csv , fieldnames = ['database_name' ,
118- 'schema_name' ,
119- 'table_name' ,
120- 'column_name' ,
121- 'alert_on_schema_changes' ])
122- target_csv_writer .writeheader ()
123-
124- all_configured_sources = self ._get_sources_from_all_dbt_projects ()
125- for sources_dict in all_configured_sources :
126- sources = sources_dict .get ('sources' , [])
127- target_database = sources_dict .get ('dbt_project_target_database' )
128- for source in sources :
129- source_db = source .get ('database' , target_database )
130- if source_db is None :
131- continue
132-
133- schema_name = source .get ('schema' , source .get ('name' ))
134- if schema_name is None :
135- continue
136-
137- alert_on_schema_changes = self ._alert_on_schema_changes (source )
138- if alert_on_schema_changes is not None :
139- target_csv_writer .writerow ({'database_name' : source_db ,
140- 'schema_name' : schema_name ,
141- 'table_name' : None ,
142- 'column_name' : None ,
143- 'alert_on_schema_changes' : alert_on_schema_changes })
144- row_count += 1
145-
146- source_tables = source .get ('tables' , [])
147- for source_table in source_tables :
148- table_name = source_table .get ('identifier' , source_table .get ('name' ))
149- if table_name is None :
150- continue
151-
152- alert_on_schema_changes = self ._alert_on_schema_changes (source_table )
153- if alert_on_schema_changes is not None :
154- target_csv_writer .writerow ({'database_name' : source_db ,
155- 'schema_name' : schema_name ,
156- 'table_name' : table_name ,
157- 'column_name' : None ,
158- 'alert_on_schema_changes' : alert_on_schema_changes })
159- row_count += 1
160-
161- source_columns = source_table .get ('columns' , [])
162- for source_column in source_columns :
163- column_name = source_column .get ('name' )
164- if column_name is None :
165- continue
166-
167- alert_on_schema_changes = self ._alert_on_schema_changes (source_column )
168- if alert_on_schema_changes is not None :
169- target_csv_writer .writerow ({'database_name' : source_db ,
170- 'schema_name' : schema_name ,
171- 'table_name' : table_name ,
172- 'column_name' : column_name ,
173- 'alert_on_schema_changes' : alert_on_schema_changes })
174- row_count += 1
175- return row_count
0 commit comments