Skip to content

Commit 086de07

Browse files
committed
Implement Autopopulate 2.0 job system
This commit implements the per-table jobs system specified in the Autopopulate 2.0 design document. New features: - Per-table JobsTable class (jobs_v2.py) with FK-derived primary keys - Status enum: pending, reserved, success, error, ignore - Priority system (lower = more urgent, 0 = highest, default = 5) - Scheduled processing via delay parameter - Methods: refresh(), reserve(), complete(), error(), ignore() - Properties: pending, reserved, errors, ignored, completed, progress() Configuration (settings.py): - New JobsSettings class with: - jobs.auto_refresh (default: True) - jobs.keep_completed (default: False) - jobs.stale_timeout (default: 3600 seconds) - jobs.default_priority (default: 5) AutoPopulate changes (autopopulate.py): - Added jobs property to access per-table JobsTable - Updated populate() with new parameters: priority, refresh - Updated _populate1() to use new JobsTable API - Collision errors (DuplicateError) handled silently per spec Schema changes (schemas.py): - Track auto-populated tables during decoration - schema.jobs now returns list of JobsTable objects - Added schema.legacy_jobs for backward compatibility
1 parent 7b11d65 commit 086de07

File tree

4 files changed

+711
-27
lines changed

4 files changed

+711
-27
lines changed

src/datajoint/autopopulate.py

Lines changed: 95 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class AutoPopulate:
5555

5656
_key_source = None
5757
_allow_insert = False
58+
_jobs_table = None # Cached JobsTable instance
5859

