Skip to content

Commit d28fa7c

Browse files
committed
Simplify jobs variable usage in populate()
- Remove early jobs_table assignment, use self.jobs directly - Fix comment: key_source is correct behavior, not legacy - Use self.jobs directly in _get_pending_jobs
1 parent 7b28c64 commit d28fa7c

File tree

1 file changed

+8
-11
lines changed

1 file changed

+8
-11
lines changed

src/datajoint/autopopulate.py

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,6 @@ def populate(
224224
if order not in valid_order:
225225
raise DataJointError("The order argument must be one of %s" % str(valid_order))
226226

227-
# Get the jobs table (per-table JobsTable for new system)
228-
jobs_table = self.jobs if reserve_jobs else None
229-
230227
if reserve_jobs:
231228
# Define a signal handler for SIGTERM
232229
def handler(signum, frame):
@@ -247,7 +244,7 @@ def handler(signum, frame):
247244
refresh=refresh,
248245
)
249246
else:
250-
# Legacy behavior: get keys from key_source
247+
# Without job reservations: compute keys directly from key_source
251248
if keys is None:
252249
todo = (self.key_source & AndList(restrictions)).proj()
253250
keys = (todo - self).fetch("KEY", limit=limit)
@@ -271,9 +268,11 @@ def handler(signum, frame):
271268
make_kwargs=make_kwargs,
272269
)
273270

271+
jobs = self.jobs if reserve_jobs else None
272+
274273
if processes == 1:
275274
for key in tqdm(keys, desc=self.__class__.__name__) if display_progress else keys:
276-
status = self._populate1(key, jobs_table, **populate_kwargs)
275+
status = self._populate1(key, jobs, **populate_kwargs)
277276
if status is True:
278277
success_list.append(1)
279278
elif isinstance(status, tuple):
@@ -285,7 +284,7 @@ def handler(signum, frame):
285284
self.connection.close() # disconnect parent process from MySQL server
286285
del self.connection._conn.ctx # SSLContext is not pickleable
287286
with (
288-
mp.Pool(processes, _initialize_populate, (self, jobs_table, populate_kwargs)) as pool,
287+
mp.Pool(processes, _initialize_populate, (self, jobs, populate_kwargs)) as pool,
289288
tqdm(desc="Processes: ", total=nkeys) if display_progress else contextlib.nullcontext() as progress_bar,
290289
):
291290
for status in pool.imap(_call_populate1, keys, chunksize=1):
@@ -321,16 +320,14 @@ def _get_pending_jobs(self, restrictions, priority, limit, refresh):
321320
:param refresh: Whether to refresh if no pending jobs found
322321
:return: List of key dicts
323322
"""
324-
jobs_table = self.jobs
325-
326323
# First, try to get pending jobs
327-
keys = jobs_table.fetch_pending(limit=limit, priority=priority)
324+
keys = self.jobs.fetch_pending(limit=limit, priority=priority)
328325

329326
# If no pending jobs and refresh is enabled, refresh and try again
330327
if not keys and refresh:
331328
logger.debug("No pending jobs found, refreshing jobs queue")
332-
jobs_table.refresh(*restrictions)
333-
keys = jobs_table.fetch_pending(limit=limit, priority=priority)
329+
self.jobs.refresh(*restrictions)
330+
keys = self.jobs.fetch_pending(limit=limit, priority=priority)
334331

335332
return keys
336333

0 commit comments

Comments
 (0)