Skip to content

Commit 7184ce5

Browse files
committed
format: black
1 parent 1f773fa commit 7184ce5

File tree

3 files changed

+66
-38
lines changed

3 files changed

+66
-38
lines changed

datajoint/autopopulate.py

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""This module defines class dj.AutoPopulate"""
2+
23
import contextlib
34
import datetime
45
import inspect
@@ -270,24 +271,25 @@ def populate(
270271
raise DataJointError(
271272
"The order argument must be one of %s" % str(valid_order)
272273
)
273-
274+
274275
if schedule_jobs:
275276
self.schedule_jobs(*restrictions)
276277

277278
# define and set up signal handler for SIGTERM:
278279
if reserve_jobs:
280+
279281
def handler(signum, frame):
280282
logger.info("Populate terminated by SIGTERM")
281283
raise SystemExit("SIGTERM received")
284+
282285
old_handler = signal.signal(signal.SIGTERM, handler)
283286

284287
# retrieve `keys` if not provided
285288
if keys is None:
286289
if reserve_jobs:
287-
keys = (
288-
self.jobs
289-
& {'status': 'scheduled'}
290-
).fetch("key", order_by="timestamp", limit=limit)
290+
keys = (self.jobs & {"status": "scheduled"}).fetch(
291+
"key", order_by="timestamp", limit=limit
292+
)
291293
if restrictions:
292294
# hitting the `key_source` again to apply the restrictions
293295
# this is expensive/suboptimal
@@ -300,8 +302,7 @@ def handler(signum, frame):
300302
# exclude "error", "ignore" or "reserved" jobs
301303
if reserve_jobs:
302304
exclude_key_hashes = (
303-
self.jobs
304-
& 'status in ("error", "ignore", "reserved")'
305+
self.jobs & 'status in ("error", "ignore", "reserved")'
305306
).fetch("key_hash")
306307
keys = [key for key in keys if key_hash(key) not in exclude_key_hashes]
307308

@@ -473,15 +474,15 @@ def _populate1(
473474
self._Jobs.complete(
474475
self.target.table_name,
475476
self._job_key(key),
476-
run_duration=(
477-
datetime.datetime.utcnow() - make_start
478-
).total_seconds(),
477+
run_duration=(datetime.datetime.utcnow() - make_start).total_seconds(),
479478
)
480479
# Update the _job column with the job metadata for newly populated entries
481480
if "_job" in self.target.heading._attributes:
482481
job_metadata = {
483482
"execution_time": make_start,
484-
"execution_duration": (datetime.datetime.utcnow() - make_start).total_seconds(),
483+
"execution_duration": (
484+
datetime.datetime.utcnow() - make_start
485+
).total_seconds(),
485486
"host": platform.node(),
486487
"pid": os.getpid(),
487488
"connection_id": self.connection.connection_id,
@@ -526,44 +527,50 @@ def _Jobs(self):
526527
def jobs(self):
527528
return self._Jobs & {"table_name": self.target.table_name}
528529

529-
def schedule_jobs(self, *restrictions, purge_jobs=False, min_scheduling_interval=None):
530+
def schedule_jobs(
531+
self, *restrictions, purge_jobs=False, min_scheduling_interval=None
532+
):
530533
"""
531534
Schedule new jobs for this autopopulate table by finding keys that need computation.
532-
535+
533536
This method implements an optimization strategy to avoid excessive scheduling:
534537
1. First checks if jobs were scheduled recently (within min_scheduling_interval)
535538
2. If recent scheduling event exists, skips scheduling to prevent database load
536539
3. Otherwise, finds keys that need computation and schedules them
537-
540+
538541
The method also optionally purges invalid jobs (jobs that no longer exist in key_source)
539542
to maintain database cleanliness.
540-
543+
541544
Args:
542545
restrictions: a list of restrictions each restrict (table.key_source - target.proj())
543546
purge_jobs: if True, remove orphaned jobs from the jobs table (potentially expensive operation)
544547
min_scheduling_interval: minimum time in seconds that must have passed since last job scheduling.
545548
If None, uses the value from dj.config["min_scheduling_interval"] (default: None)
546-
549+
547550
Returns:
548551
None
549552
"""
550553
__scheduled_event = {
551554
"table_name": self.target.table_name,
552-
"__type__": "jobs scheduling event"
553-
}
554-
555+
"__type__": "jobs scheduling event",
556+
}
557+
555558
if min_scheduling_interval is None:
556559
min_scheduling_interval = config["min_scheduling_interval"]
557560

558561
if min_scheduling_interval > 0:
559562
recent_scheduling_event = (
560-
self._Jobs.proj(last_scheduled="TIMESTAMPDIFF(SECOND, timestamp, UTC_TIMESTAMP())")
563+
self._Jobs.proj(
564+
last_scheduled="TIMESTAMPDIFF(SECOND, timestamp, UTC_TIMESTAMP())"
565+
)
561566
& {"table_name": f"__{self.target.table_name}__"}
562567
& {"key_hash": key_hash(__scheduled_event)}
563568
& f"last_scheduled <= {min_scheduling_interval}"
564569
)
565570
if recent_scheduling_event:
566-
logger.info(f"Skip jobs scheduling for `{to_camel_case(self.target.table_name)}` (last scheduled {recent_scheduling_event.fetch1('last_scheduled')} seconds ago)")
571+
logger.info(
572+
f"Skip jobs scheduling for `{to_camel_case(self.target.table_name)}` (last scheduled {recent_scheduling_event.fetch1('last_scheduled')} seconds ago)"
573+
)
567574
return
568575

569576
try:
@@ -574,8 +581,11 @@ def schedule_jobs(self, *restrictions, purge_jobs=False, min_scheduling_interval
574581
except Exception as e:
575582
logger.exception(str(e))
576583
else:
577-
self._Jobs.ignore(f"__{self.target.table_name}__", __scheduled_event,
578-
message=f"Jobs scheduling event: {__scheduled_event['table_name']}")
584+
self._Jobs.ignore(
585+
f"__{self.target.table_name}__",
586+
__scheduled_event,
587+
message=f"Jobs scheduling event: {__scheduled_event['table_name']}",
588+
)
579589
logger.info(
580590
f"{schedule_count} new jobs scheduled for `{to_camel_case(self.target.table_name)}`"
581591
)
@@ -586,21 +596,23 @@ def schedule_jobs(self, *restrictions, purge_jobs=False, min_scheduling_interval
586596
def purge_jobs(self):
587597
"""
588598
Check and remove any orphaned/outdated jobs in the JobTable for this autopopulate table.
589-
599+
590600
This method handles two types of orphaned jobs:
591601
1. Jobs that are no longer in the `key_source` (e.g. entries in upstream table(s) got deleted)
592602
2. Jobs with "success" status that are no longer in the target table (e.g. entries in target table got deleted)
593-
603+
594604
The method is potentially time-consuming as it needs to:
595605
- Compare all jobs against the current key_source
596606
- For success jobs, verify their existence in the target table
597607
- Delete any jobs that fail these checks
598-
608+
599609
This cleanup should not need to run very often, but helps maintain database consistency.
600610
"""
601611
invalid_removed = 0
602612

603-
incomplete_query = self.jobs & {"table_name": self.target.table_name} & "status != 'success'"
613+
incomplete_query = (
614+
self.jobs & {"table_name": self.target.table_name} & "status != 'success'"
615+
)
604616
if incomplete_query:
605617
keys2do = self._jobs_to_do({}).fetch("KEY")
606618
invalid_incomplete = len(incomplete_query) - len(keys2do)

datajoint/jobs.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,15 @@ def complete(self, table_name, key):
196196
job_key = dict(table_name=table_name, key_hash=key_hash(key))
197197
(self & job_key).delete()
198198

199-
def error(self, table_name, key, error_message, error_stack=None, run_duration=None, run_metadata=None):
199+
def error(
200+
self,
201+
table_name,
202+
key,
203+
error_message,
204+
error_stack=None,
205+
run_duration=None,
206+
run_metadata=None,
207+
):
200208
"""
201209
Log an error message. The job reservation is replaced with an error entry.
202210
if an error occurs, leave an entry describing the problem

