99import logging
1010import os
1111import platform
12- from datetime import datetime
13- from typing import TYPE_CHECKING , Optional
12+ from typing import TYPE_CHECKING
1413
1514from .errors import DataJointError , DuplicateError
1615from .expression import QueryExpression
@@ -259,6 +258,7 @@ def refresh(
259258 key_source = self ._target .key_source
260259 if restrictions :
261260 from .expression import AndList
261+
262262 key_source = key_source & AndList (restrictions )
263263
264264 # Project to FK-derived attributes only
@@ -271,17 +271,8 @@ def refresh(
271271
272272 # Insert new jobs
273273 added = 0
274- now = datetime .now ()
275274 for key in new_keys :
276- job = {
277- ** key ,
278- "status" : "pending" ,
279- "priority" : priority ,
280- "created_time" : now ,
281- # Use SQL expression for scheduled_time to use server time
282- }
283275 try :
284- # Use raw SQL to set scheduled_time using server time
285276 self ._insert_job_with_delay (key , priority , delay )
286277 added += 1
287278 except DuplicateError :
@@ -292,10 +283,7 @@ def refresh(
292283 # Find pending jobs older than stale_timeout whose keys are not in key_source
293284 removed = 0
294285 if stale_timeout > 0 :
295- stale_condition = (
296- f'status="pending" AND '
297- f'created_time < NOW() - INTERVAL { stale_timeout } SECOND'
298- )
286+ stale_condition = f'status="pending" AND ' f"created_time < NOW() - INTERVAL { stale_timeout } SECOND"
299287 stale_jobs = (self & stale_condition ).proj (* pk_attrs )
300288
301289 # Check which stale jobs are no longer in key_source
@@ -317,14 +305,10 @@ def _insert_job_with_delay(self, key: dict, priority: int, delay: float) -> None
317305 """
318306 # Build column names and values
319307 pk_attrs = [name for name , _ in self ._get_fk_derived_primary_key ()]
320- columns = pk_attrs + [
321- "status" , "priority" , "created_time" , "scheduled_time" ,
322- "user" , "host" , "pid" , "connection_id"
323- ]
308+ columns = pk_attrs + ["status" , "priority" , "created_time" , "scheduled_time" , "user" , "host" , "pid" , "connection_id" ]
324309
325310 # Build values
326- pk_values = [f"'{ key [attr ]} '" if isinstance (key [attr ], str ) else str (key [attr ])
327- for attr in pk_attrs ]
311+ pk_values = [f"'{ key [attr ]} '" if isinstance (key [attr ], str ) else str (key [attr ]) for attr in pk_attrs ]
328312 other_values = [
329313 "'pending'" ,
330314 str (priority ),
@@ -360,9 +344,7 @@ def reserve(self, key: dict) -> bool:
360344 # Build WHERE clause for the key
361345 pk_attrs = [name for name , _ in self ._get_fk_derived_primary_key ()]
362346 key_conditions = " AND " .join (
363- f"`{ attr } `='{ key [attr ]} '" if isinstance (key [attr ], str )
364- else f"`{ attr } `={ key [attr ]} "
365- for attr in pk_attrs
347+ f"`{ attr } `='{ key [attr ]} '" if isinstance (key [attr ], str ) else f"`{ attr } `={ key [attr ]} " for attr in pk_attrs
366348 )
367349
368350 # Attempt atomic update: pending -> reserved
@@ -403,8 +385,7 @@ def complete(self, key: dict, duration: float = None, keep: bool = None) -> None
403385 # Update to success status
404386 duration_sql = f", duration={ duration } " if duration is not None else ""
405387 key_conditions = " AND " .join (
406- f"`{ attr } `='{ job_key [attr ]} '" if isinstance (job_key [attr ], str )
407- else f"`{ attr } `={ job_key [attr ]} "
388+ f"`{ attr } `='{ job_key [attr ]} '" if isinstance (job_key [attr ], str ) else f"`{ attr } `={ job_key [attr ]} "
408389 for attr in pk_attrs
409390 )
410391 sql = f"""
@@ -431,17 +412,13 @@ def error(self, key: dict, error_message: str, error_stack: str = None) -> None:
431412
432413 # Truncate error message if necessary
433414 if len (error_message ) > ERROR_MESSAGE_LENGTH :
434- error_message = (
435- error_message [: ERROR_MESSAGE_LENGTH - len (TRUNCATION_APPENDIX )]
436- + TRUNCATION_APPENDIX
437- )
415+ error_message = error_message [: ERROR_MESSAGE_LENGTH - len (TRUNCATION_APPENDIX )] + TRUNCATION_APPENDIX
438416
439417 pk_attrs = [name for name , _ in self ._get_fk_derived_primary_key ()]
440418 job_key = {attr : key [attr ] for attr in pk_attrs if attr in key }
441419
442420 key_conditions = " AND " .join (
443- f"`{ attr } `='{ job_key [attr ]} '" if isinstance (job_key [attr ], str )
444- else f"`{ attr } `={ job_key [attr ]} "
421+ f"`{ attr } `='{ job_key [attr ]} '" if isinstance (job_key [attr ], str ) else f"`{ attr } `={ job_key [attr ]} "
445422 for attr in pk_attrs
446423 )
447424
@@ -480,8 +457,7 @@ def ignore(self, key: dict) -> None:
480457 if job_key in self :
481458 # Update existing job to ignore
482459 key_conditions = " AND " .join (
483- f"`{ attr } `='{ job_key [attr ]} '" if isinstance (job_key [attr ], str )
484- else f"`{ attr } `={ job_key [attr ]} "
460+ f"`{ attr } `='{ job_key [attr ]} '" if isinstance (job_key [attr ], str ) else f"`{ attr } `={ job_key [attr ]} "
485461 for attr in pk_attrs
486462 )
487463 sql = f"""
@@ -497,15 +473,9 @@ def ignore(self, key: dict) -> None:
497473 def _insert_job_with_status (self , key : dict , status : str ) -> None :
498474 """Insert a new job with the given status."""
499475 pk_attrs = [name for name , _ in self ._get_fk_derived_primary_key ()]
500- columns = pk_attrs + [
501- "status" , "priority" , "created_time" , "scheduled_time" ,
502- "user" , "host" , "pid" , "connection_id"
503- ]
476+ columns = pk_attrs + ["status" , "priority" , "created_time" , "scheduled_time" , "user" , "host" , "pid" , "connection_id" ]
504477
505- pk_values = [
506- f"'{ key [attr ]} '" if isinstance (key [attr ], str ) else str (key [attr ])
507- for attr in pk_attrs
508- ]
478+ pk_values = [f"'{ key [attr ]} '" if isinstance (key [attr ], str ) else str (key [attr ]) for attr in pk_attrs ]
509479 other_values = [
510480 f"'{ status } '" ,
511481 str (DEFAULT_PRIORITY ),
@@ -567,7 +537,6 @@ def fetch_pending(
567537 query = query & f"priority <= { priority } "
568538
569539 # Fetch with ordering
570- pk_attrs = [name for name , _ in self ._get_fk_derived_primary_key ()]
571540 return query .fetch (
572541 "KEY" ,
573542 order_by = ["priority ASC" , "scheduled_time ASC" ],
0 commit comments