2525
2626
@click.command()
@click.option(
    "--dataset-absolute-path",
    envvar="CSM_DATASET_ABSOLUTE_PATH",
    show_envvar=True,
    help="A local folder to store the main dataset content",
    metavar="PATH",
    required=True,
)
@click.option(
    "--parameters-absolute-path",
    envvar="CSM_PARAMETERS_ABSOLUTE_PATH",
    metavar="PATH",
    show_envvar=True,
    help="A local folder to store the parameters content",
    required=True,
)
@click.option(
    "--simulation-id",
    envvar="CSM_SIMULATION_ID",
    show_envvar=True,
    required=True,
    metavar="UUID",
    help="the Simulation Id to add to records",
)
@click.option(
    "--adx-uri",
    envvar="AZURE_DATA_EXPLORER_RESOURCE_URI",
    show_envvar=True,
    required=True,
    metavar="URI",
    help="the ADX cluster path (URI info can be found into ADX cluster page)",
)
@click.option(
    "--adx-ingest-uri",
    envvar="AZURE_DATA_EXPLORER_RESOURCE_INGEST_URI",
    show_envvar=True,
    required=True,
    metavar="URI",
    help="The ADX cluster ingest path (URI info can be found into ADX cluster page)",
)
@click.option(
    "--database-name",
    envvar="AZURE_DATA_EXPLORER_DATABASE_NAME",
    show_envvar=True,
    required=True,
    metavar="NAME",
    help="The targeted database name",
)
@click.option(
    "--send-parameters/--no-send-parameters",
    type=bool,
    envvar="CSM_SEND_DATAWAREHOUSE_PARAMETERS",
    show_envvar=True,
    default=False,
    show_default=True,
    help="whether or not to send parameters (parameters path is mandatory then)",
)
@click.option(
    "--send-datasets/--no-send-datasets",
    type=bool,
    envvar="CSM_SEND_DATAWAREHOUSE_DATASETS",
    show_envvar=True,
    default=False,
    show_default=True,
    # Fixed copy-paste in help text: this flag requires the DATASET path,
    # not the parameters path (body reads dataset_absolute_path).
    help="whether or not to send datasets (dataset path is mandatory then)",
)
@click.option(
    "--wait/--no-wait",
    envvar="WAIT_FOR_INGESTION",
    show_envvar=True,
    default=False,
    show_default=True,
    help="Toggle waiting for the ingestion results",
)
@web_help("csm-data/adx-send-scenario-data")
# NOTE(review): four parameter lines below (send_datasets through
# parameters_absolute_path) were hidden in the diff-context gap and are
# reconstructed from the option names and the body's usage; click passes
# all of these by keyword, so their order does not affect behavior —
# TODO confirm against the original file.
def adx_send_scenariodata(
    send_parameters: bool,
    send_datasets: bool,
    simulation_id: str,
    dataset_absolute_path: str,
    parameters_absolute_path: str,
    adx_uri: str,
    adx_ingest_uri: str,
    database_name: str,
    wait: bool,
):
    """
    Uses environment variables to send content of CSV files to ADX
    Requires a valid Azure connection either with:
    - The AZ cli command: **az login**
    - A triplet of env var `AZURE_TENANT_ID`, `AZURE_CLIENT_ID`, `AZURE_CLIENT_SECRET`
    """
    # Collect one header description per CSV file, from either or both trees.
    csv_data = dict()
    if send_parameters:
        csv_data.update(prepare_csv_content(parameters_absolute_path))
    if send_datasets:
        csv_data.update(prepare_csv_content(dataset_absolute_path))
    queries = construct_create_query(csv_data)
    adx_client = ADXQueriesWrapper(
        database=database_name, cluster_url=adx_uri, ingest_url=adx_ingest_uri
    )
    # Create (or merge) every target table before ingesting anything;
    # abort the whole command on the first failed table creation.
    for k, v in queries.items():
        LOGGER.info(f"Create table query: {v}")
        r: KustoResponseDataSet = adx_client.run_query(v)
        # NOTE(review): the success branch of this loop was hidden in the
        # diff-context gap; reconstructed as "no errors -> next table" —
        # TODO confirm against the original file.
        if r.errors_count == 0:
            continue
        LOGGER.error(f"Issue creating table {k}")
        LOGGER.error(r.get_exceptions())
        raise click.Abort()
    insert_csv_files(
        files_data=csv_data,
        adx_client=adx_client,
        simulation_id=simulation_id,
        database=database_name,
        wait=wait,
    )