tests/test_declare.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -431,25 +431,33 @@ def test_add_hidden_timestamp_disabled(disable_add_hidden_timestamp, schema_any)
431431

432432
def test_hidden_job_column_for_imported_computed_tables(schema_any):
433433
"""Test that hidden _job column is added to imported and computed tables but not manual/lookup tables"""
434-
434+
435435
# Manual and Lookup tables should NOT have _job column
436436
manual_attrs = Image().heading._attributes
437437
lookup_attrs = Subject().heading._attributes
438-
439-
assert not any(a.name == "_job" for a in manual_attrs.values()), "Manual table should not have _job column"
440-
assert not any(a.name == "_job" for a in lookup_attrs.values()), "Lookup table should not have _job column"
441-
438+
439+
assert not any(
440+
a.name == "_job" for a in manual_attrs.values()
441+
), "Manual table should not have _job column"
442+
assert not any(
443+
a.name == "_job" for a in lookup_attrs.values()
444+
), "Lookup table should not have _job column"
445+
442446
# Imported and Computed tables SHOULD have _job column
443447
imported_attrs = Experiment().heading._attributes
444448
computed_attrs = SigIntTable().heading._attributes
445-
446-
assert any(a.name == "_job" for a in imported_attrs.values()), "Imported table should have _job column"
447-
assert any(a.name == "_job" for a in computed_attrs.values()), "Computed table should have _job column"
448-
449+
450+
assert any(
451+
a.name == "_job" for a in imported_attrs.values()
452+
), "Imported table should have _job column"
453+
assert any(
454+
a.name == "_job" for a in computed_attrs.values()
455+
), "Computed table should have _job column"
456+
449457
# Verify the _job column is hidden and has correct type
450458
imported_job_attr = next(a for a in imported_attrs.values() if a.name == "_job")
451459
computed_job_attr = next(a for a in computed_attrs.values() if a.name == "_job")
452-
460+
453461
assert imported_job_attr.is_hidden, "_job column should be hidden"
454462
assert computed_job_attr.is_hidden, "_job column should be hidden"
455463
assert "json" in imported_job_attr.sql.lower(), "_job column should be JSON type"

0 commit comments

Comments (0)