Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 73 additions & 13 deletions calibrations/calo/calo_cdb/scripts/runProd.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
This module generates a list of run / datasets given a run type and event threshold.
"""
import argparse
import datetime
from datetime import datetime
import logging
import os
import shutil
import subprocess
import sys
import textwrap
import time

from pathlib import Path
from multiprocessing import Pool, cpu_count
Expand All @@ -22,9 +23,9 @@

parser.add_argument('-i'
, '--run-type', type=str
, default='run3pp'
, choices=['run2pp','run2auau','run3auau','run3pp']
, help='Run Type. Default: run3pp')
, default='run3oo'
, choices=['run2pp','run2auau','run3auau','run3pp','run3oo']
, help='Run Type. Default: run3oo')

parser.add_argument('-f'
, '--bin-filter-datasets', type=str
Expand All @@ -46,6 +47,11 @@
, default='test'
, help='Output directory for condor.')

parser.add_argument('-o2'
, '--output-calib-tags', type=str
, default=''
, help='Output directory for calibration tags.')

parser.add_argument('-m'
, '--memory', type=float
, default=0.5
Expand All @@ -70,7 +76,7 @@ def get_file_paths(engine, runtype='run3auau'):
"""

# Identify run range from the run type
run_ranges = {'run2pp': (47286, 53880), 'run2auau': (54128, 54974), 'run3auau': (66457, 78954), 'run3pp': (79146, 200000)}
run_ranges = {'run2pp': (47286, 53880), 'run2auau': (54128, 54974), 'run3auau': (66457, 78954), 'run3pp': (79146, 81668), 'run3oo': (82374, 200000)}
params = {'run_start': run_ranges[runtype][0], 'run_end': run_ranges[runtype][1]}

query = """
Expand Down Expand Up @@ -296,25 +302,55 @@ def generate_run_list(reduced_process_df, output):

group_df['full_file_path'].to_csv(filepath, index=False, header=False)

def create_symlink(target_path, link_path):
"""
Creates a symbolic link pointing to target_path named link_path.

Replicates the behavior of 'ln -sfn' by removing the destination
if it already exists (including broken symlinks) before creating
the new link.

Args:
target_path (str or Path): The existing file or directory to point to.
link_path (str or Path): The path where the symlink should be created.

Raises:
OSError: If the link_path exists as a real directory and cannot be unlinked, or if permission is denied.
Comment on lines +309 to +318
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring states that the function replicates 'ln -sfn' behavior, but the '-n' flag in 'ln -sfn' treats the destination as a normal file if it's a symlink to a directory. The current implementation doesn't replicate this behavior - it will follow symlinks when checking 'link.exists()'. For true 'ln -sfn' equivalence, use 'link.is_symlink()' check only, or document the deviation from '-n' behavior.

Suggested change
Replicates the behavior of 'ln -sfn' by removing the destination
if it already exists (including broken symlinks) before creating
the new link.
Args:
target_path (str or Path): The existing file or directory to point to.
link_path (str or Path): The path where the symlink should be created.
Raises:
OSError: If the link_path exists as a real directory and cannot be unlinked, or if permission is denied.
Approximates the behavior of ``ln -sfn`` by removing the destination
at ``link_path`` if it already exists (including broken symlinks)
before creating the new link.
Note:
This function uses :meth:`pathlib.Path.exists`, which follows
symlinks to directories, and unlinks any existing path at
``link_path``. As a result, its behavior may differ from the
exact semantics of ``ln -sfn`` (in particular the ``-n`` flag)
in some edge cases.
Args:
target_path (str or Path): The existing file or directory to point to.
link_path (str or Path): The path where the symlink should be created.
Raises:
OSError: If the link_path exists and cannot be unlinked, or if
permission is denied.

