@@ -154,23 +154,25 @@ def _process_s3_input(
 
 
 def search_and_schedule_new_tasks(
+    clp_config: CLPConfig,
     db_conn,
     db_cursor,
     clp_metadata_db_connection_config: Dict[str, Any],
-    clp_archive_output: ArchiveOutput,
 ):
     """
     For all jobs with PENDING status, splits the job into tasks and schedules them.
+    :param clp_config:
     :param db_conn:
     :param db_cursor:
     :param clp_metadata_db_connection_config:
-    :param clp_archive_output:
     """
     global scheduled_jobs
 
-    existing_datasets = fetch_existing_datasets(
-        db_cursor, clp_metadata_db_connection_config["table_prefix"]
-    )
+    existing_datasets: Set[str] = set()
+    if StorageEngine.CLP_S == clp_config.package.storage_engine:
+        existing_datasets = fetch_existing_datasets(
+            db_cursor, clp_metadata_db_connection_config["table_prefix"]
+        )
 
     logger.debug("Search and schedule new tasks")
 
@@ -193,7 +195,7 @@ def search_and_schedule_new_tasks(
             db_cursor,
             table_prefix,
             dataset,
-            clp_archive_output,
+            clp_config.archive_output,
         )
 
         # NOTE: This assumes we never delete a dataset when compression jobs are being scheduled
@@ -428,10 +430,10 @@ def main(argv):
     while True:
         try:
             search_and_schedule_new_tasks(
+                clp_config,
                 db_conn,
                 db_cursor,
                 clp_metadata_db_connection_config,
-                clp_config.archive_output,
             )
             poll_running_jobs(db_conn, db_cursor)
             time.sleep(clp_config.compression_scheduler.jobs_poll_delay)
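
For context, a minimal, self-contained sketch of the gating the first hunk introduces: datasets only exist under the clp-s storage engine, so the metadata query is skipped for other engines. The `StorageEngine` members shown here and the `fetch_existing_datasets` stub are assumptions for illustration, not the package's actual definitions.

from enum import Enum
from typing import Set


class StorageEngine(Enum):
    # Assumed members; only CLP_S matters for this sketch.
    CLP = "clp"
    CLP_S = "clp-s"


def fetch_existing_datasets(db_cursor, table_prefix: str) -> Set[str]:
    # Stub standing in for the real metadata-database query.
    return {"default"}


def existing_datasets_for(
    storage_engine: StorageEngine, db_cursor, table_prefix: str
) -> Set[str]:
    # Mirrors the diff: start from an empty set and only query the
    # metadata database when the package uses the clp-s storage engine.
    existing_datasets: Set[str] = set()
    if StorageEngine.CLP_S == storage_engine:
        existing_datasets = fetch_existing_datasets(db_cursor, table_prefix)
    return existing_datasets


if __name__ == "__main__":
    print(existing_datasets_for(StorageEngine.CLP, None, "clp_"))    # set()
    print(existing_datasets_for(StorageEngine.CLP_S, None, "clp_"))  # {'default'}

Passing the whole `clp_config` into `search_and_schedule_new_tasks` (rather than just `clp_config.archive_output`) is what lets the scheduler read `clp_config.package.storage_engine` for this check while still forwarding `clp_config.archive_output` where the old parameter was used.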