Skip to content

Commit 6e436ca

Browse files
authored
Fix database issues (#2345)
* Fix database issues * update test
1 parent edf765e commit 6e436ca

File tree

5 files changed

+44
-26
lines changed

5 files changed

+44
-26
lines changed

autosubmit/history/database_managers/experiment_history_db_manager.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -403,10 +403,24 @@ def get_job_data_by_job_id_name(self, job_id: int, job_name: str) -> JobData:
403403
models = [Models.JobDataRow(*row) for row in job_data_rows][-1]
404404
return JobData.from_model(models)
405405

406-
def get_job_data_max_counter(self):
407-
""" The max counter is the maximum count value for the count column in job_data. """
408-
statement = "SELECT MAX(counter) as maxcounter FROM job_data"
409-
counter_result = self.get_from_statement(self.historicaldb_file_path, statement)
406+
def get_job_data_max_counter(self, job_name: str = None) -> int:
407+
"""
408+
Get the maximum counter value from the `job_data` table. If a `job_name` is provided,
409+
the query will filter by that specific job name.
410+
411+
:param job_name: The name of the job to filter by (optional).
412+
:type job_name: str, optional
413+
:return: The maximum counter value, or the default value if no rows are found.
414+
:rtype: int
415+
"""
416+
if job_name:
417+
statement = "SELECT MAX(counter) as maxcounter FROM job_data WHERE job_name = ?"
418+
arguments = (job_name,)
419+
counter_result = self.get_from_statement_with_arguments(self.historicaldb_file_path, statement, arguments)
420+
else:
421+
statement = "SELECT MAX(counter) as maxcounter FROM job_data"
422+
counter_result = self.get_from_statement(self.historicaldb_file_path, statement)
423+
410424
if len(counter_result) <= 0:
411425
return DEFAULT_MAX_COUNTER
412426
else:
@@ -483,7 +497,7 @@ def _update_job_data_by_id(self, job_data_dc): ...
483497

484498
def get_job_data_by_name(self, job_name): ...
485499

486-
def get_job_data_max_counter(self): ...
500+
def get_job_data_max_counter(self, job_name: str = None) -> int: ...
487501

488502
def delete_job_data(self, id_): ...
489503

autosubmit/history/experiment_history.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ def _get_defined_queue_name(self, wrapper_queue, wrapper_code, qos):
343343
def _get_next_counter_by_job_name(self, job_name):
344344
""" Return the counter attribute from the latest job data row by job_name. """
345345
job_data_dc = self.manager.get_job_data_dc_unique_latest_by_job_name(job_name)
346-
max_counter = self.manager.get_job_data_max_counter()
346+
max_counter = self.manager.get_job_data_max_counter(job_name)
347347
if job_data_dc:
348348
return max(max_counter, job_data_dc.counter + 1)
349349
else:

autosubmit/job/job.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,10 +1143,10 @@ def _get_from_stat(self, index: int, fail_count: int =-1) -> int:
11431143
int: Value in the index position, or 0 if the file or index does not exist.
11441144
"""
11451145
if fail_count == -1:
1146-
logname = os.path.join(self._log_path, f"{self.stat_file}0")
1146+
logname = os.path.join(self._tmp_path, f"{self.stat_file}0")
11471147
else:
11481148
fail_count = str(fail_count)
1149-
logname = os.path.join(self._log_path, f"{self.stat_file}{fail_count}")
1149+
logname = os.path.join(self._tmp_path, f"{self.stat_file}{fail_count}")
11501150
if os.path.exists(logname):
11511151
lines = open(logname).readlines()
11521152
if len(lines) >= index + 1:
@@ -2549,10 +2549,12 @@ def write_end_time(self, completed, count=-1):
25492549

25502550
end_time = self.check_end_time(count)
25512551
if end_time > 0:
2552-
self.finish_time_timestamp = end_time
2552+
self.finish_time_timestamp = int(end_time)
2553+
if not self.finish_time_timestamp:
2554+
self.finish_time_timestamp = int(time.time())
25532555
with open(Path(self._tmp_path) / f"{self.name}_TOTAL_STATS", 'a') as stat_file:
25542556
stat_file.write(' ')
2555-
stat_file.write(date2str(datetime.datetime.fromtimestamp(float(self.finish_time_timestamp)), 'S'))
2557+
stat_file.write(date2str(datetime.datetime.fromtimestamp(int(self.finish_time_timestamp)), 'S'))
25562558
stat_file.write(' ')
25572559

25582560
if completed:

autosubmit/platforms/platform.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1063,16 +1063,18 @@ def wait_until_timeout(self, timeout: int = 60) -> bool:
10631063
break
10641064
return process_log
10651065

1066-
def recover_job_log(self, identifier: str, jobs_pending_to_process: Set[Any]) -> Set[Any]:
1066+
def recover_job_log(self, identifier: str, jobs_pending_to_process: Set[Any], as_conf: 'AutosubmitConfig') -> Set[Any]:
10671067
"""
1068-
Recovers log files for jobs from the recovery queue and retry failed jobs.
1068+
Recovers log files for jobs from the recovery queue and retries failed jobs.
10691069
1070-
Args:
1071-
identifier (str): Identifier for logging purposes.
1072-
jobs_pending_to_process (Set[Any]): Set of jobs that had issues during log retrieval.
1073-
1074-
Returns:
1075-
Set[Any]: Updated set of jobs pending to process.
1070+
:param identifier: Identifier for logging purposes.
1071+
:type identifier: str
1072+
:param jobs_pending_to_process: Set of jobs that had issues during log retrieval.
1073+
:type jobs_pending_to_process: Set[Any]
1074+
:param as_conf: The Autosubmit configuration object containing experiment data.
1075+
:type as_conf: AutosubmitConfig
1076+
:return: Updated set of jobs pending to process.
1077+
:rtype: Set[Any]
10761078
"""
10771079
job = None
10781080

@@ -1093,7 +1095,7 @@ def recover_job_log(self, identifier: str, jobs_pending_to_process: Set[Any]) ->
10931095
pass
10941096

10951097
if len(jobs_pending_to_process) > 0: # Restore the connection if there was an issue with one or more jobs.
1096-
self.restore_connection(None)
1098+
self.restore_connection(as_conf, log_recovery_process=True)
10971099

10981100
# This second while is to keep retrying the failed jobs.
10991101
# With the unique queue, the main process won't send the job again, so we have to store it here.
@@ -1111,7 +1113,7 @@ def recover_job_log(self, identifier: str, jobs_pending_to_process: Set[Any]) ->
11111113
Log.result(
11121114
f"{identifier} (Retry) Successfully recovered log for job '{job.name}' and retry '{job.fail_count}'.")
11131115
if len(jobs_pending_to_process) > 0:
1114-
self.restore_connection(None) # Restore the connection if there was an issue with one or more jobs.
1116+
self.restore_connection(as_conf, log_recovery_process=True) # Restore the connection if there was an issue with one or more jobs.
11151117

11161118
return jobs_pending_to_process
11171119

@@ -1132,9 +1134,9 @@ def recover_platform_job_logs(self, as_conf) -> None:
11321134
# Keep alive signal timeout is 5 minutes, but the sleeptime is 60 seconds.
11331135
self.keep_alive_timeout = max(log_recovery_timeout*5, 60*5)
11341136
while self.wait_for_work(sleep_time=max(log_recovery_timeout, 60)):
1135-
jobs_pending_to_process = self.recover_job_log(identifier, jobs_pending_to_process)
1137+
jobs_pending_to_process = self.recover_job_log(identifier, jobs_pending_to_process, as_conf)
11361138
if self.cleanup_event.is_set(): # Check if the main process is waiting for this child to end.
1137-
self.recover_job_log(identifier, jobs_pending_to_process)
1139+
self.recover_job_log(identifier, jobs_pending_to_process, as_conf)
11381140
break
11391141
except Exception as e:
11401142
Log.error(f"{identifier} {e}")

test/unit/test_job.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1903,14 +1903,14 @@ def test_get_from_stat(tmpdir, file_exists, index_timestamp, fail_count, expecte
19031903

19041904
job = Job("dummy", 1, Status.WAITING, 0)
19051905
assert job.stat_file == f"{job.name}_STAT_"
1906-
job._log_path = Path(tmpdir)
1907-
job._log_path.mkdir(parents=True, exist_ok=True)
1906+
job._tmp_path = Path(tmpdir)
1907+
job._tmp_path.mkdir(parents=True, exist_ok=True)
19081908

19091909
# Generating the timestamp file
19101910
if file_exists:
1911-
with open(job._log_path.joinpath(f"{job.stat_file}0"), "w") as stat_file:
1911+
with open(job._tmp_path.joinpath(f"{job.stat_file}0"), "w") as stat_file:
19121912
stat_file.write("19704923\n19704924\n")
1913-
with open(job._log_path.joinpath(f"{job.stat_file}1"), "w") as stat_file:
1913+
with open(job._tmp_path.joinpath(f"{job.stat_file}1"), "w") as stat_file:
19141914
stat_file.write("29704923\n29704924\n")
19151915

19161916
if fail_count is None:

0 commit comments

Comments
 (0)