Copilot uses AI. Check for mistakes.
"""
target = Path(target_path)
link = Path(link_path)

# Check if the link destination already exists
if link.exists() or link.is_symlink():
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function attempts to unlink a path that could be a real directory. The inline comment notes that 'link.unlink()' removes the symlink or file but not the target directory; however, if 'link_path' is itself an actual directory (not a symlink), 'unlink()' will fail whether or not the directory is empty ('IsADirectoryError' on Linux, another 'OSError' subclass on some platforms). The current code doesn't handle this case. Consider using 'shutil.rmtree()' if the path is a directory, or explicitly check the path type before calling 'unlink()'.

Suggested change
if link.exists() or link.is_symlink():
if link.exists() or link.is_symlink():
# If link_path is a real directory (not a symlink), do not attempt to unlink it.
# This matches the documented behavior and avoids IsADirectoryError.
if link.is_dir() and not link.is_symlink():
raise OSError(f"Cannot overwrite existing directory at {link}")

Copilot uses AI. Check for mistakes.
# link.unlink() removes the symlink or file, but not the target directory
link.unlink()
print(f"Removed existing link/file at {link}")

# Create the new symlink
link.symlink_to(target)
print(f"Created symlink: {link} -> {target}")
Comment on lines +327 to +331
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 'create_symlink' function uses print statements instead of the logger that's used throughout the rest of the script. For consistency and proper log file integration, replace 'print()' calls with 'logger.info()' or 'logger.debug()' to ensure these messages are captured in the log file configured at line 437.

Suggested change
print(f"Removed existing link/file at {link}")
# Create the new symlink
link.symlink_to(target)
print(f"Created symlink: {link} -> {target}")
logger.info(f"Removed existing link/file at {link}")
# Create the new symlink
link.symlink_to(target)
logger.info(f"Created symlink: {link} -> {target}")

Copilot uses AI. Check for mistakes.
Comment on lines +305 to +331
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Minor issue: unlink() fails on real directories.

If link_path is an existing real directory (not a symlink), link.unlink() at line 326 will raise IsADirectoryError. The docstring acknowledges this but doesn't handle it. If this scenario is possible in your workflow, consider using shutil.rmtree() for directories.

Also, for consistency with the rest of the module, consider using logger.info() instead of print() for the status messages.

🛠️ Optional fix for directory handling and logging
 def create_symlink(target_path, link_path):
     """
     Creates a symbolic link pointing to target_path named link_path.
 
     Replicates the behavior of 'ln -sfn' by removing the destination 
     if it already exists (including broken symlinks) before creating 
     the new link.
 
     Args:
         target_path (str or Path): The existing file or directory to point to.
         link_path (str or Path): The path where the symlink should be created.
 
     Raises:
         OSError: If the link_path exists as a real directory and cannot be unlinked, or if permission is denied.
     """
     target = Path(target_path)
     link = Path(link_path)
 
     # Check if the link destination already exists
     if link.exists() or link.is_symlink():
-        # link.unlink() removes the symlink or file, but not the target directory
-        link.unlink()
-        print(f"Removed existing link/file at {link}")
+        if link.is_symlink() or link.is_file():
+            link.unlink()
+        else:
+            shutil.rmtree(link)
+        logger.info(f"Removed existing link/file at {link}")
 
     # Create the new symlink
     link.symlink_to(target)
-    print(f"Created symlink: {link} -> {target}")
+    logger.info(f"Created symlink: {link} -> {target}")


def generate_condor(output, condor_log_dir, condor_log_file, condor_memory, bin_genStatus, condor_script, do_condor_submit):
"""
Generate condor submission directory to generate the CDB files for the runs.
"""
# 9. Condor Submission
if os.path.exists(condor_log_dir):
shutil.rmtree(condor_log_dir)
logger.info(f"Directory '{condor_log_dir}' and its contents removed.")
condor_log_dir.mkdir(parents=True, exist_ok=True)
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating the directory only to immediately delete it in the next line is redundant and inefficient. The first 'mkdir' call serves no purpose since 'shutil.rmtree' with 'ignore_errors=True' will silently succeed whether the directory exists or not. Remove line 337 to eliminate this unnecessary operation.

Suggested change
condor_log_dir.mkdir(parents=True, exist_ok=True)

Copilot uses AI. Check for mistakes.
shutil.rmtree(condor_log_dir, ignore_errors=True)
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using 'shutil.rmtree' with 'ignore_errors=True' silently suppresses all errors, including permission issues or I/O errors that might indicate a real problem. This could mask important failures. Consider using 'onerror' or checking if the directory exists first with proper error logging, or at minimum log when the directory is successfully removed.

Suggested change
shutil.rmtree(condor_log_dir, ignore_errors=True)
if condor_log_dir.exists():
try:
shutil.rmtree(condor_log_dir)
logger.info("Removed existing Condor log directory: %s", condor_log_dir)
except OSError as exc:
logger.warning("Failed to remove Condor log directory %s: %s. Proceeding with existing contents.", condor_log_dir, exc)

Copilot uses AI. Check for mistakes.

# Setup Condor Log Dir
condor_log_dir.mkdir(parents=True, exist_ok=True)

shutil.copy(bin_genStatus, output)
shutil.copy(condor_script, output)

dataset_dir = output / 'datasets'
list_files = list(dataset_dir.glob('*.list'))

jobs = 0
with open(output / 'jobs.list', 'w', encoding="utf-8") as f:
for file_path in list_files:
f.write(str(file_path.resolve()) + '\n')
jobs += 1
Comment on lines +349 to +353
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The job counting can be simplified by using 'len(list_files)' instead of maintaining a separate counter. This eliminates the need for the 'jobs' variable and the increment operation, making the code more concise and less error-prone.

Suggested change
jobs = 0
with open(output / 'jobs.list', 'w', encoding="utf-8") as f:
for file_path in list_files:
f.write(str(file_path.resolve()) + '\n')
jobs += 1
jobs = len(list_files)
with open(output / 'jobs.list', 'w', encoding="utf-8") as f:
for file_path in list_files:
f.write(str(file_path.resolve()) + '\n')

Copilot uses AI. Check for mistakes.

# list of subdirectories to create
subdirectories = ['stdout', 'error', 'output']
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The subdirectory name 'output' in the subdirectories list is the same as the name of the function parameter 'output'. Nothing collides on disk, but the code is confusing to read because the expression "output / 'output'" refers to a subdirectory named 'output' inside the output directory. Consider renaming the subdirectory to something more descriptive like 'results', 'job_output', or 'completed' to improve code clarity and avoid confusion.

Copilot uses AI. Check for mistakes.
Expand All @@ -335,11 +371,26 @@ def generate_condor(output, condor_log_dir, condor_log_file, condor_memory, bin_
with open(output / 'genStatus.sub', mode="w", encoding="utf-8") as file:
file.write(submit_file_content)

command = f'rm -rf {condor_log_dir} && mkdir {condor_log_dir} && cd {output} && condor_submit genStatus.sub -queue "input_run from jobs.list"'
command = f'condor_submit genStatus.sub -queue "input_run from jobs.list"'

if do_condor_submit:
run_command_and_log(command, output)

job_dir = output / 'output'

# Check Job Progress
while True:
finished_jobs = sum(1 for x in job_dir.iterdir() if x.is_dir())
Comment on lines +379 to +383
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The job monitoring starts immediately after submitting jobs without any initial delay, which could cause a race condition. If 'condor_submit' takes time to create the initial job directories, the first check may incorrectly show 0/N jobs completed. Consider adding a small initial delay (e.g., a few seconds) before starting the monitoring loop, or check that the job submission has completed successfully before beginning to poll.

Copilot uses AI. Check for mistakes.

if finished_jobs >= jobs:
logger.info(f"All Jobs Complete. {finished_jobs}/{jobs} Jobs.")
break

logger.info(f"Waiting for Jobs... {finished_jobs}/{jobs} done.")
time.sleep(15) # Check every 15 seconds
Comment on lines +382 to +390
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The job monitoring loop assumes that each completed job creates a directory in 'output/', but this assumption is not validated. If jobs fail silently or create files instead of directories, the counter will be inaccurate and the loop may wait indefinitely. Consider adding error handling to check for job failures (e.g., by checking stderr files in the 'error/' directory) or adding a timeout mechanism to prevent infinite waiting.

Suggested change
while True:
finished_jobs = sum(1 for x in job_dir.iterdir() if x.is_dir())
if finished_jobs >= jobs:
logger.info(f"All Jobs Complete. {finished_jobs}/{jobs} Jobs.")
break
logger.info(f"Waiting for Jobs... {finished_jobs}/{jobs} done.")
time.sleep(15) # Check every 15 seconds
start_time = time.time()
# Maximum time to wait for all jobs to finish (in seconds)
max_wait_seconds = 6 * 60 * 60 # 6 hours
while True:
# Count both files and directories as completed job artifacts
finished_jobs = sum(1 for x in job_dir.iterdir() if x.is_dir() or x.is_file())
if finished_jobs >= jobs:
logger.info(f"All Jobs Complete. {finished_jobs}/{jobs} Jobs.")
break
# Timeout to avoid waiting indefinitely if jobs fail silently
if time.time() - start_time > max_wait_seconds:
logger.error(
"Timeout reached while waiting for Condor jobs: "
f"{finished_jobs}/{jobs} jobs appear to have produced output "
f"after waiting {max_wait_seconds} seconds."
)
break
logger.info(f"Waiting for Jobs... {finished_jobs}/{jobs} done.")
time.sleep(15) # Check every 15 seconds

Copilot uses AI. Check for mistakes.
Comment on lines +379 to +390
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Potential infinite loop if jobs fail.

This polling loop has no timeout mechanism. If condor jobs fail without creating output directories, the script will hang indefinitely. For a production cron job, this could block subsequent runs.

Consider adding:

  1. A maximum wait time / timeout
  2. Checking for job failures via condor_q or error files
  3. At minimum, logging a warning after extended waiting periods
🛡️ Suggested timeout mechanism
         job_dir = output / 'output'
 
+        max_wait_seconds = 3600 * 6  # 6 hour timeout
+        elapsed = 0
         # Check Job Progress
         while True:
             finished_jobs = sum(1 for x in job_dir.iterdir() if x.is_dir())
 
             if finished_jobs >= jobs:
                 logger.info(f"All Jobs Complete. {finished_jobs}/{jobs} Jobs.")
                 break
 
+            if elapsed >= max_wait_seconds:
+                logger.error(f"Timeout reached. Only {finished_jobs}/{jobs} jobs completed.")
+                break
+
             logger.info(f"Waiting for Jobs... {finished_jobs}/{jobs} done.")
             time.sleep(15) # Check every 15 seconds
+            elapsed += 15


else:
command = f'cd {output} && {command}'
logger.info(f'\nSubmission Command: {command}')

def main():
Expand All @@ -348,7 +399,7 @@ def main():
"""
args = parser.parse_args()
run_type = args.run_type
CURRENT_DATE = str(datetime.date.today())
CURRENT_DATE = datetime.now().strftime("%m-%d-%y")
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The date format was changed from YYYY-MM-DD to MM-DD-YY, which breaks chronological sorting. Directories and files named with this format will not sort correctly by name (e.g., '12-31-24' sorts after '01-01-25' alphabetically even though it is the earlier date). Revert to the ISO 8601 format '%Y-%m-%d', which keeps the full four-digit year and makes alphabetical order match chronological order.

