From 29583742442f0a492f5b73e10b9a3d08f7a5a491 Mon Sep 17 00:00:00 2001 From: thomasallen Date: Tue, 21 Oct 2025 10:24:44 -0700 Subject: [PATCH 1/4] input throttling --- run.py | 75 +++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 67 insertions(+), 8 deletions(-) mode change 100755 => 100644 run.py diff --git a/run.py b/run.py old mode 100755 new mode 100644 index 12bc533..598d3d0 --- a/run.py +++ b/run.py @@ -1,5 +1,6 @@ import sys import os, fnmatch +from collections import defaultdict from adsputils import setup_logging, load_config, get_date from datetime import timedelta @@ -16,6 +17,7 @@ app = tasks.app logger = setup_logging('run.py') +processed_log = setup_logging('processed_subdirectories.py') def run_diagnostics(bibcodes: list, source_filenames: list) -> None: @@ -38,23 +40,40 @@ def run_diagnostics(bibcodes: list, source_filenames: list) -> None: return -def get_source_filenames(source_file_path: str, file_extension: str, date_cutoff: time.struct_time) -> list: +def get_source_filenames(source_file_path, file_extension, date_cutoff): """ - retrieves a list of files from the given directory with the specified file extension and modified date after the cutoff + Return a list of lists of matching files, grouped by the first-level + subdirectory under `source_file_path`. If files live directly in + `source_file_path`, they are grouped together as one inner list. 
:param source_file_path: the path of the directory to search for files :param file_extension: the file extension pattern to match :param date_cutoff: the modified date cutoff, files modified after this date will be included only - :return: list of files in the directory with modified date after the cutoff, if any + :return: list of lists of files in the directory with modified date after the cutoff, if any """ - list_files = [] + groups = defaultdict(list) + ROOT = "__ROOT__" + for root, dirs, files in os.walk(source_file_path): for basename in files: if fnmatch.fnmatch(basename, file_extension): filename = os.path.join(root, basename) if get_date_modified_struct_time(filename) >= date_cutoff: - list_files.append(filename) - return list_files + rel_dir = os.path.relpath(root, source_file_path) + key = ROOT if rel_dir in (".", "") else rel_dir.split(os.sep, 1)[0] + groups[key].append(filename) + + if not groups: + return [] + + # Build a stable list-of-lists: root group first (if present), then subdirs sorted + result = [] + if ROOT in groups: + result.append(sorted(groups[ROOT])) + for key in sorted(k for k in groups.keys() if k != ROOT): + result.append(sorted(groups[key])) + return result + def queue_references(references: list, source_filename: str, source_bibcode: str, parsername: str) -> None: @@ -276,6 +295,19 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib dest='fail', action='store_true', help='Reprocess records that failed to get resolved') + resolve.add_argument('-t', + '--time_delay', + dest='time_delay', + action='store', + default=10., + help='Add time delay between processing subdirectories for large batches. 
The delay time is batch size divided by input value in seconds.') + resolve.add_argument('-sp', + '--skip_processed_directories', + dest='skip_processed', + action='store', + default=None, + help='Skip directories that have been previously processed') + stats = subparsers.add_parser('STATS', help='Print out statistics of the reference source file') stats.add_argument('-b', @@ -316,6 +348,7 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib help='Return all resolved bibcode') args = parser.parse_args() + #import pdb;pdb.set_trace() if args.action == 'DIAGNOSTICS': if args.parse_filename: @@ -345,8 +378,34 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib else: date_cutoff = get_date('1972') source_filenames = get_source_filenames(args.path, args.extension, date_cutoff.timetuple()) + if args.time_delay: + delay_rate = args.time_delay + else: + delay_rate = 1000. if len(source_filenames) > 0: - process_files(source_filenames) + for subdir in source_filenames: + subdir_name = subdir[0].split('/') + subdir_name = "/".join(subdir_name[:-1]) + delay_time = float(len(subdir))/float(delay_rate) + if args.skip_processed: + skip_file = args.skip_processed + try: + with open(skip_file,'r') as file: + skip_files = file.read().splitlines() + print(f'Skipping {len(skip_files)} subdirectories') + except: + skip_files = [] + print('No files to skip') + if subdir_name not in skip_files: + process_files(subdir) + processed_log.info(f"{subdir_name}") + logger.info(f"Processed subdirectory: {subdir_name}") + print(f"Processed subdirectory: {subdir_name}") + logger.info(f"Pause for {delay_time} seconds to process") + print(f"Pause for {delay_time} seconds to process") + time.sleep(delay_time) + else: + print(f'Skipping {subdir_name}') elif args.confidence: date_cutoff = get_date() - timedelta(days=int(args.days)) if args.days else None reprocess_references(ReprocessQueryType.score, score_cutoff=float(args.confidence),
date_cutoff=date_cutoff) @@ -391,4 +450,4 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib # if args.all: # else: - sys.exit(0) \ No newline at end of file + sys.exit(0) From 0fcb42eb9825b21fb9e6e09c41cd4d295a9a9682 Mon Sep 17 00:00:00 2001 From: thomasallen Date: Tue, 21 Oct 2025 10:29:32 -0700 Subject: [PATCH 2/4] type hints --- run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/run.py b/run.py index 598d3d0..0c7eabb 100644 --- a/run.py +++ b/run.py @@ -40,7 +40,7 @@ def run_diagnostics(bibcodes: list, source_filenames: list) -> None: return -def get_source_filenames(source_file_path, file_extension, date_cutoff): +def get_source_filenames(source_file_path: str, file_extension: str, date_cutoff: time.struct_time) -> list): """ Return a list of lists of matching files, grouped by the first-level subdirectory under `source_file_path`. If files live directly in From ae3bac8c917073cd830374d9d40c53fcd7a194ec Mon Sep 17 00:00:00 2001 From: thomasallen Date: Tue, 21 Oct 2025 10:30:51 -0700 Subject: [PATCH 3/4] fix typo --- run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/run.py b/run.py index 0c7eabb..6d97120 100644 --- a/run.py +++ b/run.py @@ -40,7 +40,7 @@ def run_diagnostics(bibcodes: list, source_filenames: list) -> None: return -def get_source_filenames(source_file_path: str, file_extension: str, date_cutoff: time.struct_time) -> list): +def get_source_filenames(source_file_path: str, file_extension: str, date_cutoff: time.struct_time) -> list: """ Return a list of lists of matching files, grouped by the first-level subdirectory under `source_file_path`. 
If files live directly in From 87c5f97ceef43abb1008e0558415defb48786d37 Mon Sep 17 00:00:00 2001 From: thomasallen Date: Wed, 1 Apr 2026 11:47:36 -0700 Subject: [PATCH 4/4] Config setting and unit test for input throttle --- adsrefpipe/tests/unittests/test_run.py | 73 ++++++++++++++++++++++++++ config.py | 5 +- run.py | 36 +++++++++---- 3 files changed, 103 insertions(+), 11 deletions(-) create mode 100644 adsrefpipe/tests/unittests/test_run.py diff --git a/adsrefpipe/tests/unittests/test_run.py b/adsrefpipe/tests/unittests/test_run.py new file mode 100644 index 0000000..8c996fe --- /dev/null +++ b/adsrefpipe/tests/unittests/test_run.py @@ -0,0 +1,73 @@ +import io +import os +import sys +import unittest +from contextlib import redirect_stderr +from datetime import datetime +from unittest.mock import patch + +project_home = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')) +if project_home not in sys.path: + sys.path.insert(0, project_home) + +import run + + +class TestRunResolveTimeDelay(unittest.TestCase): + + def test_resolve_uses_config_default_time_delay(self): + subdir = ['/tmp/input/A/file1.raw', '/tmp/input/A/file2.raw'] + + with patch.object(run, 'get_date', return_value=datetime(2024, 1, 1)), \ + patch.object(run, 'get_source_filenames', return_value=[subdir]), \ + patch.object(run, 'process_files') as mock_process_files, \ + patch.object(run.time, 'sleep') as mock_sleep, \ + patch.object(run.logger, 'info'), \ + patch.object(run.processed_log, 'info'): + result = run.main(['RESOLVE', '-p', '/tmp/input', '-e', '*.raw']) + + self.assertEqual(result, 0) + mock_process_files.assert_called_once_with(subdir) + mock_sleep.assert_called_once_with(len(subdir) / run.config['REFERENCE_PIPELINE_DEFAULT_TIME_DELAY']) + + def test_resolve_explicit_time_delay_overrides_config_default(self): + subdir = [ + '/tmp/input/A/file1.raw', + '/tmp/input/A/file2.raw', + '/tmp/input/A/file3.raw', + '/tmp/input/A/file4.raw', + ] + + with patch.object(run, 
'get_date', return_value=datetime(2024, 1, 1)), \ + patch.object(run, 'get_source_filenames', return_value=[subdir]), \ + patch.object(run, 'process_files') as mock_process_files, \ + patch.object(run.time, 'sleep') as mock_sleep, \ + patch.object(run.logger, 'info'), \ + patch.object(run.processed_log, 'info'): + result = run.main(['RESOLVE', '-p', '/tmp/input', '-e', '*.raw', '-t', '2']) + + self.assertEqual(result, 0) + mock_process_files.assert_called_once_with(subdir) + mock_sleep.assert_called_once_with(2.0) + + def test_resolve_rejects_zero_time_delay(self): + stderr = io.StringIO() + + with redirect_stderr(stderr), self.assertRaises(SystemExit) as exc: + run.main(['RESOLVE', '-p', '/tmp/input', '-e', '*.raw', '-t', '0']) + + self.assertEqual(exc.exception.code, 2) + self.assertIn('time_delay must be greater than 0.', stderr.getvalue()) + + def test_resolve_rejects_negative_time_delay(self): + stderr = io.StringIO() + + with redirect_stderr(stderr), self.assertRaises(SystemExit) as exc: + run.main(['RESOLVE', '-p', '/tmp/input', '-e', '*.raw', '-t', '-5']) + + self.assertEqual(exc.exception.code, 2) + self.assertIn('time_delay must be greater than 0.', stderr.getvalue()) + + +if __name__ == '__main__': + unittest.main() diff --git a/config.py b/config.py index 0ced7c3..e9de475 100644 --- a/config.py +++ b/config.py @@ -29,6 +29,9 @@ # checking queues every this many seconds QUEUE_AUDIT_INTERVAL = 10 +# default delay rate divisor used for RESOLVE batch pauses +REFERENCE_PIPELINE_DEFAULT_TIME_DELAY = 1000.0 + # true if to compare the resolved records with classic COMPARE_CLASSIC = True @@ -39,4 +42,4 @@ MAX_QUEUE_RETRIES = 3 # indication that this is considered an incomplete reference -INCOMPLETE_REFERENCE = ' --- Incomplete' \ No newline at end of file +INCOMPLETE_REFERENCE = ' --- Incomplete' diff --git a/run.py b/run.py index 6d97120..96fc167 100644 --- a/run.py +++ b/run.py @@ -20,6 +20,19 @@ processed_log = setup_logging('processed_subdirectories.py') 
+def positive_float(value: str) -> float: + """ + argparse type for positive floating point values. + + :param value: CLI argument value to validate + :return: validated float value + """ + parsed_value = float(value) + if parsed_value <= 0: + raise argparse.ArgumentTypeError('time_delay must be greater than 0.') + return parsed_value + + def run_diagnostics(bibcodes: list, source_filenames: list) -> None: """ show diagnostic information based on the provided bibcodes and source filenames @@ -218,7 +231,7 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib logger.error("Unable to process %s. Skipped!" % toREFs.filename) -if __name__ == '__main__': +def main(argv=None) -> int: parser = argparse.ArgumentParser(description='Process user input.') @@ -299,8 +312,9 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib '--time_delay', dest='time_delay', action='store', - default=10., - help='Add time delay between processing subdirectories for large batches. The delay time is batch size divided by input value in seconds.') + type=positive_float, + default=config['REFERENCE_PIPELINE_DEFAULT_TIME_DELAY'], + help='Add time delay between processing subdirectories for large batches. The delay time is batch size divided by input value in seconds. 
Defaults to REFERENCE_PIPELINE_DEFAULT_TIME_DELAY from config.') resolve.add_argument('-sp', '--skip_processed_directories', dest='skip_processed', @@ -347,7 +361,7 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib action='store_true', help='Return all resolved bibcode') - args = parser.parse_args() + args = parser.parse_args(argv) #import pdb;pdb.set_trace() if args.action == 'DIAGNOSTICS': @@ -378,15 +392,13 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib else: date_cutoff = get_date('1972') source_filenames = get_source_filenames(args.path, args.extension, date_cutoff.timetuple()) - if args.time_delay: - delay_rate = args.time_delay - else: - delay_rate = 1000. + delay_rate = args.time_delay + skip_files = [] if len(source_filenames) > 0: for subdir in source_filenames: subdir_name = subdir[0].split('/') subdir_name = "/".join(subdir_name[:-1]) - delay_time = float(len(subdir))/float(delay_rate) + delay_time = float(len(subdir)) / delay_rate if args.skip_processed: skip_file = args.skip_processed try: @@ -450,4 +462,8 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib # if args.all: # else: - sys.exit(0) + return 0 + + +if __name__ == '__main__': + sys.exit(main())