diff --git a/adsrefpipe/tests/unittests/test_run.py b/adsrefpipe/tests/unittests/test_run.py new file mode 100644 index 0000000..8c996fe --- /dev/null +++ b/adsrefpipe/tests/unittests/test_run.py @@ -0,0 +1,73 @@ +import io +import os +import sys +import unittest +from contextlib import redirect_stderr +from datetime import datetime +from unittest.mock import patch + +project_home = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../../')) +if project_home not in sys.path: + sys.path.insert(0, project_home) + +import run + + +class TestRunResolveTimeDelay(unittest.TestCase): + + def test_resolve_uses_config_default_time_delay(self): + subdir = ['/tmp/input/A/file1.raw', '/tmp/input/A/file2.raw'] + + with patch.object(run, 'get_date', return_value=datetime(2024, 1, 1)), \ + patch.object(run, 'get_source_filenames', return_value=[subdir]), \ + patch.object(run, 'process_files') as mock_process_files, \ + patch.object(run.time, 'sleep') as mock_sleep, \ + patch.object(run.logger, 'info'), \ + patch.object(run.processed_log, 'info'): + result = run.main(['RESOLVE', '-p', '/tmp/input', '-e', '*.raw']) + + self.assertEqual(result, 0) + mock_process_files.assert_called_once_with(subdir) + mock_sleep.assert_called_once_with(len(subdir) / run.config['REFERENCE_PIPELINE_DEFAULT_TIME_DELAY']) + + def test_resolve_explicit_time_delay_overrides_config_default(self): + subdir = [ + '/tmp/input/A/file1.raw', + '/tmp/input/A/file2.raw', + '/tmp/input/A/file3.raw', + '/tmp/input/A/file4.raw', + ] + + with patch.object(run, 'get_date', return_value=datetime(2024, 1, 1)), \ + patch.object(run, 'get_source_filenames', return_value=[subdir]), \ + patch.object(run, 'process_files') as mock_process_files, \ + patch.object(run.time, 'sleep') as mock_sleep, \ + patch.object(run.logger, 'info'), \ + patch.object(run.processed_log, 'info'): + result = run.main(['RESOLVE', '-p', '/tmp/input', '-e', '*.raw', '-t', '2']) + + self.assertEqual(result, 0) + 
mock_process_files.assert_called_once_with(subdir) + mock_sleep.assert_called_once_with(2.0) + + def test_resolve_rejects_zero_time_delay(self): + stderr = io.StringIO() + + with redirect_stderr(stderr), self.assertRaises(SystemExit) as exc: + run.main(['RESOLVE', '-p', '/tmp/input', '-e', '*.raw', '-t', '0']) + + self.assertEqual(exc.exception.code, 2) + self.assertIn('time_delay must be greater than 0.', stderr.getvalue()) + + def test_resolve_rejects_negative_time_delay(self): + stderr = io.StringIO() + + with redirect_stderr(stderr), self.assertRaises(SystemExit) as exc: + run.main(['RESOLVE', '-p', '/tmp/input', '-e', '*.raw', '-t', '-5']) + + self.assertEqual(exc.exception.code, 2) + self.assertIn('time_delay must be greater than 0.', stderr.getvalue()) + + +if __name__ == '__main__': + unittest.main() diff --git a/config.py b/config.py index 0ced7c3..e9de475 100644 --- a/config.py +++ b/config.py @@ -29,6 +29,9 @@ # checking queues every this many seconds QUEUE_AUDIT_INTERVAL = 10 +# default delay rate divisor used for RESOLVE batch pauses +REFERENCE_PIPELINE_DEFAULT_TIME_DELAY = 1000.0 + # true if to compare the resolved records with classic COMPARE_CLASSIC = True @@ -39,4 +42,4 @@ MAX_QUEUE_RETRIES = 3 # indication that this is considered an incomplete reference -INCOMPLETE_REFERENCE = ' --- Incomplete' \ No newline at end of file +INCOMPLETE_REFERENCE = ' --- Incomplete' diff --git a/run.py b/run.py old mode 100755 new mode 100644 index 12bc533..96fc167 --- a/run.py +++ b/run.py @@ -1,5 +1,6 @@ import sys import os, fnmatch +from collections import defaultdict from adsputils import setup_logging, load_config, get_date from datetime import timedelta @@ -16,6 +17,20 @@ app = tasks.app logger = setup_logging('run.py') +processed_log = setup_logging('processed_subdirectories.py') + + +def positive_float(value: str) -> float: + """ + argparse type for positive floating point values. 
+ + :param value: CLI argument value to validate + :return: validated float value + """ + parsed_value = float(value) + if parsed_value <= 0: + raise argparse.ArgumentTypeError('time_delay must be greater than 0.') + return parsed_value def run_diagnostics(bibcodes: list, source_filenames: list) -> None: @@ -40,21 +55,38 @@ def run_diagnostics(bibcodes: list, source_filenames: list) -> None: def get_source_filenames(source_file_path: str, file_extension: str, date_cutoff: time.struct_time) -> list: """ - retrieves a list of files from the given directory with the specified file extension and modified date after the cutoff + Return a list of lists of matching files, grouped by the first-level + subdirectory under `source_file_path`. If files live directly in + `source_file_path`, they are grouped together as one inner list. :param source_file_path: the path of the directory to search for files :param file_extension: the file extension pattern to match :param date_cutoff: the modified date cutoff, files modified after this date will be included only - :return: list of files in the directory with modified date after the cutoff, if any + :return: list of lists of files in the directory with modified date after the cutoff, if any """ - list_files = [] + groups = defaultdict(list) + ROOT = "__ROOT__" + for root, dirs, files in os.walk(source_file_path): for basename in files: if fnmatch.fnmatch(basename, file_extension): filename = os.path.join(root, basename) if get_date_modified_struct_time(filename) >= date_cutoff: - list_files.append(filename) - return list_files + rel_dir = os.path.relpath(root, source_file_path) + key = ROOT if rel_dir in (".", "") else rel_dir.split(os.sep, 1)[0] + groups[key].append(filename) + + if not groups: + return [] + + # Build a stable list-of-lists: root group first (if present), then subdirs sorted + result = [] + if ROOT in groups: + result.append(sorted(groups[ROOT])) + for key in sorted(k for k in groups.keys() if k != ROOT): + 
result.append(sorted(groups[key])) + return result + def queue_references(references: list, source_filename: str, source_bibcode: str, parsername: str) -> None: @@ -199,7 +231,7 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib logger.error("Unable to process %s. Skipped!" % toREFs.filename) -if __name__ == '__main__': +def main(argv=None) -> int: parser = argparse.ArgumentParser(description='Process user input.') @@ -276,6 +308,20 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib dest='fail', action='store_true', help='Reprocess records that failed to get resolved') + resolve.add_argument('-t', + '--time_delay', + dest='time_delay', + action='store', + type=positive_float, + default=config['REFERENCE_PIPELINE_DEFAULT_TIME_DELAY'], + help='Add time delay between processing subdirectories for large batches. The delay time is batch size divided by input value in seconds. Defaults to REFERENCE_PIPELINE_DEFAULT_TIME_DELAY from config.') + resolve.add_argument('-sp', + '--skip_processed_directories', + dest='skip_processed', + action='store', + default=None, + help='Skip directories that have been previously processed') + stats = subparsers.add_parser('STATS', help='Print out statistics of the reference source file') stats.add_argument('-b', @@ -315,7 +361,8 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib action='store_true', help='Return all resolved bibcode') - args = parser.parse_args() + args = parser.parse_args(argv) + #import pdb;pdb.set_trace() if args.action == 'DIAGNOSTICS': if args.parse_filename: @@ -345,8 +392,32 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib else: date_cutoff = get_date('1972') source_filenames = get_source_filenames(args.path, args.extension, date_cutoff.timetuple()) + delay_rate = args.time_delay + skip_files = [] if len(source_filenames) > 0: - process_files(source_filenames) + for subdir in 
source_filenames:
+                # directory that contains this batch of files
+                subdir_name = os.path.dirname(subdir[0])
+                delay_time = float(len(subdir)) / delay_rate
+                if args.skip_processed:
+                    skip_file = args.skip_processed
+                    try:
+                        with open(skip_file, 'r') as file:
+                            skip_files = file.read().splitlines()
+                            print(f'Skipping {len(skip_files)} subdirectories')
+                    except (IOError, OSError):
+                        skip_files = []
+                        print('No files to skip')
+                if subdir_name not in skip_files:
+                    process_files(subdir)
+                    processed_log.info(f"{subdir_name}")
+                    logger.info(f"Processed subdirectory: {subdir_name}")
+                    print(f"Processed subdirectory: {subdir_name}")
+                    logger.info(f"Pause for {delay_time} seconds to process")
+                    print(f"Pause for {delay_time} seconds to process")
+                    time.sleep(delay_time)
+                else:
+                    print(f'Skipping {subdir_name}')
         elif args.confidence:
             date_cutoff = get_date() - timedelta(days=int(args.days)) if args.days else None
             reprocess_references(ReprocessQueryType.score, score_cutoff=float(args.confidence), date_cutoff=date_cutoff)
@@ -391,4 +462,8 @@ def reprocess_references(reprocess_type: str, score_cutoff: float = 0, match_bib
 
 #        if args.all:
 #        else:
-    sys.exit(0)
\ No newline at end of file
+    return 0
+
+
+if __name__ == '__main__':
+    sys.exit(main())