From 8f80685e038164dbcfd0172a1033955de9c76bd7 Mon Sep 17 00:00:00 2001 From: femalves Date: Fri, 27 Mar 2026 19:46:16 -0400 Subject: [PATCH] Batch large bibcode lists in update_sitemaps_auto to avoid RabbitMQ message size limit --- adsmp/tests/test_run.py | 111 ++++++++++++++++++++++++++++++++++++++++ config.py | 1 + run.py | 69 ++++++++++++++++--------- 3 files changed, 156 insertions(+), 25 deletions(-) diff --git a/adsmp/tests/test_run.py b/adsmp/tests/test_run.py index f93d066..cfe5d49 100644 --- a/adsmp/tests/test_run.py +++ b/adsmp/tests/test_run.py @@ -610,6 +610,117 @@ def capture_manage_sig(bibcodes, action): expected_bibcodes = ['2023ApJ...123..791E', '2023ApJ...123..789C', '2023ApJ...123..790D'] self.assertEqual(set(captured_bibcodes), set(expected_bibcodes)) + def test_update_sitemaps_auto_batching_large_bibcode_list(self): + """Test that large bibcode lists are split into batches to avoid RabbitMQ message size limits""" + + with self.app.session_scope() as session: + for i in range(5): + session.add(Records( + bibcode=f'2023ApJ...123..{450+i:03d}A', + bib_data='{"title": "Test"}', + bib_data_updated=get_date() - timedelta(hours=6), + solr_processed=get_date() - timedelta(days=5), + status='success' + )) + session.commit() + + with patch('run.chain') as mock_chain: + with patch('run.app', self.app): + with patch('run.config', {'SITEMAP_MAX_BIBCODES_PER_MESSAGE': 2}): + mock_workflow = Mock() + mock_result = Mock() + mock_result.id = 'test-batched-workflow' + mock_workflow.apply_async.return_value = mock_result + mock_chain.return_value = mock_workflow + + captured_si_calls = [] + def capture_si(bibcodes, action): + captured_si_calls.append((list(bibcodes), action)) + return Mock() + + with patch.object(tasks.task_manage_sitemap, 'si', side_effect=capture_si): + with patch.object(tasks.task_update_sitemap_files, 'si', return_value=Mock()): + workflow_id = update_sitemaps_auto(days_back=1) + + self.assertEqual(workflow_id, 'test-batched-workflow') + self.assertEqual(len(captured_si_calls), 3) + for _, action in captured_si_calls: + self.assertEqual(action, 'add') + self.assertTrue(all(len(batch) <= 2 for batch, _ in captured_si_calls)) + total = sum(len(batch) for batch, _ in captured_si_calls) + self.assertEqual(total, 5) + + def test_update_sitemaps_auto_small_list_no_batching(self): + """Test that small bibcode lists use a single chain without batching""" + + with self.app.session_scope() as session: + session.add(Records( + bibcode='2023ApJ...123..456A', + bib_data='{"title": "Test"}', + bib_data_updated=get_date() - timedelta(hours=6), + status='success' + )) + session.commit() + + with patch('run.chain') as mock_chain: + with patch('run.app', self.app): + with patch('run.config', {'SITEMAP_MAX_BIBCODES_PER_MESSAGE': 100000}): + mock_workflow = Mock() + mock_result = Mock() + mock_result.id = 'test-single-workflow' + mock_workflow.apply_async.return_value = mock_result + mock_chain.return_value = mock_workflow + + s_called = [] + def capture_s(bibcodes, action): + s_called.append((list(bibcodes), action)) + return Mock() + + with patch.object(tasks.task_manage_sitemap, 's', side_effect=capture_s): + with patch.object(tasks.task_update_sitemap_files, 's', return_value=Mock()): + workflow_id = update_sitemaps_auto(days_back=1) + + self.assertEqual(workflow_id, 'test-single-workflow') + self.assertEqual(len(s_called), 1) + self.assertEqual(s_called[0][0], ['2023ApJ...123..456A']) + chain_args = mock_chain.call_args[0] + self.assertEqual(len(chain_args), 2) + + def test_update_sitemaps_auto_at_batch_boundary(self): + """Test that exactly max_bibcodes_per_message uses single chain (no batching)""" + + with self.app.session_scope() as session: + for i in range(3): + session.add(Records( + bibcode=f'2023ApJ...123..{460+i:03d}A', + bib_data='{"title": "Test"}', + bib_data_updated=get_date() - timedelta(hours=6), + status='success' + )) + session.commit() + + with patch('run.chain') as mock_chain: + with patch('run.app', self.app): + with patch('run.config', {'SITEMAP_MAX_BIBCODES_PER_MESSAGE': 3}): + mock_workflow = Mock() + mock_result = Mock() + mock_result.id = 'test-boundary-workflow' + mock_workflow.apply_async.return_value = mock_result + mock_chain.return_value = mock_workflow + + s_called = [] + def capture_s(bibcodes, action): + s_called.append(len(bibcodes)) + return Mock() + + with patch.object(tasks.task_manage_sitemap, 's', side_effect=capture_s): + with patch.object(tasks.task_update_sitemap_files, 's', return_value=Mock()): + workflow_id = update_sitemaps_auto(days_back=1) + + self.assertEqual(workflow_id, 'test-boundary-workflow') + self.assertEqual(len(s_called), 1) + self.assertEqual(s_called[0], 3) + def test_cleanup_invalid_sitemaps(self): """Test cleanup_invalid_sitemaps function""" diff --git a/config.py b/config.py index 3764fcc..a564f08 100644 --- a/config.py +++ b/config.py @@ -56,6 +56,7 @@ # Sitemap index generation retry configuration # Default 100 retries = ~90 minutes. Increase for larger databases (e.g., 300 for ~5.5 hours) SITEMAP_INDEX_MAX_RETRIES = 100 +SITEMAP_MAX_BIBCODES_PER_MESSAGE = 100000 # Site configurations for multi-site sitemap generation diff --git a/run.py b/run.py index 15b4978..786bbbb 100755 --- a/run.py +++ b/run.py @@ -558,73 +558,92 @@ def cleanup_invalid_sitemaps(): def update_sitemaps_auto(days_back=1): """ Automatically find and process records needing sitemap updates - + Finds records that need sitemap updates based on: 1. Records where bib_data_updated >= cutoff_date 2. Records where solr_processed >= cutoff_date - + Uses UNION to combine both criteria, ensuring records that are either newly ingested OR recently processed by SOLR are included in sitemaps. - + Excludes records that already have update_flag=True to avoid duplicate work, as those records are already scheduled for sitemap regeneration. - + + Splits large bibcode lists into batches to avoid exceeding RabbitMQ's + message size limit (default max 256 MB). + Args: days_back: Number of days to look back for changes - + Returns: - str: Workflow task ID (chains manage + file generation), or None if no updates needed + str: Workflow task ID for the final chain, or None if no updates needed """ - - + + logger.info('Starting automatic sitemap update (looking back %d days)', days_back) - + # Calculate cutoff date cutoff_date = get_date() - timedelta(days=days_back) logger.info('Looking for records updated since: %s', cutoff_date.isoformat()) - + bibcodes_to_update = [] - + with app.session_scope() as session: # Find all records that were updated recently OR had SOLR processing recently # Exclude records that already have update_flag=True to avoid duplicate work - + # Subquery to get bibcodes that already have update_flag=True already_flagged_subquery = session.query(SitemapInfo.bibcode).filter( SitemapInfo.update_flag.is_(True) ) - + bib_data_query = session.query(Records.bibcode).filter( Records.bib_data_updated >= cutoff_date, ~Records.bibcode.in_(already_flagged_subquery) ) - + solr_processed_query = session.query(Records.bibcode).filter( Records.solr_processed >= cutoff_date, ~Records.bibcode.in_(already_flagged_subquery) ) - + # UNION automatically eliminates duplicates combined_query = bib_data_query.union(solr_processed_query) - + bibcodes_to_update = [record.bibcode for record in combined_query] - + logger.info('Found %d records updated since %s (excluding already flagged)', len(bibcodes_to_update), cutoff_date.isoformat()) - + if not bibcodes_to_update: logger.info('No records need sitemap updates') return None - + logger.info('Submitting %d records for sitemap processing', len(bibcodes_to_update)) - - # Chain tasks: manage sitemap → update files + + max_bibcodes_per_message = config.get('SITEMAP_MAX_BIBCODES_PER_MESSAGE', 100000) + + if len(bibcodes_to_update) <= max_bibcodes_per_message: + workflow = chain( + tasks.task_manage_sitemap.s(bibcodes_to_update, 'add'), + tasks.task_update_sitemap_files.s() + ) + result = workflow.apply_async(priority=0) + logger.info('Submitted sitemap workflow: %s', result.id) + return result.id + + batches = [bibcodes_to_update[i:i + max_bibcodes_per_message] + for i in range(0, len(bibcodes_to_update), max_bibcodes_per_message)] + logger.info('Splitting %d bibcodes into %d batches of up to %d', + len(bibcodes_to_update), len(batches), max_bibcodes_per_message) + + manage_tasks = [tasks.task_manage_sitemap.si(batch, 'add') for batch in batches] workflow = chain( - tasks.task_manage_sitemap.s(bibcodes_to_update, 'add'), - tasks.task_update_sitemap_files.s() + *manage_tasks, + tasks.task_update_sitemap_files.si() ) result = workflow.apply_async(priority=0) - logger.info('Submitted sitemap workflow: %s', result.id) - + logger.info('Submitted sitemap workflow (%d batches): %s', len(batches), result.id) + return result.id