Skip to content

Commit e632e97

Browse files
committed
WIP commit: ADX store ingestion
1 parent 8d40eea commit e632e97

File tree

9 files changed

+397
-120
lines changed

9 files changed

+397
-120
lines changed

cosmotech/coal/azure/adx/ingestion.py

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,28 @@
55
# etc., to any person is prohibited unless it has been previously and
66
# specifically authorized by written means by Cosmo Tech.
77

8-
import time
98
from enum import Enum
10-
from typing import Iterator, List, Dict, Tuple, Optional, Union
9+
from typing import Dict
10+
from typing import Iterator
11+
from typing import List
12+
from typing import Optional
13+
from typing import Tuple
1114

1215
import pandas as pd
16+
import time
1317
from azure.kusto.data import KustoClient
1418
from azure.kusto.data.data_format import DataFormat
15-
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, ReportLevel
16-
from azure.kusto.ingest.status import (
17-
KustoIngestStatusQueues,
18-
SuccessMessage,
19-
FailureMessage,
20-
)
21-
22-
from cosmotech.coal.utils.logger import LOGGER
19+
from azure.kusto.ingest import IngestionProperties
20+
from azure.kusto.ingest import QueuedIngestClient
21+
from azure.kusto.ingest import ReportLevel
22+
from azure.kusto.ingest.status import FailureMessage
23+
from azure.kusto.ingest.status import KustoIngestStatusQueues
24+
from azure.kusto.ingest.status import SuccessMessage
2325
from cosmotech.orchestrator.utils.translate import T
24-
from cosmotech.coal.azure.adx.tables import table_exists, create_table
26+
27+
from cosmotech.coal.azure.adx.tables import create_table
2528
from cosmotech.coal.azure.adx.utils import type_mapping
29+
from cosmotech.coal.utils.logger import LOGGER
2630

2731

2832
class IngestionStatus(Enum):
@@ -179,15 +183,13 @@ def get_messages(queues):
179183
successes = get_messages(qs.success._get_queues())
180184
failures = get_messages(qs.failure._get_queues())
181185

182-
if logs:
183-
LOGGER.debug(T("coal.logs.adx.status_messages").format(success=len(successes), failure=len(failures)))
186+
LOGGER.debug(T("coal.logs.adx.status_messages").format(success=len(successes), failure=len(failures)))
184187