126146
def prepare_csv_content(folder_path):
    """Recursively scan *folder_path* for CSV files and describe each one.

    Only the first (header) line of every file is read. "@" characters are
    removed from the header and each column name is stripped of surrounding
    whitespace; every column is declared with type "string".

    Args:
        folder_path: root folder to scan for ``*.csv`` files.

    Returns:
        dict mapping each CSV file path (str) to
        ``{"filename": <name without .csv>, "headers": {column: "string"}}``.
    """
    # NOTE(review): the first lines of this function (initialisation of
    # `root` and `content`) and the trailing `return` were hidden in the
    # diff view and are reconstructed from the body's usage — TODO confirm.
    content = dict()
    root = pathlib.Path(folder_path)
    for _file in root.rglob("*.csv"):
        with open(_file) as _csv_content:
            header = _csv_content.readline().replace("@", "").strip()
        # An empty file / empty header yields an empty column map.
        headers = header.split(",") if header else list()
        cols = {k.strip(): "string" for k in headers}
        content[str(_file)] = {"filename": _file.name.removesuffix(".csv"), "headers": cols}
    LOGGER.debug(content)
    return content
def construct_create_query(files_data):
    """Build one Kusto ``.create-merge table`` query per described CSV file.

    Args:
        files_data: mapping of file path -> ``{"filename": str,
            "headers": {column: type}}`` as produced by
            ``prepare_csv_content``.

    Returns:
        dict mapping table name (the CSV base name) to its create-merge
        query string.
    """
    queries = dict()
    # The file paths (keys) are not needed here, only the descriptions.
    for file_info in files_data.values():
        filename = file_info.get("filename")
        fields = file_info.get("headers")
        columns = ",".join(":".join((k, v)) for k, v in fields.items())
        queries[filename] = f".create-merge table {filename} ({columns})"
    return queries
175189
176190
def insert_csv_files(
    files_data, adx_client: ADXQueriesWrapper, simulation_id, database, wait=False
):
    """Ingest every described CSV file into its ADX table.

    Each file is ingested with a CSV column mapping built from its header
    description, plus one constant column ``simulationrun`` carrying
    *simulation_id* (which is also set as a drop-by tag). The first record
    of each file (the header row) is skipped by the ingestion service.

    Args:
        files_data: mapping of file path -> ``{"filename": str,
            "headers": {column: type}}`` as produced by
            ``prepare_csv_content``.
        adx_client: wrapper exposing ``ingest_client`` and
            ``check_ingestion_status``.
        simulation_id: run identifier stamped on every ingested row.
        database: target ADX database name.
        wait: when True, poll ingestion status up to 5 times, 8s apart,
            then log a per-file status summary.
    """
    ingestion_ids = dict()
    for file_path, file_info in files_data.items():
        filename = file_info.get("filename")
        fields = file_info.get("headers")
        with open(file_path) as _f:
            file_size = sum(map(len, _f.readlines()))
            LOGGER.debug(f"{file_path} size: {file_size}")
            fd = FileDescriptor(file_path, file_size)
            mappings = list()
            # One mapping per CSV column, in header order.
            # (enumerate replaces a manual counter that shadowed builtin `ord`)
            for ordinal, (column, _type) in enumerate(fields.items()):
                mappings.append(
                    ColumnMapping(column_name=column, column_type=_type, ordinal=ordinal)
                )
            # Constant extra column tagging every row with the run id.
            mappings.append(
                ColumnMapping(
                    column_name="simulationrun",
                    column_type="string",
                    ordinal=len(fields),
                    const_value=simulation_id,
                )
            )
            ingestion_properties = IngestionProperties(
                database=database,
                table=filename,
                column_mappings=mappings,
                ingestion_mapping_kind=IngestionMappingKind.CSV,
                drop_by_tags=[simulation_id],
                report_level=ReportLevel.FailuresAndSuccesses,
                additional_properties={"ignoreFirstRecord": "true"},
            )
            LOGGER.info(f"Ingesting {filename}")
            results: IngestionResult = adx_client.ingest_client.ingest_from_file(
                fd, ingestion_properties
            )
            ingestion_ids[str(results.source_id)] = filename
    if wait:
        count = 0
        limit = 5
        pause_duration = 8

        def _still_pending() -> bool:
            # A file is pending while its status is QUEUED or UNKNOWN.
            statuses = adx_client.check_ingestion_status(
                source_ids=list(ingestion_ids.keys())
            )
            return any(
                s in (IngestionStatus.QUEUED, IngestionStatus.UNKNOWN)
                for _, s in statuses
            )

        while _still_pending():
            count += 1
            if count > limit:
                LOGGER.warning("Max number of retry, stop waiting")
                break
            LOGGER.info(
                f"Waiting for ingestion results, retry in {pause_duration}s ({count}/{limit})"
            )
            time.sleep(pause_duration)

        LOGGER.info("Status of ingestion:")
        # NOTE(review): this color mapping is built but never used in the
        # visible code — presumably intended for colored log output; kept
        # to preserve behavior, confirm whether it should feed the log call.
        status_color_mapping = defaultdict(lambda: "bright_black")
        status_color_mapping[IngestionStatus.FAILURE.value] = "red"
        status_color_mapping[IngestionStatus.SUCCESS.value] = "green"
        for _id, status in adx_client.check_ingestion_status(
            source_ids=list(ingestion_ids.keys())
        ):
            LOGGER.info(f"{ingestion_ids[_id]} - {status.name}")
    else:
        LOGGER.info("No wait for ingestion result")
240271
0 commit comments