Suggested change
CURRENT_DATE = datetime.now().strftime("%m-%d-%y")
CURRENT_DATE = datetime.now().strftime("%Y-%m-%d")

Copilot uses AI. Check for mistakes.
output = Path(args.output).resolve()
condor_memory = args.memory
USER = os.environ.get('USER')
Expand All @@ -360,7 +411,11 @@ def main():
# Append timestamp if the automated condor submission is enabled.
# This will ensure the output directory is unique for each call of the cron job.
if do_condor_submit:
output += '-' + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
output = Path(f"{output}-{timestamp}")

# Save date tags of the calibration output
output_calib_tags = Path(args.output_calib_tags).resolve() if args.output_calib_tags else output / 'tags'
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When 'do_condor_submit' is True, the output directory gets a timestamp suffix (line 415), but 'output_calib_tags' is set afterward (line 418). This means the default tags directory will be inside the timestamped directory (e.g., 'test-2024-12-18-10-30-00/tags'), which may not be the intended behavior. If you want a shared tags directory across runs, calculate 'output_calib_tags' before modifying 'output', or use the original output path for the fallback.

Copilot uses AI. Check for mistakes.

log_file = output / f'log-{CURRENT_DATE}.txt'

Expand All @@ -374,6 +429,10 @@ def main():
bin_genStatus = Path(args.bin_genStatus).resolve() if args.bin_genStatus else OFFLINE_MAIN_BIN / 'CaloCDB-GenStatus'

