Skip to content
This repository was archived by the owner on Feb 22, 2023. It is now read-only.

Commit c049737

Browse files
committed
Create a list of fields to clean outside of worker
Signed-off-by: Olga Bulat <obulat@gmail.com>
1 parent b5f06f6 commit c049737

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

ingestion_server/ingestion_server/cleanup.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ def test_tls_supported(cls, url):
207207
return True
208208

209209

210-
def _clean_data_worker(rows, temp_table, sources_config, table):
210+
def _clean_data_worker(rows, temp_table, sources_config, all_fields: list[str]):
211211
log.info("Starting data cleaning worker")
212212
global_field_to_func = sources_config["*"]["fields"]
213213
worker_conn = database_connect()
@@ -226,7 +226,7 @@ def _clean_data_worker(rows, temp_table, sources_config, table):
226226
}
227227

228228
start_time = time.time()
229-
cleaned_values = {field: [] for field in _get_cleanable_fields(table)}
229+
cleaned_values = {field: [] for field in all_fields}
230230
for row in rows:
231231
# Map fields that need updating to their cleaning functions
232232
source = row["source"]
@@ -359,7 +359,7 @@ def clean_image_data(table):
359359
end = job_size * n
360360
last_end = end
361361
# Arguments for parallel _clean_data_worker calls
362-
jobs.append((batch[start:end], temp_table, source_config, table))
362+
jobs.append((batch[start:end], temp_table, source_config, _get_cleanable_fields("image")))
363363
pool = multiprocessing.Pool(processes=num_workers)
364364
log.info(f"Starting {len(jobs)} cleaning jobs")
365365
conn.commit()

0 commit comments

Comments
 (0)