5960
@property
6061
def key_source(self):
@@ -160,6 +161,21 @@ def target(self):
160161
"""
161162
return self
162163

164+
@property
165+
def jobs(self):
166+
"""
167+
Access the jobs table for this auto-populated table.
168+
169+
The jobs table provides per-table job queue management with rich status
170+
tracking (pending, reserved, success, error, ignore).
171+
172+
:return: JobsTable instance for this table
173+
"""
174+
if self._jobs_table is None:
175+
from .jobs_v2 import JobsTable
176+
self._jobs_table = JobsTable(self.target)
177+
return self._jobs_table
178+
163179
def _job_key(self, key):
164180
"""
165181
:param key: they key returned for the job from the key source
@@ -209,6 +225,9 @@ def populate(
209225
display_progress=False,
210226
processes=1,
211227
make_kwargs=None,
228+
# New parameters for Autopopulate 2.0
229+
priority=None,
230+
refresh=True,
212231
):
213232
"""
214233
``table.populate()`` calls ``table.make(key)`` for every primary key in
@@ -230,6 +249,10 @@ def populate(
230249
to be passed down to each ``make()`` call. Computation arguments should be
231250
specified within the pipeline e.g. using a `dj.Lookup` table.
232251
:type make_kwargs: dict, optional
252+
:param priority: Only process jobs at this priority or more urgent (lower values).
253+
Only applies when reserve_jobs=True.
254+
:param refresh: If True and no pending jobs are found, refresh the jobs queue
255+
before giving up. Only applies when reserve_jobs=True.
233256
:return: a dict with two keys
234257
"success_count": the count of successful ``make()`` calls in this ``populate()`` call
235258
"error_list": the error list that is filled if `suppress_errors` is True
@@ -240,7 +263,9 @@ def populate(
240263
valid_order = ["original", "reverse", "random"]
241264
if order not in valid_order:
242265
raise DataJointError("The order argument must be one of %s" % str(valid_order))
243-
jobs = self.connection.schemas[self.target.database].jobs if reserve_jobs else None
266+
267+
# Get the jobs table (per-table JobsTable for new system)
268+
jobs_table = self.jobs if reserve_jobs else None
244269

245270
if reserve_jobs:
246271
# Define a signal handler for SIGTERM
@@ -250,15 +275,21 @@ def handler(signum, frame):
250275

251276
old_handler = signal.signal(signal.SIGTERM, handler)
252277

253-
if keys is None:
254-
keys = (self._jobs_to_do(restrictions) - self.target).fetch("KEY", limit=limit)
278+
error_list = []
279+
success_list = []
255280

256-
# exclude "error", "ignore" or "reserved" jobs
257281
if reserve_jobs:
258-
exclude_key_hashes = (
259-
jobs & {"table_name": self.target.table_name} & 'status in ("error", "ignore", "reserved")'
260-
).fetch("key_hash")
261-
keys = [key for key in keys if key_hash(key) not in exclude_key_hashes]
282+
# New Autopopulate 2.0 logic: use jobs table
283+
keys = self._get_pending_jobs(
284+
restrictions=restrictions,
285+
priority=priority,
286+
limit=limit,
287+
refresh=refresh,
288+
)
289+
else:
290+
# Legacy behavior: get keys from key_source
291+
if keys is None:
292+
keys = (self._jobs_to_do(restrictions) - self.target).fetch("KEY", limit=limit)
262293

263294
if order == "reverse":
264295
keys.reverse()
@@ -270,9 +301,6 @@ def handler(signum, frame):
270301
keys = keys[:max_calls]
271302
nkeys = len(keys)
272303

273-
error_list = []
274-
success_list = []
275-
276304
if nkeys:
277305
processes = min(_ for _ in (processes, nkeys, mp.cpu_count()) if _)
278306

@@ -284,7 +312,7 @@ def handler(signum, frame):
284312

285313
if processes == 1:
286314
for key in tqdm(keys, desc=self.__class__.__name__) if display_progress else keys:
287-
status = self._populate1(key, jobs, **populate_kwargs)
315+
status = self._populate1(key, jobs_table, **populate_kwargs)
288316
if status is True:
289317
success_list.append(1)
290318
elif isinstance(status, tuple):
@@ -296,7 +324,7 @@ def handler(signum, frame):
296324
self.connection.close() # disconnect parent process from MySQL server
297325
del self.connection._conn.ctx # SSLContext is not pickleable
298326
with (
299-
mp.Pool(processes, _initialize_populate, (self, jobs, populate_kwargs)) as pool,
327+
mp.Pool(processes, _initialize_populate, (self, jobs_table, populate_kwargs)) as pool,
300328
tqdm(desc="Processes: ", total=nkeys) if display_progress else contextlib.nullcontext() as progress_bar,
301329
):
302330
for status in pool.imap(_call_populate1, keys, chunksize=1):
@@ -319,23 +347,54 @@ def handler(signum, frame):
319347
"error_list": error_list,
320348
}
321349

350+
def _get_pending_jobs(self, restrictions, priority, limit, refresh):
351+
"""
352+
Get pending jobs from the jobs table.
353+
354+
If no pending jobs are found and refresh=True, refreshes the jobs queue
355+
and tries again.
356+
357+
:param restrictions: Restrictions to apply when refreshing
358+
:param priority: Only get jobs at this priority or more urgent
359+
:param limit: Maximum number of jobs to return
360+
:param refresh: Whether to refresh if no pending jobs found
361+
:return: List of key dicts
362+
"""
363+
jobs_table = self.jobs
364+
365+
# First, try to get pending jobs
366+
keys = jobs_table.fetch_pending(limit=limit, priority=priority)
367+
368+
# If no pending jobs and refresh is enabled, refresh and try again
369+
if not keys and refresh:
370+
logger.debug("No pending jobs found, refreshing jobs queue")
371+
jobs_table.refresh(*restrictions)
372+
keys = jobs_table.fetch_pending(limit=limit, priority=priority)
373+
374+
return keys
375+
322376
def _populate1(self, key, jobs, suppress_errors, return_exception_objects, make_kwargs=None):
323377
"""
324378
populates table for one source key, calling self.make inside a transaction.
325-
:param jobs: the jobs table or None if not reserve_jobs
379+
:param jobs: the jobs table (JobsTable) or None if not reserve_jobs
326380
:param key: dict specifying job to populate
327381
:param suppress_errors: bool if errors should be suppressed and returned
328382
:param return_exception_objects: if True, errors must be returned as objects
329383
:return: (key, error) when suppress_errors=True,
330384
True if successfully invoke one `make()` call, otherwise False
331385
"""
386+
import time
387+
332388
# use the legacy `_make_tuples` callback.
333389
make = self._make_tuples if hasattr(self, "_make_tuples") else self.make
390+
job_key = self._job_key(key)
391+
start_time = time.time()
334392

335-
if jobs is not None and not jobs.reserve(self.target.table_name, self._job_key(key)):
393+
# Try to reserve the job (per-key, before make)
394+
if jobs is not None and not jobs.reserve(job_key):
336395
return False
337396

338-
# if make is a generator, it transaction can be delayed until the final stage
397+
# if make is a generator, transaction can be delayed until the final stage
339398
is_generator = inspect.isgeneratorfunction(make)
340399
if not is_generator:
341400
self.connection.start_transaction()
@@ -344,7 +403,8 @@ def _populate1(self, key, jobs, suppress_errors, return_exception_objects, make_
344403
if not is_generator:
345404
self.connection.cancel_transaction()
346405
if jobs is not None:
347-
jobs.complete(self.target.table_name, self._job_key(key))
406+
# Job already done - mark complete or delete
407+
jobs.complete(job_key, duration=0)
348408
return False
349409

350410
logger.debug(f"Making {key} -> {self.target.full_table_name}")
@@ -379,24 +439,34 @@ def _populate1(self, key, jobs, suppress_errors, return_exception_objects, make_
379439
msg=": " + str(error) if str(error) else "",
380440
)
381441
logger.debug(f"Error making {key} -> {self.target.full_table_name} - {error_message}")
442+
443+
# Only log errors from inside make() - not collision errors
382444
if jobs is not None:
383-
# show error name and error message (if any)
384-
jobs.error(
385-
self.target.table_name,
386-
self._job_key(key),
387-
error_message=error_message,
388-
error_stack=traceback.format_exc(),
389-
)
445+
from .errors import DuplicateError
446+
if isinstance(error, DuplicateError):
447+
# Collision error - job reverts to pending or gets deleted
448+
# This is not a real error, just coordination artifact
449+
logger.debug(f"Duplicate key collision for {key}, reverting job")
450+
# Delete the reservation, letting the job be picked up again or cleaned
451+
(jobs & job_key).delete_quick()
452+
else:
453+
# Real error inside make() - log it
454+
jobs.error(
455+
job_key,
456+
error_message=error_message,
457+
error_stack=traceback.format_exc(),
458+
)
390459
if not suppress_errors or isinstance(error, SystemExit):
391460
raise
392461
else:
393462
logger.error(error)
394463
return key, error if return_exception_objects else error_message
395464
else:
396465
self.connection.commit_transaction()
466+
duration = time.time() - start_time
397467
logger.debug(f"Success making {key} -> {self.target.full_table_name}")
398468
if jobs is not None:
399-
jobs.complete(self.target.table_name, self._job_key(key))
469+
jobs.complete(job_key, duration=duration)
400470
return True
401471
finally:
402472
self.__class__._allow_insert = False

0 commit comments

Comments
 (0)