
Commit 9e0c3ba

Add more logging
1 parent: d949d8a

14 files changed, +127 −70 lines changed
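The pattern throughout this commit: hardcoded and f-string log messages are replaced by translation keys resolved through the T helper from cosmotech.orchestrator.utils.translate (imported in parameters.py below). A minimal sketch of how such a helper behaves, assuming a simple key-to-template catalog — the catalog contents and lookup logic here are illustrative assumptions, not the orchestrator package's real internals:

# Illustrative sketch only: _CATALOG is a hypothetical stand-in for the
# orchestrator's real translation resources.
_CATALOG = {
    "coal.logs.adx.ingestion_failed": "Ingestion {ingestion_id} failed for table {table}",
}

def T(key: str) -> str:
    # Resolve a key to its message template; fall back to the key if unknown.
    return _CATALOG.get(key, key)

# Call sites format the resolved template with named placeholders, e.g.:
# LOGGER.error(T("coal.logs.adx.ingestion_failed").format(ingestion_id=i, table=t))

Keeping messages behind keys makes them translatable and greppable, at the cost of hiding the final wording from the call site.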

cosmotech/coal/azure/adx/auth.py

Lines changed: 1 addition & 1 deletion
@@ -98,7 +98,7 @@ def initialize_clients(adx_uri: str, adx_ingest_uri: str) -> Tuple[KustoClient,
     Returns:
         tuple: (kusto_client, ingest_client)
     """
-    LOGGER.debug("Initializing clients")
+    LOGGER.debug(T("coal.logs.adx.auth.initializing_clients"))
     kusto_client = create_kusto_client(adx_uri)
     ingest_client = create_ingest_client(adx_ingest_uri)
     return kusto_client, ingest_client

cosmotech/coal/azure/adx/ingestion.py

Lines changed: 9 additions & 5 deletions
@@ -237,7 +237,7 @@ def monitor_ingestion(
     has_failures = False
     source_ids_copy = source_ids.copy()

-    LOGGER.info("Waiting for ingestion of data to finish")
+    LOGGER.info(T("coal.logs.adx.waiting_ingestion"))

     with tqdm.tqdm(desc="Ingestion status", total=len(source_ids_copy)) as pbar:
         while any(
@@ -252,7 +252,9 @@ def monitor_ingestion(
             for ingestion_id, ingestion_status in results:
                 if ingestion_status == IngestionStatus.FAILURE:
                     LOGGER.error(
-                        f"Ingestion {ingestion_id} failed for table {table_ingestion_id_mapping.get(ingestion_id)}"
+                        T("coal.logs.adx.ingestion_failed").format(
+                            ingestion_id=ingestion_id, table=table_ingestion_id_mapping.get(ingestion_id)
+                        )
                     )
                     has_failures = True

@@ -271,12 +273,14 @@ def monitor_ingestion(
         for ingestion_id, ingestion_status in results:
             if ingestion_status == IngestionStatus.FAILURE:
                 LOGGER.error(
-                    f"Ingestion {ingestion_id} failed for table {table_ingestion_id_mapping.get(ingestion_id)}"
+                    T("coal.logs.adx.ingestion_failed").format(
+                        ingestion_id=ingestion_id, table=table_ingestion_id_mapping.get(ingestion_id)
+                    )
                 )
                 has_failures = True
         pbar.update(len(source_ids_copy))

-    LOGGER.info("All data ingestion attempts completed")
+    LOGGER.info(T("coal.logs.adx.ingestion_completed"))
     return has_failures


@@ -294,7 +298,7 @@ def handle_failures(kusto_client: KustoClient, database: str, operation_tag: str
         bool: True if the process should abort, False otherwise
     """
     if has_failures:
-        LOGGER.warning(f"Failures detected during ingestion - dropping data with tag: {operation_tag}")
+        LOGGER.warning(T("coal.logs.adx.failures_detected").format(operation_tag=operation_tag))
         _drop_by_tag(kusto_client, database, operation_tag)
         return True
     return False

cosmotech/coal/azure/adx/runner.py

Lines changed: 2 additions & 2 deletions
@@ -57,7 +57,7 @@ def prepare_csv_content(folder_path: str) -> Dict[str, Dict[str, Any]]:
         cols = {k.strip(): "string" for k in headers}
         csv_datas = {"filename": _file.name.removesuffix(".csv"), "headers": cols}
         content[str(_file)] = csv_datas
-    LOGGER.debug(content)
+    LOGGER.debug(T("coal.logs.adx.runner.content_debug").format(content=content))

     return content

@@ -211,7 +211,7 @@ def send_runner_data(
             LOGGER.info(T("coal.logs.ingestion.table_created").format(table=k))
         else:
             LOGGER.error(T("coal.logs.ingestion.table_creation_failed").format(table=k))
-            LOGGER.error(r.get_exceptions())
+            LOGGER.error(T("coal.logs.ingestion.exceptions").format(exceptions=r.get_exceptions()))
             raise RuntimeError(f"Failed to create table {k}")
     insert_csv_files(
         files_data=csv_data,

cosmotech/coal/azure/adx/store.py

Lines changed: 9 additions & 9 deletions
@@ -45,7 +45,7 @@ def send_table_data(
     Returns:
         tuple: (source_id, table_name)
     """
-    LOGGER.debug(f"Sending data to the table {table_name}")
+    LOGGER.debug(T("coal.logs.adx.store.sending_data").format(table_name=table_name))
     result = send_pyarrow_table_to_adx(ingest_client, database, table_name, data, operation_tag)
     return result.source_id, table_name

@@ -69,15 +69,15 @@ def process_tables(
     source_ids = []
     table_ingestion_id_mapping = dict()

-    LOGGER.debug("Listing tables")
+    LOGGER.debug(T("coal.logs.adx.store.listing_tables"))
     table_list = list(store.list_tables())

     for target_table_name in table_list:
-        LOGGER.info(f"Working on table: {target_table_name}")
+        LOGGER.info(T("coal.logs.adx.store.working_on_table").format(table_name=target_table_name))
         data = store.get_table(target_table_name)

         if data.num_rows < 1:
-            LOGGER.warning(f"Table {target_table_name} has no rows - skipping it")
+            LOGGER.warning(T("coal.logs.adx.store.table_empty").format(table_name=target_table_name))
             continue

         check_and_create_table(kusto_client, database, target_table_name, data)
@@ -140,14 +140,14 @@ def send_store_to_adx(
     """
     # Generate a unique operation tag if none provided
     operation_tag = tag or f"op-{str(uuid.uuid4())}"
-    LOGGER.debug(f"Starting ingestion operation with tag: {operation_tag}")
+    LOGGER.debug(T("coal.logs.adx.store.starting_ingestion").format(operation_tag=operation_tag))

     # Initialize clients
     kusto_client, ingest_client = initialize_clients(adx_uri, adx_ingest_uri)
     database = database_name

     # Load datastore
-    LOGGER.debug("Loading datastore")
+    LOGGER.debug(T("coal.logs.adx.store.loading_datastore"))
     store = Store(store_location=store_location)

     try:
@@ -156,7 +156,7 @@ def send_store_to_adx(
             store, kusto_client, ingest_client, database, operation_tag
         )

-        LOGGER.info("Store data was sent for ADX ingestion")
+        LOGGER.info(T("coal.logs.adx.store.data_sent"))

        # Monitor ingestion if wait is True
        has_failures = False
@@ -171,9 +171,9 @@ def send_store_to_adx(
            return True

    except Exception as e:
-        LOGGER.exception("Error during ingestion process")
+        LOGGER.exception(T("coal.logs.adx.store.ingestion_error"))
        # Perform rollback using the tag
-        LOGGER.warning(f"Dropping data with tag: {operation_tag}")
+        LOGGER.warning(T("coal.logs.adx.store.dropping_data").format(operation_tag=operation_tag))
        _drop_by_tag(kusto_client, database, operation_tag)
        raise e
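For context, a hypothetical call into the entry point modified above. The keyword names are taken from the diff (adx_uri, adx_ingest_uri, database_name, store_location, tag, wait), but the exact signature, defaults, and URIs are assumptions:

# Hypothetical usage sketch of send_store_to_adx (parameter order and
# defaults assumed; cluster URIs and paths are examples only).
from cosmotech.coal.azure.adx.store import send_store_to_adx

send_store_to_adx(
    adx_uri="https://mycluster.region.kusto.windows.net",            # assumed example URI
    adx_ingest_uri="https://ingest-mycluster.region.kusto.windows.net",
    database_name="my_database",
    store_location="/tmp/datastore",                                 # assumed path
    wait=True,   # monitor ingestion; failures trigger the drop-by-tag rollback
    tag=None,    # None generates a unique "op-<uuid4>" operation tag
)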

cosmotech/coal/azure/adx/tables.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,12 @@ def check_and_create_table(kusto_client: KustoClient, database: str, table_name:
5353
Returns:
5454
bool: True if the table was created, False if it already existed
5555
"""
56-
LOGGER.debug(" - Checking if table exists")
56+
LOGGER.debug(T("coal.logs.adx.checking_table_exists"))
5757
if not table_exists(kusto_client, database, table_name):
5858
from cosmotech.coal.azure.adx.utils import create_column_mapping
5959

6060
mapping = create_column_mapping(data)
61-
LOGGER.debug(" - Does not exist, creating it")
61+
LOGGER.debug(T("coal.logs.adx.creating_nonexistent_table"))
6262
create_table(kusto_client, database, table_name, mapping)
6363
return True
6464
return False
@@ -73,16 +73,16 @@ def _drop_by_tag(kusto_client: KustoClient, database: str, tag: str) -> None:
7373
database: The database name
7474
tag: The tag to drop data by
7575
"""
76-
LOGGER.info(f"Dropping data with tag: {tag}")
76+
LOGGER.info(T("coal.logs.adx.dropping_data_by_tag").format(tag=tag))
7777

7878
try:
7979
# Execute the drop by tag command
8080
drop_command = f'.drop extents <| .show database extents where tags has "drop-by:{tag}"'
8181
kusto_client.execute_mgmt(database, drop_command)
82-
LOGGER.info("Drop by tag operation completed")
82+
LOGGER.info(T("coal.logs.adx.drop_completed"))
8383
except Exception as e:
84-
LOGGER.error(f"Error during drop by tag operation: {str(e)}")
85-
LOGGER.exception("Drop by tag details")
84+
LOGGER.error(T("coal.logs.adx.drop_error").format(error=str(e)))
85+
LOGGER.exception(T("coal.logs.adx.drop_details"))
8686

8787

8888
def create_table(client: KustoClient, database: str, table_name: str, schema: Dict[str, str]) -> bool:
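The rollback in _drop_by_tag relies on Kusto extent tags: extents ingested with a drop-by: tag can later be dropped as a unit. A sketch of how the tag would be attached at ingestion time, assumed to mirror what send_pyarrow_table_to_adx does — this commit only shows the drop side:

# Sketch: tagging ingested extents so _drop_by_tag can roll them back.
# Assumes the azure-kusto-ingest package; database/table names are examples.
from azure.kusto.ingest import IngestionProperties

operation_tag = "op-1234"  # real code uses f"op-{uuid.uuid4()}"
props = IngestionProperties(
    database="my_database",
    table="MyTable",
    drop_by_tags=[operation_tag],  # stored on extents as "drop-by:op-1234"
)
# On failure, the management command from tables.py above removes them:
#   .drop extents <| .show database extents where tags has "drop-by:op-1234"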

cosmotech/coal/azure/adx/utils.py

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ def create_column_mapping(data: pyarrow.Table) -> Dict[str, str]:
         try:
             ex = next(v for v in column.to_pylist() if v is not None)
         except StopIteration:
-            LOGGER.error(f"Column {column_name} has no content, defaulting it to string")
+            LOGGER.error(T("coal.logs.adx.utils.empty_column").format(column_name=column_name))
             mapping[column_name] = type_mapping(column_name, "string")
             continue
         else:

cosmotech/coal/cosmotech_api/parameters.py

Lines changed: 3 additions & 2 deletions
@@ -18,6 +18,7 @@
 from typing import List, Dict, Any

 from cosmotech.coal.utils.logger import LOGGER
+from cosmotech.orchestrator.utils.translate import T


 def write_parameters(
@@ -34,14 +35,14 @@ def write_parameters(
     """
     if write_csv:
         tmp_parameter_file = os.path.join(parameter_folder, "parameters.csv")
-        LOGGER.info(f"Generating {tmp_parameter_file}")
+        LOGGER.info(T("coal.logs.runner.generating_file").format(file=tmp_parameter_file))
         with open(tmp_parameter_file, "w") as _file:
             _w = DictWriter(_file, fieldnames=["parameterId", "value", "varType", "isInherited"])
             _w.writeheader()
             _w.writerows(parameters)

     if write_json:
         tmp_parameter_file = os.path.join(parameter_folder, "parameters.json")
-        LOGGER.info(f"Generating {tmp_parameter_file}")
+        LOGGER.info(T("coal.logs.runner.generating_file").format(file=tmp_parameter_file))
         with open(tmp_parameter_file, "w") as _file:
             json.dump(parameters, _file, indent=2)

cosmotech/coal/cosmotech_api/run_data.py

Lines changed: 12 additions & 12 deletions
@@ -47,7 +47,7 @@ def send_csv_to_run_data(
     source_dir = pathlib.Path(source_folder)

     if not source_dir.exists():
-        LOGGER.error(f"{source_dir} does not exists")
+        LOGGER.error(T("coal.errors.file_system.file_not_found").format(source_folder=source_dir))
         raise FileNotFoundError(f"{source_dir} does not exist")

     with get_api_client()[0] as api_client:
@@ -56,8 +56,8 @@ def send_csv_to_run_data(
             with open(csv_path) as _f:
                 dr = DictReader(_f)
                 table_name = csv_path.name.replace(".csv", "")
-                LOGGER.info(f"Sending data to table CD_{table_name}")
-                LOGGER.debug(f" - Column list: {dr.fieldnames}")
+                LOGGER.info(T("coal.logs.run_data.sending_to_table").format(table_name=f"CD_{table_name}"))
+                LOGGER.debug(T("coal.logs.database.column_list").format(columns=dr.fieldnames))
                 data = []

                 for row in dr:
@@ -72,7 +72,7 @@ def send_csv_to_run_data(
                         n_row[k] = v
                     data.append(n_row)

-                LOGGER.info(f" - Sending {len(data)} rows")
+                LOGGER.info(T("coal.logs.database.row_count").format(count=len(data)))
                 api_run.send_run_data(
                     organization_id,
                     workspace_id,
@@ -102,25 +102,25 @@ def send_store_to_run_data(
     source_dir = pathlib.Path(store_folder)

     if not source_dir.exists():
-        LOGGER.error(f"{source_dir} does not exists")
+        LOGGER.error(T("coal.errors.file_system.file_not_found").format(source_folder=source_dir))
         raise FileNotFoundError(f"{source_dir} does not exist")

     with get_api_client()[0] as api_client:
         api_run = RunApi(api_client)
         _s = Store()
         for table_name in _s.list_tables():
-            LOGGER.info(f"Sending data to table CD_{table_name}")
+            LOGGER.info(T("coal.logs.run_data.sending_to_table").format(table_name=f"CD_{table_name}"))
             data = convert_table_as_pylist(table_name)
             if not len(data):
-                LOGGER.info(" - No rows : skipping")
+                LOGGER.info(T("coal.logs.database.no_rows"))
                 continue
             fieldnames = _s.get_table_schema(table_name).names
             for row in data:
                 for field in fieldnames:
                     if row[field] is None:
                         del row[field]
-            LOGGER.debug(f" - Column list: {fieldnames}")
-            LOGGER.info(f" - Sending {len(data)} rows")
+            LOGGER.debug(T("coal.logs.database.column_list").format(columns=fieldnames))
+            LOGGER.info(T("coal.logs.database.row_count").format(count=len(data)))
             api_run.send_run_data(
                 organization_id,
                 workspace_id,
@@ -160,14 +160,14 @@ def load_csv_from_run_data(
         organization_id, workspace_id, runner_id, run_id, RunDataQuery(query=query)
     )
     if query_result.result:
-        LOGGER.info(f"Query returned {len(query_result.result)} rows")
+        LOGGER.info(T("coal.logs.database.query_results").format(count=len(query_result.result)))
         with open(target_dir / (file_name + ".csv"), "w") as _f:
             headers = set()
             for r in query_result.result:
                 headers = headers | set(r.keys())
             dw = DictWriter(_f, fieldnames=sorted(headers))
             dw.writeheader()
             dw.writerows(query_result.result)
-        LOGGER.info(f"Results saved as {target_dir / file_name}.csv")
+        LOGGER.info(T("coal.logs.database.saved_results").format(file=f"{target_dir / file_name}.csv"))
     else:
-        LOGGER.info("No results returned by the query")
+        LOGGER.info(T("coal.logs.database.no_results"))

cosmotech/coal/cosmotech_api/run_template.py

Lines changed: 2 additions & 2 deletions
@@ -58,7 +58,7 @@ def load_run_template_handlers(
         LOGGER.error(
             T("coal.errors.workspace.not_found").format(workspace_id=workspace_id, organization_id=organization_id)
         )
-        LOGGER.debug(e.body)
+        LOGGER.debug(T("coal.logs.orchestrator.error_details").format(details=e.body))
         raise ValueError(f"Workspace {workspace_id} not found in organization {organization_id}")
     solution_id = r_data.solution.solution_id

@@ -86,7 +86,7 @@ def load_run_template_handlers(
                     solution=solution_id,
                 )
             )
-            LOGGER.debug(e.body)
+            LOGGER.debug(T("coal.logs.orchestrator.error_details").format(details=e.body))
            has_errors = True
            continue
        LOGGER.info(T("coal.logs.orchestrator.extracting_handler").format(path=handler_path.absolute()))

cosmotech/coal/cosmotech_api/twin_data_layer.py

Lines changed: 3 additions & 3 deletions
@@ -160,7 +160,7 @@ def get_dataset_id_from_runner(organization_id: str, workspace_id: str, runner_i

     if (datasets_len := len(runner_info.dataset_list)) != 1:
         LOGGER.error(T("coal.logs.runner.not_single_dataset").format(runner_id=runner_info.id, count=datasets_len))
-        LOGGER.debug(runner_info)
+        LOGGER.debug(T("coal.logs.runner.runner_info").format(info=runner_info))
         raise ValueError(f"Runner {runner_info.id} does not have exactly one dataset")

     return runner_info.dataset_list[0]
@@ -309,7 +309,7 @@ def _process_csv_file(
     if len(errors):
         LOGGER.error(T("coal.logs.storage.import_errors").format(count=len(errors)))
         for _err in errors:
-            LOGGER.error(str(_err))
+            LOGGER.error(T("coal.logs.storage.error_detail").format(error=str(_err)))
         raise ValueError(f"Error importing data from {file_path}")


@@ -340,7 +340,7 @@ def load_files_from_tdl(
         LOGGER.error(
             T("coal.logs.runner.dataset_state").format(dataset_id=dataset_id, status=dataset_info.ingestion_status)
         )
-        LOGGER.debug(dataset_info)
+        LOGGER.debug(T("coal.logs.runner.dataset_info").format(info=dataset_info))
         raise ValueError(f"Dataset {dataset_id} is not in SUCCESS state")

     # Create directory
