4545 show_default = True ,
4646 help = "Wait for ingestion to complete" ,
4747)
48+ @click .option (
49+ "--tag" ,
50+ envvar = "CSM_DATA_ADX_TAG" ,
51+ show_envvar = True ,
52+ default = None ,
53+ help = "Optional tag to use for tracking and potential rollback of this ingestion operation" ,
54+ )
4855def adx_send_data (
4956 adx_uri : str ,
5057 adx_ingest_uri : str ,
5158 database_name : str ,
5259 wait : bool ,
60+ tag : str = None ,
5361):
5462 # Import the function at the start of the command
5563 from cosmotech .coal .azure .adx .auth import create_ingest_client , create_kusto_client
@@ -61,8 +69,13 @@ def adx_send_data(
6169 from cosmotech .coal .azure .adx import type_mapping
6270
6371 import time
72+ import uuid
6473 from cosmotech .coal .azure .adx import IngestionStatus
6574
75+ # Generate operation tag if not provided
76+ operation_tag = tag or f"op-{ str (uuid .uuid4 ())} "
77+ LOGGER .debug (f"Starting ingestion operation with tag: { operation_tag } " )
78+
6679 LOGGER .debug ("Initializing clients" )
6780 kusto_client = create_kusto_client (adx_uri )
6881 ingest_client = create_ingest_client (adx_ingest_uri )
@@ -72,14 +85,14 @@ def adx_send_data(
7285 s = Store ()
7386 source_ids = []
7487 LOGGER .debug ("Listing tables" )
75- table_list = list (s .list_tables ())[: 3 ]
88+ table_list = list (s .list_tables ())
7689 table_ingestion_id_mapping = dict ()
7790 for target_table_name in table_list :
7891 LOGGER .info (f"Working on table: { target_table_name } " )
7992 data = s .get_table (target_table_name )
8093
8194 if data .num_rows < 1 :
82- LOGGER .warn (f"Table { target_table_name } has no rows - skipping it" )
95+ LOGGER .warning (f"Table { target_table_name } has no rows - skipping it" )
8396 continue
8497
8598 LOGGER .debug (" - Checking if table exists" )
@@ -99,38 +112,90 @@ def adx_send_data(
99112 create_table (kusto_client , database , target_table_name , mapping )
100113
101114 LOGGER .debug (f"Sending data to the table { target_table_name } " )
102- result = send_pyarrow_table_to_adx (ingest_client , database , target_table_name , data , None )
115+ # Use the operation_tag as the drop_by_tag parameter
116+ result = send_pyarrow_table_to_adx (ingest_client , database , target_table_name , data , operation_tag )
103117 source_ids .append (result .source_id )
104118 table_ingestion_id_mapping [result .source_id ] = target_table_name
105119
120+ # Track if any failures occur
121+ has_failures = False
122+
106123 LOGGER .info ("Store data was sent for ADX ingestion" )
107- if wait :
108- LOGGER .info ("Waiting for ingestion of data to finish" )
109- import tqdm
110-
111- with tqdm .tqdm (desc = "Ingestion status" , total = len (source_ids )) as pbar :
112- while any (
113- map (
114- lambda _status : _status [1 ] in (IngestionStatus .QUEUED , IngestionStatus .UNKNOWN ),
115- results := list (check_ingestion_status (ingest_client , source_ids )),
116- )
117- ):
118- cleared_ids = list (
119- result for result in results if result [1 ] not in (IngestionStatus .QUEUED , IngestionStatus .UNKNOWN )
120- )
121-
122- for ingestion_id , ingestion_status in cleared_ids :
123- pbar .update (1 )
124- source_ids .remove (ingestion_id )
125-
126- if os .environ .get ("CSM_USE_RICH" , "False" ).lower () in ("true" , "1" , "yes" , "t" , "y" ):
127- for _ in range (10 ):
128- time .sleep (1 )
129- pbar .update (0 )
124+ try :
125+ if wait :
126+ LOGGER .info ("Waiting for ingestion of data to finish" )
127+ import tqdm
128+
129+ with tqdm .tqdm (desc = "Ingestion status" , total = len (source_ids )) as pbar :
130+ while any (
131+ list (
132+ map (
133+ lambda _status : _status [1 ] in (IngestionStatus .QUEUED , IngestionStatus .UNKNOWN ),
134+ results := list (check_ingestion_status (ingest_client , source_ids )),
135+ )
136+ )
137+ ):
138+ # Check for failures
139+ for ingestion_id , ingestion_status in results :
140+ if ingestion_status == IngestionStatus .FAILURE :
141+ LOGGER .error (
142+ f"Ingestion { ingestion_id } failed for table { table_ingestion_id_mapping .get (ingestion_id )} "
143+ )
144+ has_failures = True
145+
146+ cleared_ids = list (
147+ result
148+ for result in results
149+ if result [1 ] not in (IngestionStatus .QUEUED , IngestionStatus .UNKNOWN )
150+ )
151+
152+ for ingestion_id , ingestion_status in cleared_ids :
153+ pbar .update (1 )
154+ source_ids .remove (ingestion_id )
155+
156+ time .sleep (1 )
157+ if os .environ .get ("CSM_USE_RICH" , "False" ).lower () in ("true" , "1" , "yes" , "t" , "y" ):
158+ pbar .refresh ()
130159 else :
131- time .sleep (10 )
132- pbar .update (len (source_ids ))
133- LOGGER .info ("All data got ingested" )
160+ for ingestion_id , ingestion_status in results :
161+ if ingestion_status == IngestionStatus .FAILURE :
162+ LOGGER .error (
163+ f"Ingestion { ingestion_id } failed for table { table_ingestion_id_mapping .get (ingestion_id )} "
164+ )
165+ has_failures = True
166+ pbar .update (len (source_ids ))
167+ LOGGER .info ("All data ingestion attempts completed" )
168+
169+ # If any ingestion failed, perform rollback
170+ if has_failures :
171+ LOGGER .warning (f"Failures detected during ingestion - dropping data with tag: { operation_tag } " )
172+ _drop_by_tag (kusto_client , database , operation_tag )
173+
174+ except Exception as e :
175+ LOGGER .exception ("Error during ingestion process" )
176+ # Perform rollback using the tag
177+ LOGGER .warning (f"Dropping data with tag: { operation_tag } " )
178+ _drop_by_tag (kusto_client , database , operation_tag )
179+ raise e
180+
181+ if has_failures :
182+ click .Abort ()
183+
184+
def _drop_by_tag(kusto_client, database, tag):
    """
    Drop all data with the specified tag

    Issues a ``.drop extents`` management command against ``database`` that
    targets every extent whose tags contain ``drop-by:<tag>``.

    This is a best-effort rollback helper: any exception raised while running
    the command is logged (with its traceback) and swallowed, so a failing
    rollback never hides the error that triggered it.

    :param kusto_client: client exposing ``execute_mgmt(database, command)``
    :param database: name of the database the command is run against
    :param tag: tag value (without the ``drop-by:`` prefix) identifying the data to drop
    :return: None
    """
    LOGGER.info(f"Dropping data with tag: {tag}")

    # The tag can come from the command line or the environment and ends up
    # inside a double-quoted KQL string literal: escape backslashes first, then
    # double quotes, so the tag cannot terminate the literal or alter the command.
    escaped_tag = str(tag).replace("\\", "\\\\").replace('"', '\\"')
    drop_command = f'.drop extents <| .show database extents where tags has "drop-by:{escaped_tag}"'

    try:
        kusto_client.execute_mgmt(database, drop_command)
    except Exception as e:
        # Single log record: LOGGER.exception already attaches the traceback,
        # so a separate LOGGER.error call would only duplicate the message.
        LOGGER.exception(f"Error during drop by tag operation: {e}")
    else:
        LOGGER.info("Drop by tag operation completed")
134199
135200
136201if __name__ == "__main__" :
0 commit comments