Skip to content

Commit d949d8a

Browse files
committed
remove ADXQueriesWrapper
1 parent 75a849f commit d949d8a

File tree

5 files changed

+98
-537
lines changed

5 files changed

+98
-537
lines changed

cosmotech/coal/azure/adx/ingestion.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,6 @@ def get_messages(queues):
207207
else:
208208
# The message did not correspond to a known ID
209209
continue
210-
else:
211-
# No message was found on the current list of messages for the given IDs
212-
continue
213210

214211
# Check for timeouts
215212
actual_timeout = timeout if timeout is not None else default_timeout

cosmotech/coal/azure/adx/runner.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@
2424
from azure.kusto.ingest import IngestionResult
2525
from azure.kusto.ingest import ReportLevel
2626

27-
from cosmotech.coal.azure.adx.wrapper import ADXQueriesWrapper
28-
from cosmotech.coal.azure.adx.wrapper import IngestionStatus
27+
from azure.kusto.data import KustoClient
28+
from azure.kusto.ingest import QueuedIngestClient
29+
30+
from cosmotech.coal.azure.adx.auth import initialize_clients
31+
from cosmotech.coal.azure.adx.query import run_query, run_command_query
32+
from cosmotech.coal.azure.adx.ingestion import check_ingestion_status, IngestionStatus
2933
from cosmotech.coal.utils.logger import LOGGER
3034
from cosmotech.orchestrator.utils.translate import T
3135

@@ -79,7 +83,8 @@ def construct_create_query(files_data: Dict[str, Dict[str, Any]]) -> Dict[str, s
7983

8084
def insert_csv_files(
8185
files_data: Dict[str, Dict[str, Any]],
82-
adx_client: ADXQueriesWrapper,
86+
kusto_client: KustoClient,
87+
ingest_client: QueuedIngestClient,
8388
runner_id: str,
8489
database: str,
8590
wait: bool = False,
@@ -91,7 +96,8 @@ def insert_csv_files(
9196
9297
Args:
9398
files_data: Map of filename to file_infos as returned by prepare_csv_content
94-
adx_client: ADX client wrapper
99+
kusto_client: The KustoClient for querying
100+
ingest_client: The QueuedIngestClient for ingestion
95101
runner_id: Runner ID to use as a tag
96102
database: ADX database name
97103
wait: Whether to wait for ingestion to complete
@@ -131,14 +137,14 @@ def insert_csv_files(
131137
additional_properties={"ignoreFirstRecord": "true"},
132138
)
133139
LOGGER.info(T("coal.logs.ingestion.ingesting").format(table=filename))
134-
results: IngestionResult = adx_client.ingest_client.ingest_from_file(fd, ingestion_properties)
140+
results: IngestionResult = ingest_client.ingest_from_file(fd, ingestion_properties)
135141
ingestion_ids[str(results.source_id)] = filename
136142
if wait:
137143
count = 0
138144
while any(
139145
map(
140146
lambda s: s[1] in (IngestionStatus.QUEUED, IngestionStatus.UNKNOWN),
141-
adx_client.check_ingestion_status(source_ids=list(ingestion_ids.keys())),
147+
check_ingestion_status(ingest_client, source_ids=list(ingestion_ids.keys())),
142148
)
143149
):
144150
count += 1
@@ -151,7 +157,7 @@ def insert_csv_files(
151157
time.sleep(wait_duration)
152158

153159
LOGGER.info(T("coal.logs.ingestion.status"))
154-
for _id, status in adx_client.check_ingestion_status(source_ids=list(ingestion_ids.keys())):
160+
for _id, status in check_ingestion_status(ingest_client, source_ids=list(ingestion_ids.keys())):
155161
color = (
156162
"red"
157163
if status == IngestionStatus.FAILURE
@@ -197,10 +203,10 @@ def send_runner_data(
197203
if send_datasets:
198204
csv_data.update(prepare_csv_content(dataset_absolute_path))
199205
queries = construct_create_query(csv_data)
200-
adx_client = ADXQueriesWrapper(database=database_name, cluster_url=adx_uri, ingest_url=adx_ingest_uri)
206+
kusto_client, ingest_client = initialize_clients(adx_uri, adx_ingest_uri)
201207
for k, v in queries.items():
202208
LOGGER.info(T("coal.logs.ingestion.creating_table").format(query=v))
203-
r: KustoResponseDataSet = adx_client.run_query(v)
209+
r: KustoResponseDataSet = run_query(kusto_client, database_name, v)
204210
if r.errors_count == 0:
205211
LOGGER.info(T("coal.logs.ingestion.table_created").format(table=k))
206212
else:
@@ -209,7 +215,8 @@ def send_runner_data(
209215
raise RuntimeError(f"Failed to create table {k}")
210216
insert_csv_files(
211217
files_data=csv_data,
212-
adx_client=adx_client,
218+
kusto_client=kusto_client,
219+
ingest_client=ingest_client,
213220
runner_id=runner_id,
214221
database=database_name,
215222
wait=wait,

cosmotech/coal/azure/adx/wrapper.py

Lines changed: 0 additions & 188 deletions
This file was deleted.

0 commit comments

Comments
 (0)