185188
non_sent_ids = remaining_ids[:]
186-
187189
# Process success and failure messages
188-
for messages, cast_func, status in [
189-
(successes, SuccessMessage, IngestionStatus.SUCCESS),
190-
(failures, FailureMessage, IngestionStatus.FAILURE),
190+
for messages, cast_func, status, log_function in [
191+
(successes, SuccessMessage, IngestionStatus.SUCCESS, LOGGER.debug),
192+
(failures, FailureMessage, IngestionStatus.FAILURE, LOGGER.error),
191193
]:
192194
for _q, _m in messages:
193195
dm = cast_func(_m.content)
@@ -197,8 +199,7 @@ def get_messages(queues):
197199
if dm.IngestionSourceId == str(source_id):
198200
_ingest_status[source_id] = status
199201

200-
if logs:
201-
LOGGER.debug(T("coal.logs.adx.status_found").format(source_id=source_id, status=status.value))
202+
log_function(T("coal.logs.adx.status_found").format(source_id=source_id, status=status.value))
202203

203204
_q.delete_message(_m)
204205
remaining_ids.remove(source_id)

cosmotech/coal/azure/adx/store.py

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
8+
import os
9+
import tempfile
10+
import uuid
11+
from typing import Optional
12+
13+
import pyarrow
14+
import pyarrow.csv as pc
15+
import time
16+
from azure.kusto.data.data_format import DataFormat
17+
from azure.kusto.ingest import IngestionProperties
18+
from azure.kusto.ingest import QueuedIngestClient
19+
from azure.kusto.ingest import ReportLevel
20+
from cosmotech.orchestrator.utils.translate import T
21+
from time import perf_counter
22+
23+
from cosmotech.coal.store.store import Store
24+
from cosmotech.coal.utils.logger import LOGGER
25+
from cosmotech.coal.utils.postgresql import send_pyarrow_table_to_postgresql
26+
27+
28+
def send_pyarrow_table_to_adx(
    client: QueuedIngestClient,
    database: str,
    table_name: str,
    table_data: pyarrow.Table,
    drop_by_tag: Optional[str] = None,
):
    """Queue a pyarrow table for ingestion into an ADX table.

    The data is serialized to a headerless CSV file in a temporary location
    (``CSM_TEMP_ABSOLUTE_PATH`` if set, else the system temp directory),
    handed to the queued ingest client, and the temporary file is deleted
    once the ingestion request has been queued.

    Args:
        client: Queued ingestion client connected to the ADX cluster.
        database: Target ADX database name.
        table_name: Target ADX table name.
        table_data: The pyarrow table to ingest.
        drop_by_tag: Optional "drop-by" extent tag attached to the ingestion.

    Returns:
        The result of ``client.ingest_from_file`` for the queued request.
    """
    ingestion_properties = IngestionProperties(
        database=database,
        table=table_name,
        data_format=DataFormat.CSV,
        drop_by_tags=None if drop_by_tag is None else [drop_by_tag],
        report_level=ReportLevel.FailuresAndSuccesses,
    )

    # Unique file name: timestamp + uuid avoids collisions between runs.
    temp_dir = os.environ.get("CSM_TEMP_ABSOLUTE_PATH", tempfile.gettempdir())
    csv_name = f"adx_{database}_{table_name}_{int(time.time())}_{uuid.uuid4()}.csv"
    csv_path = os.path.join(temp_dir, csv_name)

    # No header row: the ADX table schema defines the column order/types.
    pc.write_csv(table_data, csv_path, pc.WriteOptions(include_header=False))
    try:
        return client.ingest_from_file(csv_path, ingestion_properties)
    finally:
        os.unlink(csv_path)
52+
53+
54+
def dump_store_to_adx(
    store_folder: str,
    postgres_host: str,
    postgres_port: int,
    postgres_db: str,
    postgres_schema: str,
    postgres_user: str,
    postgres_password: str,
    table_prefix: str = "Cosmotech_",
    replace: bool = True,
) -> None:
    """
    Dump every table of the local Store to a database.

    NOTE(review): despite its name and its location in the ADX store module,
    the current (WIP) implementation forwards each Store table to PostgreSQL
    via ``send_pyarrow_table_to_postgresql`` — confirm whether the ADX
    ingestion path is still meant to be wired in here.

    Args:
        store_folder: Folder containing the Store
        postgres_host: PostgreSQL host
        postgres_port: PostgreSQL port
        postgres_db: PostgreSQL database name
        postgres_schema: PostgreSQL schema
        postgres_user: PostgreSQL username
        postgres_password: PostgreSQL password
        table_prefix: Prefix prepended to each Store table name to build the
            target table name
        replace: Whether to replace existing tables
    """
    _s = Store(store_location=store_folder)

    tables = list(_s.list_tables())
    if len(tables):
        LOGGER.info(T("coal.logs.database.sending_data").format(table=f"{postgres_db}.{postgres_schema}"))
        total_rows = 0
        # Wall-clock timing of the whole dump, reported at the end.
        _process_start = perf_counter()
        for table_name in tables:
            _s_time = perf_counter()
            target_table_name = f"{table_prefix}{table_name}"
            LOGGER.info(T("coal.logs.database.table_entry").format(table=target_table_name))
            data = _s.get_table(table_name)
            # Empty tables are skipped entirely — nothing to send.
            if not len(data):
                LOGGER.info(T("coal.logs.database.no_rows"))
                continue
            # _dl_time marks the end of the datastore load / start of upload.
            _dl_time = perf_counter()
            rows = send_pyarrow_table_to_postgresql(
                data,
                target_table_name,
                postgres_host,
                postgres_port,
                postgres_db,
                postgres_schema,
                postgres_user,
                postgres_password,
                replace,
            )
            total_rows += rows
            _up_time = perf_counter()
            LOGGER.info(T("coal.logs.database.row_count").format(count=rows))
            LOGGER.debug(
                T("coal.logs.progress.operation_timing").format(
                    operation="Load from datastore", time=f"{_dl_time - _s_time:0.3}"
                )
            )
            LOGGER.debug(
                T("coal.logs.progress.operation_timing").format(
                    operation="Send to postgresql", time=f"{_up_time - _dl_time:0.3}"
                )
            )
        _process_end = perf_counter()
        LOGGER.info(
            T("coal.logs.database.rows_fetched").format(
                table="all tables",
                count=total_rows,
                time=f"{_process_end - _process_start:0.3}",
            )
        )
    else:
        LOGGER.info(T("coal.logs.database.store_empty"))
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
# Copyright (C) - 2023 - 2025 - Cosmo Tech
2+
# This document and all information contained herein is the exclusive property -
3+
# including all intellectual property rights pertaining thereto - of Cosmo Tech.
4+
# Any use, reproduction, translation, broadcasting, transmission, distribution,
5+
# etc., to any person is prohibited unless it has been previously and
6+
# specifically authorized by written means by Cosmo Tech.
7+
from cosmotech.orchestrator.utils.translate import T
8+
import os
9+
10+
from cosmotech.coal.utils.logger import LOGGER
11+
from cosmotech.csm_data.utils.click import click
12+
13+
14+
@click.command()
@click.option(
    "--adx-uri",
    envvar="AZURE_DATA_EXPLORER_RESOURCE_URI",
    show_envvar=True,
    required=True,
    metavar="URI",
    help=T("csm-data.commands.storage.adx_send_runnerdata.parameters.adx_uri"),
)
@click.option(
    "--adx-ingest-uri",
    envvar="AZURE_DATA_EXPLORER_RESOURCE_INGEST_URI",
    show_envvar=True,
    required=True,
    metavar="URI",
    help=T("csm-data.commands.storage.adx_send_runnerdata.parameters.adx_ingest_uri"),
)
@click.option(
    "--database-name",
    envvar="AZURE_DATA_EXPLORER_DATABASE_NAME",
    show_envvar=True,
    required=True,
    metavar="NAME",
    help=T("csm-data.commands.storage.adx_send_runnerdata.parameters.database_name"),
)
@click.option(
    "--wait/--no-wait",
    "wait",
    envvar="CSM_DATA_ADX_WAIT_INGESTION",
    show_envvar=True,
    default=False,
    show_default=True,
    help="Wait for ingestion to complete",
)
def adx_send_data(
    adx_uri: str,
    adx_ingest_uri: str,
    database_name: str,
    wait: bool,
):
    """Send every table of the local datastore to an Azure Data Explorer database.

    For each non-empty Store table, the target ADX table is created if missing
    (column types inferred from the first non-null value of each column) and
    the data is queued for ingestion. With ``--wait``, the command polls the
    ingestion status queues until every batch reaches a terminal state.
    """
    # Imports are function-local so that the CLI can list commands without
    # paying the cost of loading the Azure SDKs.
    import time

    from cosmotech.coal.azure.adx.auth import create_ingest_client, create_kusto_client
    from cosmotech.coal.azure.adx.store import send_pyarrow_table_to_adx
    from cosmotech.coal.store.store import Store
    from cosmotech.coal.azure.adx import IngestionStatus
    from cosmotech.coal.azure.adx import check_ingestion_status
    from cosmotech.coal.azure.adx import create_table
    from cosmotech.coal.azure.adx import table_exists
    from cosmotech.coal.azure.adx import type_mapping

    LOGGER.debug("Initializing clients")
    kusto_client = create_kusto_client(adx_uri)
    ingest_client = create_ingest_client(adx_ingest_uri)
    database = database_name

    LOGGER.debug("Loading datastore")
    s = Store()
    source_ids = []
    LOGGER.debug("Listing tables")
    # Send every table of the store (an earlier WIP version capped this with
    # a debug-only `[:3]` slice — removed so all tables are ingested).
    table_list = list(s.list_tables())
    table_ingestion_id_mapping = dict()
    for target_table_name in table_list:
        LOGGER.info(f"Working on table: {target_table_name}")
        data = s.get_table(target_table_name)

        if data.num_rows < 1:
            LOGGER.warning(f"Table {target_table_name} has no rows - skipping it")
            continue

        LOGGER.debug(" - Checking if table exists")
        if not table_exists(kusto_client, database, target_table_name):
            # Build the ADX schema: each column's type is inferred from its
            # first non-null value; all-null columns default to string.
            mapping = dict()
            for column_name in data.column_names:
                column = data.column(column_name)
                try:
                    ex = next(v for v in column.to_pylist() if v is not None)
                except StopIteration:
                    LOGGER.error(f"Column {column_name} has no content, defaulting it to string")
                    mapping[column_name] = type_mapping(column_name, "string")
                    continue
                else:
                    mapping[column_name] = type_mapping(column_name, ex)
            LOGGER.debug(" - Does not exist, creating it")
            create_table(kusto_client, database, target_table_name, mapping)

        LOGGER.debug(f"Sending data to the table {target_table_name}")
        result = send_pyarrow_table_to_adx(ingest_client, database, target_table_name, data, None)
        source_ids.append(result.source_id)
        # Remember which table each ingestion id belongs to, for reporting.
        table_ingestion_id_mapping[result.source_id] = target_table_name

    LOGGER.info("Store data was sent for ADX ingestion")
    if wait:
        LOGGER.info("Waiting for ingestion of data to finish")
        import tqdm

        with tqdm.tqdm(desc="Ingestion status", total=len(source_ids)) as pbar:
            # Poll while at least one ingestion is still QUEUED/UNKNOWN; each
            # pass removes the ids that reached a terminal state.
            while any(
                map(
                    lambda _status: _status[1] in (IngestionStatus.QUEUED, IngestionStatus.UNKNOWN),
                    results := list(check_ingestion_status(ingest_client, source_ids)),
                )
            ):
                cleared_ids = list(
                    result for result in results if result[1] not in (IngestionStatus.QUEUED, IngestionStatus.UNKNOWN)
                )

                for ingestion_id, ingestion_status in cleared_ids:
                    LOGGER.debug(
                        f"Table {table_ingestion_id_mapping[ingestion_id]} "
                        f"finished ingestion with status {ingestion_status}"
                    )
                    pbar.update(1)
                    source_ids.remove(ingestion_id)

                # Under a rich-backed progress bar, refresh every second so
                # the display stays alive; otherwise just sleep between polls.
                if os.environ.get("CSM_USE_RICH", "False").lower() in ("true", "1", "yes", "t", "y"):
                    for _ in range(10):
                        time.sleep(1)
                        pbar.update(0)
                else:
                    time.sleep(10)
            # The final poll saw only terminal states: count the remainder.
            pbar.update(len(source_ids))
        LOGGER.info("All data got ingested")
134+
135+
136+
# Allow invoking this command module directly as a script.
if __name__ == "__main__":
    adx_send_data()

cosmotech/csm_data/main.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from cosmotech.csm_data.commands.s3_bucket_download import s3_bucket_download
1414
from cosmotech.csm_data.commands.s3_bucket_upload import s3_bucket_upload
1515
from cosmotech.csm_data.commands.s3_bucket_delete import s3_bucket_delete
16+
from cosmotech.csm_data.commands.adx_send_data import adx_send_data
1617
from cosmotech.csm_data.commands.store.store import store
1718
from cosmotech.csm_data.utils.click import click
1819
from cosmotech.csm_data.utils.decorators import translate_help, web_help
@@ -50,6 +51,7 @@ def main():
5051
main.add_command(s3_bucket_delete, "s3-bucket-delete")
5152
main.add_command(adx_send_runnerdata, "adx-send-runnerdata")
5253
main.add_command(az_storage_upload, "az-storage-upload")
54+
main.add_command(adx_send_data, "adx-send-data")
5355

5456
if __name__ == "__main__":
5557
main()

docs/scripts/generate_index.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
from cosmotech.coal import __version__
66

77
_md_file: IO
8-
with mkdocs_gen_files.open("index.md", "w") as _md_file, open("docs/scripts/index.md.template") as index_template, open(
9-
"README.md"
10-
) as readme:
8+
with (
9+
mkdocs_gen_files.open("index.md", "w") as _md_file,
10+
open("docs/scripts/index.md.template") as index_template,
11+
open("README.md") as readme,
12+
):
1113
_index: list[str] = index_template.readlines()
1214
_readme_content = readme.readlines()
1315
for _line in _index:

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ adbc-driver-postgresql~=1.1.0
3434
click~=8.1.7
3535
rich-click~=1.7.3
3636
click-log~=0.4.0
37+
tqdm~=4.67.1
3738

3839
# Other requirements
3940
openpyxl~=3.1

0 commit comments

Comments
 (0)