Commit 75a849f

Separate content of csm-data and coal for adx_send_data
1 parent 409d682 commit 75a849f

File tree

7 files changed: +333, -269 lines

cosmotech/coal/azure/adx/__init__.py

Lines changed: 6 additions & 3 deletions
@@ -5,16 +5,19 @@
 # etc., to any person is prohibited unless it has been previously and
 # specifically authorized by written means by Cosmo Tech.

-from cosmotech.coal.azure.adx.auth import create_kusto_client, create_ingest_client
+from cosmotech.coal.azure.adx.auth import create_kusto_client, create_ingest_client, initialize_clients
 from cosmotech.coal.azure.adx.query import run_query, run_command_query
 from cosmotech.coal.azure.adx.ingestion import (
     ingest_dataframe,
     send_to_adx,
     check_ingestion_status,
+    monitor_ingestion,
+    handle_failures,
     IngestionStatus,
 )
-from cosmotech.coal.azure.adx.tables import table_exists, create_table
-from cosmotech.coal.azure.adx.utils import type_mapping
+from cosmotech.coal.azure.adx.tables import table_exists, create_table, check_and_create_table, _drop_by_tag
+from cosmotech.coal.azure.adx.utils import type_mapping, create_column_mapping
+from cosmotech.coal.azure.adx.store import send_pyarrow_table_to_adx, send_table_data, process_tables, send_store_to_adx
 from cosmotech.coal.azure.adx.runner import (
     prepare_csv_content,
     construct_create_query,
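As a quick illustrative sketch of the widened public surface (assuming the re-exports land exactly as listed above), the new helpers become importable straight from the subpackage:

from cosmotech.coal.azure.adx import (
    initialize_clients,  # builds the Kusto and ingest clients in one call
    send_store_to_adx,   # high-level store-to-ADX entry point
    monitor_ingestion,   # progress reporting for queued ingestions
    handle_failures,     # tag-based rollback when an ingestion fails
)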

cosmotech/coal/azure/adx/auth.py

Lines changed: 19 additions & 2 deletions
@@ -6,7 +6,7 @@
 # specifically authorized by written means by Cosmo Tech.

 import os
-from typing import Union, Optional
+from typing import Union, Optional, Tuple

 from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
 from azure.kusto.ingest import QueuedIngestClient
@@ -87,7 +87,24 @@ def create_ingest_client(
     return QueuedIngestClient(kcsb)


-def get_cluster_urls(cluster_name: str, cluster_region: str) -> tuple[str, str]:
+def initialize_clients(adx_uri: str, adx_ingest_uri: str) -> Tuple[KustoClient, QueuedIngestClient]:
+    """
+    Initialize and return the Kusto and ingest clients.
+
+    Args:
+        adx_uri: The Azure Data Explorer resource URI
+        adx_ingest_uri: The Azure Data Explorer resource ingest URI
+
+    Returns:
+        tuple: (kusto_client, ingest_client)
+    """
+    LOGGER.debug("Initializing clients")
+    kusto_client = create_kusto_client(adx_uri)
+    ingest_client = create_ingest_client(adx_ingest_uri)
+    return kusto_client, ingest_client
+
+
+def get_cluster_urls(cluster_name: str, cluster_region: str) -> Tuple[str, str]:
     """
     Generate cluster and ingest URLs from cluster name and region.
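initialize_clients is a thin convenience wrapper around the existing create_kusto_client and create_ingest_client factories. A minimal usage sketch (the cluster URIs below are placeholders, not real endpoints):

from cosmotech.coal.azure.adx.auth import initialize_clients

kusto_client, ingest_client = initialize_clients(
    adx_uri="https://mycluster.westeurope.kusto.windows.net",
    adx_ingest_uri="https://ingest-mycluster.westeurope.kusto.windows.net",
)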

cosmotech/coal/azure/adx/ingestion.py

Lines changed: 83 additions & 1 deletion
@@ -12,8 +12,10 @@
 from typing import Optional
 from typing import Tuple

+import os
 import pandas as pd
 import time
+import tqdm
 from azure.kusto.data import KustoClient
 from azure.kusto.data.data_format import DataFormat
 from azure.kusto.ingest import IngestionProperties
@@ -24,7 +26,7 @@
 from azure.kusto.ingest.status import SuccessMessage
 from cosmotech.orchestrator.utils.translate import T

-from cosmotech.coal.azure.adx.tables import create_table
+from cosmotech.coal.azure.adx.tables import create_table, _drop_by_tag
 from cosmotech.coal.azure.adx.utils import type_mapping
 from cosmotech.coal.utils.logger import LOGGER

@@ -221,6 +223,86 @@ def get_messages(queues):
         yield source_id, _ingest_status[source_id]


+def monitor_ingestion(
+    ingest_client: QueuedIngestClient, source_ids: List[str], table_ingestion_id_mapping: Dict[str, str]
+) -> bool:
+    """
+    Monitor the ingestion process with progress reporting.
+
+    Args:
+        ingest_client: The ingest client
+        source_ids: List of source IDs to monitor
+        table_ingestion_id_mapping: Mapping of source IDs to table names
+
+    Returns:
+        bool: True if any failures occurred, False otherwise
+    """
+    has_failures = False
+    source_ids_copy = source_ids.copy()
+
+    LOGGER.info("Waiting for ingestion of data to finish")
+
+    with tqdm.tqdm(desc="Ingestion status", total=len(source_ids_copy)) as pbar:
+        while any(
+            list(
+                map(
+                    lambda _status: _status[1] in (IngestionStatus.QUEUED, IngestionStatus.UNKNOWN),
+                    results := list(check_ingestion_status(ingest_client, source_ids_copy)),
+                )
+            )
+        ):
+            # Check for failures
+            for ingestion_id, ingestion_status in results:
+                if ingestion_status == IngestionStatus.FAILURE:
+                    LOGGER.error(
+                        f"Ingestion {ingestion_id} failed for table {table_ingestion_id_mapping.get(ingestion_id)}"
+                    )
+                    has_failures = True
+
+            cleared_ids = list(
+                result for result in results if result[1] not in (IngestionStatus.QUEUED, IngestionStatus.UNKNOWN)
+            )
+
+            for ingestion_id, ingestion_status in cleared_ids:
+                pbar.update(1)
+                source_ids_copy.remove(ingestion_id)
+
+            time.sleep(1)
+            if os.environ.get("CSM_USE_RICH", "False").lower() in ("true", "1", "yes", "t", "y"):
+                pbar.refresh()
+        else:
+            for ingestion_id, ingestion_status in results:
+                if ingestion_status == IngestionStatus.FAILURE:
+                    LOGGER.error(
+                        f"Ingestion {ingestion_id} failed for table {table_ingestion_id_mapping.get(ingestion_id)}"
+                    )
+                    has_failures = True
+            pbar.update(len(source_ids_copy))
+
+    LOGGER.info("All data ingestion attempts completed")
+    return has_failures
+
+
+def handle_failures(kusto_client: KustoClient, database: str, operation_tag: str, has_failures: bool) -> bool:
+    """
+    Handle any failures and perform rollbacks if needed.
+
+    Args:
+        kusto_client: The Kusto client
+        database: The database name
+        operation_tag: The operation tag for tracking
+        has_failures: Whether any failures occurred
+
+    Returns:
+        bool: True if the process should abort, False otherwise
+    """
+    if has_failures:
+        LOGGER.warning(f"Failures detected during ingestion - dropping data with tag: {operation_tag}")
+        _drop_by_tag(kusto_client, database, operation_tag)
+        return True
+    return False
+
+
 def clear_ingestion_status_queues(client: QueuedIngestClient, confirmation: bool = False):
     """
     Clear all data in the ingestion status queues.
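monitor_ingestion polls check_ingestion_status roughly once per second until every queued source ID leaves the QUEUED/UNKNOWN states, and handle_failures drops the whole tagged batch if any of them failed. A rough sketch of how the two compose (endpoints, database, tag and IDs below are placeholders; in this commit the real values come from process_tables in store.py):

from cosmotech.coal.azure.adx.auth import initialize_clients
from cosmotech.coal.azure.adx.ingestion import monitor_ingestion, handle_failures

# Placeholder endpoints and identifiers for illustration only
kusto_client, ingest_client = initialize_clients(
    "https://mycluster.westeurope.kusto.windows.net",
    "https://ingest-mycluster.westeurope.kusto.windows.net",
)
source_ids = ["source-id-1", "source-id-2"]  # returned by the ingest client when data is queued
id_to_table = {"source-id-1": "CustomerData", "source-id-2": "Orders"}

has_failures = monitor_ingestion(ingest_client, source_ids, id_to_table)
if handle_failures(kusto_client, "my-database", "op-example", has_failures):
    raise RuntimeError("Ingestion failed; extents tagged 'op-example' were dropped")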

cosmotech/coal/azure/adx/store.py

Lines changed: 127 additions & 1 deletion
@@ -8,23 +8,87 @@
 import os
 import tempfile
 import uuid
-from typing import Optional
+from typing import Optional, List, Dict, Tuple, Union, Any

 import pyarrow
 import pyarrow.csv as pc
 import time
+from azure.kusto.data import KustoClient
 from azure.kusto.data.data_format import DataFormat
 from azure.kusto.ingest import IngestionProperties
 from azure.kusto.ingest import QueuedIngestClient
 from azure.kusto.ingest import ReportLevel
 from cosmotech.orchestrator.utils.translate import T
 from time import perf_counter

+from cosmotech.coal.azure.adx.tables import check_and_create_table, _drop_by_tag
+from cosmotech.coal.azure.adx.auth import initialize_clients
+from cosmotech.coal.azure.adx.ingestion import monitor_ingestion, handle_failures
 from cosmotech.coal.store.store import Store
 from cosmotech.coal.utils.logger import LOGGER
 from cosmotech.coal.utils.postgresql import send_pyarrow_table_to_postgresql


+def send_table_data(
+    ingest_client: QueuedIngestClient, database: str, table_name: str, data: pyarrow.Table, operation_tag: str
+) -> Tuple[str, str]:
+    """
+    Send a PyArrow table to ADX.
+
+    Args:
+        ingest_client: The ingest client
+        database: The database name
+        table_name: The table name
+        data: The PyArrow table data
+        operation_tag: The operation tag for tracking
+
+    Returns:
+        tuple: (source_id, table_name)
+    """
+    LOGGER.debug(f"Sending data to the table {table_name}")
+    result = send_pyarrow_table_to_adx(ingest_client, database, table_name, data, operation_tag)
+    return result.source_id, table_name
+
+
+def process_tables(
+    store: Store, kusto_client: KustoClient, ingest_client: QueuedIngestClient, database: str, operation_tag: str
+) -> Tuple[List[str], Dict[str, str]]:
+    """
+    Process all tables in the store.
+
+    Args:
+        store: The data store
+        kusto_client: The Kusto client
+        ingest_client: The ingest client
+        database: The database name
+        operation_tag: The operation tag for tracking
+
+    Returns:
+        tuple: (source_ids, table_ingestion_id_mapping)
+    """
+    source_ids = []
+    table_ingestion_id_mapping = dict()
+
+    LOGGER.debug("Listing tables")
+    table_list = list(store.list_tables())
+
+    for target_table_name in table_list:
+        LOGGER.info(f"Working on table: {target_table_name}")
+        data = store.get_table(target_table_name)
+
+        if data.num_rows < 1:
+            LOGGER.warning(f"Table {target_table_name} has no rows - skipping it")
+            continue
+
+        check_and_create_table(kusto_client, database, target_table_name, data)
+
+        source_id, _ = send_table_data(ingest_client, database, target_table_name, data, operation_tag)
+        source_ids.append(source_id)
+        table_ingestion_id_mapping[source_id] = target_table_name
+
+    return source_ids, table_ingestion_id_mapping
+
+
 def send_pyarrow_table_to_adx(
     client: QueuedIngestClient,
     database: str,
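process_tables walks every table in the Store, skips empty ones, creates any missing ADX table from the PyArrow schema via check_and_create_table, queues one ingestion per table, and returns the source IDs plus the ID-to-table mapping that monitor_ingestion expects. A minimal sketch of calling it directly (endpoints, store path, database and tag are placeholders):

from cosmotech.coal.store.store import Store
from cosmotech.coal.azure.adx.auth import initialize_clients
from cosmotech.coal.azure.adx.store import process_tables

# Placeholder endpoints, store location, database and tag for illustration only
kusto_client, ingest_client = initialize_clients(
    "https://mycluster.westeurope.kusto.windows.net",
    "https://ingest-mycluster.westeurope.kusto.windows.net",
)
store = Store(store_location="/tmp/my_store")
source_ids, id_to_table = process_tables(store, kusto_client, ingest_client, "my-database", "op-example")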
@@ -52,6 +116,68 @@ def send_pyarrow_table_to_adx(
         os.unlink(temp_file_path)


+def send_store_to_adx(
+    adx_uri: str,
+    adx_ingest_uri: str,
+    database_name: str,
+    wait: bool = False,
+    tag: Optional[str] = None,
+    store_location: Optional[str] = None,
+) -> Union[bool, Any]:
+    """
+    Send data from the store to Azure Data Explorer.
+
+    Args:
+        adx_uri: The Azure Data Explorer resource URI
+        adx_ingest_uri: The Azure Data Explorer resource ingest URI
+        database_name: The database name
+        wait: Whether to wait for ingestion to complete
+        tag: The operation tag for tracking (will generate a unique one if not provided)
+        store_location: Optional store location (uses default if not provided)
+
+    Returns:
+        bool: True if successful, False otherwise
+    """
+    # Generate a unique operation tag if none provided
+    operation_tag = tag or f"op-{str(uuid.uuid4())}"
+    LOGGER.debug(f"Starting ingestion operation with tag: {operation_tag}")
+
+    # Initialize clients
+    kusto_client, ingest_client = initialize_clients(adx_uri, adx_ingest_uri)
+    database = database_name
+
+    # Load datastore
+    LOGGER.debug("Loading datastore")
+    store = Store(store_location=store_location)
+
+    try:
+        # Process tables
+        source_ids, table_ingestion_id_mapping = process_tables(
+            store, kusto_client, ingest_client, database, operation_tag
+        )
+
+        LOGGER.info("Store data was sent for ADX ingestion")
+
+        # Monitor ingestion if wait is True
+        has_failures = False
+        if wait and source_ids:
+            has_failures = monitor_ingestion(ingest_client, source_ids, table_ingestion_id_mapping)
+
+        # Handle failures
+        should_abort = handle_failures(kusto_client, database, operation_tag, has_failures)
+        if should_abort:
+            return False
+
+        return True
+
+    except Exception as e:
+        LOGGER.exception("Error during ingestion process")
+        # Perform rollback using the tag
+        LOGGER.warning(f"Dropping data with tag: {operation_tag}")
+        _drop_by_tag(kusto_client, database, operation_tag)
+        raise e
+
+
 def dump_store_to_adx(
     store_folder: str,
     postgres_host: str,
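send_store_to_adx is the new high-level entry point: it tags the whole operation, ingests every table in the store, optionally waits for completion, and drops the tagged extents if anything fails (presumably the piece the csm-data adx_send_data command now delegates to, per the commit message). A minimal call sketch with placeholder endpoints and database name:

from cosmotech.coal.azure.adx.store import send_store_to_adx

ok = send_store_to_adx(
    adx_uri="https://mycluster.westeurope.kusto.windows.net",
    adx_ingest_uri="https://ingest-mycluster.westeurope.kusto.windows.net",
    database_name="my-database",
    wait=True,  # block until ingestion finishes; failures trigger a drop-by-tag rollback
)
if not ok:
    raise SystemExit("ADX ingestion failed; tagged data was dropped")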

cosmotech/coal/azure/adx/tables.py

Lines changed: 47 additions & 1 deletion
@@ -5,8 +5,9 @@
 # etc., to any person is prohibited unless it has been previously and
 # specifically authorized by written means by Cosmo Tech.

-from typing import Dict
+from typing import Dict, Any

+import pyarrow
 from azure.kusto.data import KustoClient

 from cosmotech.coal.utils.logger import LOGGER
@@ -39,6 +40,51 @@ def table_exists(client: KustoClient, database: str, table_name: str) -> bool:
     return False


+def check_and_create_table(kusto_client: KustoClient, database: str, table_name: str, data: pyarrow.Table) -> bool:
+    """
+    Check if a table exists and create it if it doesn't.
+
+    Args:
+        kusto_client: The Kusto client
+        database: The database name
+        table_name: The table name
+        data: The PyArrow table data
+
+    Returns:
+        bool: True if the table was created, False if it already existed
+    """
+    LOGGER.debug(" - Checking if table exists")
+    if not table_exists(kusto_client, database, table_name):
+        from cosmotech.coal.azure.adx.utils import create_column_mapping
+
+        mapping = create_column_mapping(data)
+        LOGGER.debug(" - Does not exist, creating it")
+        create_table(kusto_client, database, table_name, mapping)
+        return True
+    return False
+
+
+def _drop_by_tag(kusto_client: KustoClient, database: str, tag: str) -> None:
+    """
+    Drop all data with the specified tag.
+
+    Args:
+        kusto_client: The Kusto client
+        database: The database name
+        tag: The tag to drop data by
+    """
+    LOGGER.info(f"Dropping data with tag: {tag}")
+
+    try:
+        # Execute the drop by tag command
+        drop_command = f'.drop extents <| .show database extents where tags has "drop-by:{tag}"'
+        kusto_client.execute_mgmt(database, drop_command)
+        LOGGER.info("Drop by tag operation completed")
+    except Exception as e:
+        LOGGER.error(f"Error during drop by tag operation: {str(e)}")
+        LOGGER.exception("Drop by tag details")
+
+
 def create_table(client: KustoClient, database: str, table_name: str, schema: Dict[str, str]) -> bool:
     """
     Create a table in the database.
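_drop_by_tag matches ADX extent tags of the form "drop-by:<tag>", so the rollback only finds data whose ingestion attached the same tag. The diff does not show how send_pyarrow_table_to_adx sets its ingestion properties, but a hedged sketch of how that pairing typically looks with the azure-kusto-ingest API (database, table and tag below are placeholders):

from azure.kusto.data.data_format import DataFormat
from azure.kusto.ingest import IngestionProperties

# Assumption: queuing data with drop_by_tags=["op-example"] stores the extents
# with the tag "drop-by:op-example", which is what _drop_by_tag later matches on.
properties = IngestionProperties(
    database="my-database",
    table="CustomerData",
    data_format=DataFormat.CSV,
    drop_by_tags=["op-example"],
)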
