Skip to content
This repository was archived by the owner on Feb 22, 2023. It is now read-only.

Commit 84ac386

Browse files
committed
Create a list of fields to clean outside of worker
Signed-off-by: Olga Bulat <[email protected]>
1 parent b7cc4f1 commit 84ac386

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

ingestion_server/ingestion_server/cleanup.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -197,7 +197,7 @@ def test_tls_supported(cls, url):
197197
return True
198198

199199

200-
def _clean_data_worker(rows, temp_table, sources_config, table):
200+
def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
201201
log.info("Starting data cleaning worker")
202202
global_field_to_func = sources_config["*"]["fields"]
203203
worker_conn = database_connect()
@@ -216,7 +216,7 @@ def _clean_data_worker(rows, temp_table, sources_config, table):
216216
}
217217

218218
start_time = time.time()
219-
cleaned_values = {field: [] for field in _get_cleanable_fields(table)}
219+
cleaned_values = {field: [] for field in all_fields}
220220
for row in rows:
221221
# Map fields that need updating to their cleaning functions
222222
source = row["source"]
@@ -349,7 +349,7 @@ def clean_image_data(table):
349349
end = job_size * n
350350
last_end = end
351351
# Arguments for parallel _clean_data_worker calls
352-
jobs.append((batch[start:end], temp_table, source_config, table))
352+
jobs.append((batch[start:end], temp_table, source_config, _get_cleanable_fields("image")))
353353
pool = multiprocessing.Pool(processes=num_workers)
354354
log.info(f"Starting {len(jobs)} cleaning jobs")
355355
conn.commit()

0 commit comments

Comments
 (0)