output.mkdir(parents=True, exist_ok=True)
output_calib_tags.mkdir(parents=True, exist_ok=True)

tag = output_calib_tags / CURRENT_DATE
create_symlink(output / 'output', tag)
Comment on lines +434 to +435
Copy link

Copilot AI Dec 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The symlink is created before the 'output/output' directory exists. The target directory 'output / 'output'' is only created later by the condor jobs in the 'generate_condor' function (line 360). This will cause the symlink to point to a non-existent directory at creation time, potentially resulting in a broken symlink. Consider moving this symlink creation to after the directory structure is set up in 'generate_condor', or create the output directory explicitly before creating the symlink.

Suggested change
tag = output_calib_tags / CURRENT_DATE
create_symlink(output / 'output', tag)
# Ensure the output subdirectory exists before creating the symlink
output_subdir = output / 'output'
output_subdir.mkdir(parents=True, exist_ok=True)
tag = output_calib_tags / CURRENT_DATE
create_symlink(output_subdir, tag)

Copilot uses AI. Check for mistakes.
Comment on lines +432 to +435
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Symlink created before target directory exists.

At line 435, create_symlink(output / 'output', tag) is called, but the output/output directory is only created later at line 360 in generate_condor(). This creates a symlink pointing to a non-existent target.

While symlinks to non-existent paths are technically valid (dangling symlinks), this may cause confusion or errors if something tries to follow the symlink before jobs complete.

Consider either:

  1. Moving the symlink creation to after generate_condor() completes
  2. Creating the output/output directory earlier in main()


setup_logging(log_file, logging.DEBUG)

Expand All @@ -383,9 +442,10 @@ def main():
logging.basicConfig()

logger.info('#'*40)
logger.info(f'LOGGING: {str(datetime.datetime.now())}')
logger.info(f'LOGGING: {str(datetime.now())}')
logger.info(f'Run Type: {run_type}')
logger.info(f'Output Directory: {output}')
logger.info(f'Output Calib Tags: {output_calib_tags}')
logger.info(f'Condor Memory: {condor_memory}')
logger.info(f'Do Condor Submission: {do_condor_submit}')
logger.info(f'Filter Datasets Bin: {bin_filter_datasets}')
Expand Down