Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions adsmp/tests/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,117 @@ def capture_manage_sig(bibcodes, action):
expected_bibcodes = ['2023ApJ...123..791E', '2023ApJ...123..789C', '2023ApJ...123..790D']
self.assertEqual(set(captured_bibcodes), set(expected_bibcodes))

def test_update_sitemaps_auto_auto_batching_large_bibcode_list(self):
    """Verify that a bibcode list larger than SITEMAP_MAX_BIBCODES_PER_MESSAGE
    is split into multiple 'add' batches (avoids RabbitMQ message size limits)."""
    # Seed five eligible records; with a limit of 2 per message this yields 3 batches.
    with self.app.session_scope() as session:
        for offset in range(5):
            session.add(Records(
                bibcode=f'2023ApJ...123..{450+offset:03d}A',
                bib_data='{"title": "Test"}',
                bib_data_updated=get_date() - timedelta(hours=6),
                solr_processed=get_date() - timedelta(days=5),
                status='success',
            ))
        session.commit()

    with patch('run.chain') as mock_chain, \
         patch('run.app', self.app), \
         patch('run.config', {'SITEMAP_MAX_BIBCODES_PER_MESSAGE': 2}):
        fake_workflow = Mock()
        fake_result = Mock()
        fake_result.id = 'test-batched-workflow'
        fake_workflow.apply_async.return_value = fake_result
        mock_chain.return_value = fake_workflow

        recorded_si_calls = []

        def record_si(bibcodes, action):
            recorded_si_calls.append((list(bibcodes), action))
            return Mock()

        with patch.object(tasks.task_manage_sitemap, 'si', side_effect=record_si), \
             patch.object(tasks.task_update_sitemap_files, 'si', return_value=Mock()):
            workflow_id = update_sitemaps_auto(days_back=1)

        # Three batches, all 'add', none exceeding the limit, covering all 5 records.
        self.assertEqual(workflow_id, 'test-batched-workflow')
        self.assertEqual(len(recorded_si_calls), 3)
        for _, action in recorded_si_calls:
            self.assertEqual(action, 'add')
        self.assertTrue(all(len(batch) <= 2 for batch, _ in recorded_si_calls))
        total = sum(len(batch) for batch, _ in recorded_si_calls)
        self.assertEqual(total, 5)

def test_update_sitemaps_auto_small_list_no_batching(self):
    """Verify that a bibcode list under the batch limit is submitted as a
    single two-step chain (manage -> files) with no batching."""
    # One eligible record, limit far above it.
    with self.app.session_scope() as session:
        session.add(Records(
            bibcode='2023ApJ...123..456A',
            bib_data='{"title": "Test"}',
            bib_data_updated=get_date() - timedelta(hours=6),
            status='success',
        ))
        session.commit()

    with patch('run.chain') as mock_chain, \
         patch('run.app', self.app), \
         patch('run.config', {'SITEMAP_MAX_BIBCODES_PER_MESSAGE': 100000}):
        fake_workflow = Mock()
        fake_result = Mock()
        fake_result.id = 'test-single-workflow'
        fake_workflow.apply_async.return_value = fake_result
        mock_chain.return_value = fake_workflow

        recorded_s_calls = []

        def record_s(bibcodes, action):
            recorded_s_calls.append((list(bibcodes), action))
            return Mock()

        with patch.object(tasks.task_manage_sitemap, 's', side_effect=record_s), \
             patch.object(tasks.task_update_sitemap_files, 's', return_value=Mock()):
            workflow_id = update_sitemaps_auto(days_back=1)

        self.assertEqual(workflow_id, 'test-single-workflow')
        self.assertEqual(len(recorded_s_calls), 1)
        self.assertEqual(recorded_s_calls[0][0], ['2023ApJ...123..456A'])
        # Single-chain path links exactly two signatures.
        chain_args = mock_chain.call_args[0]
        self.assertEqual(len(chain_args), 2)

def test_update_sitemaps_auto_at_batch_boundary(self):
    """Verify that a list of exactly max_bibcodes_per_message records takes
    the single-chain path (no batching at the boundary)."""
    # Exactly 3 eligible records with a limit of 3.
    with self.app.session_scope() as session:
        for offset in range(3):
            session.add(Records(
                bibcode=f'2023ApJ...123..{460+offset:03d}A',
                bib_data='{"title": "Test"}',
                bib_data_updated=get_date() - timedelta(hours=6),
                status='success',
            ))
        session.commit()

    with patch('run.chain') as mock_chain, \
         patch('run.app', self.app), \
         patch('run.config', {'SITEMAP_MAX_BIBCODES_PER_MESSAGE': 3}):
        fake_workflow = Mock()
        fake_result = Mock()
        fake_result.id = 'test-boundary-workflow'
        fake_workflow.apply_async.return_value = fake_result
        mock_chain.return_value = fake_workflow

        recorded_sizes = []

        def record_s(bibcodes, action):
            recorded_sizes.append(len(bibcodes))
            return Mock()

        with patch.object(tasks.task_manage_sitemap, 's', side_effect=record_s), \
             patch.object(tasks.task_update_sitemap_files, 's', return_value=Mock()):
            workflow_id = update_sitemaps_auto(days_back=1)

        # One .s() call carrying all 3 bibcodes => single-chain path taken.
        self.assertEqual(workflow_id, 'test-boundary-workflow')
        self.assertEqual(len(recorded_sizes), 1)
        self.assertEqual(recorded_sizes[0], 3)

def test_cleanup_invalid_sitemaps(self):
"""Test cleanup_invalid_sitemaps function"""

Expand Down
1 change: 1 addition & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
# Sitemap index generation retry configuration
# Default 100 retries = ~90 minutes. Increase for larger databases (e.g., 300 for ~5.5 hours)
SITEMAP_INDEX_MAX_RETRIES = 100
# Upper bound on bibcodes packed into one Celery message; update_sitemaps_auto
# in run.py splits larger lists into batches of this size to stay under
# RabbitMQ's message size limit.
SITEMAP_MAX_BIBCODES_PER_MESSAGE = 100000


# Site configurations for multi-site sitemap generation
Expand Down
69 changes: 44 additions & 25 deletions run.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,73 +558,92 @@ def cleanup_invalid_sitemaps():
def update_sitemaps_auto(days_back=1):
    """
    Automatically find and process records needing sitemap updates.

    Finds records that need sitemap updates based on:
    1. Records where bib_data_updated >= cutoff_date
    2. Records where solr_processed >= cutoff_date

    Uses UNION to combine both criteria, ensuring records that are either
    newly ingested OR recently processed by SOLR are included in sitemaps.

    Excludes records that already have update_flag=True to avoid duplicate work,
    as those records are already scheduled for sitemap regeneration.

    Splits large bibcode lists into batches to avoid exceeding RabbitMQ's
    message size limit (default max 256 MB).

    Args:
        days_back: Number of days to look back for changes

    Returns:
        str: Workflow task ID for the final chain, or None if no updates needed
    """
    logger.info('Starting automatic sitemap update (looking back %d days)', days_back)

    # Calculate cutoff date
    cutoff_date = get_date() - timedelta(days=days_back)
    logger.info('Looking for records updated since: %s', cutoff_date.isoformat())

    bibcodes_to_update = []

    with app.session_scope() as session:
        # Find all records updated recently OR SOLR-processed recently,
        # excluding records already flagged for regeneration (duplicate work).
        already_flagged_subquery = session.query(SitemapInfo.bibcode).filter(
            SitemapInfo.update_flag.is_(True)
        )

        bib_data_query = session.query(Records.bibcode).filter(
            Records.bib_data_updated >= cutoff_date,
            ~Records.bibcode.in_(already_flagged_subquery)
        )

        solr_processed_query = session.query(Records.bibcode).filter(
            Records.solr_processed >= cutoff_date,
            ~Records.bibcode.in_(already_flagged_subquery)
        )

        # UNION automatically eliminates duplicates
        combined_query = bib_data_query.union(solr_processed_query)

        bibcodes_to_update = [record.bibcode for record in combined_query]

    logger.info('Found %d records updated since %s (excluding already flagged)',
                len(bibcodes_to_update), cutoff_date.isoformat())

    if not bibcodes_to_update:
        logger.info('No records need sitemap updates')
        return None

    logger.info('Submitting %d records for sitemap processing', len(bibcodes_to_update))

    max_bibcodes_per_message = config.get('SITEMAP_MAX_BIBCODES_PER_MESSAGE', 100000)

    if len(bibcodes_to_update) <= max_bibcodes_per_message:
        # Small enough for a single message: chain manage -> file generation,
        # with .s() so the manage result flows into the file-generation task.
        workflow = chain(
            tasks.task_manage_sitemap.s(bibcodes_to_update, 'add'),
            tasks.task_update_sitemap_files.s()
        )
        result = workflow.apply_async(priority=0)
        logger.info('Submitted sitemap workflow: %s', result.id)
        return result.id

    # Too many bibcodes for one message: split into fixed-size batches.
    batches = [bibcodes_to_update[i:i + max_bibcodes_per_message]
               for i in range(0, len(bibcodes_to_update), max_bibcodes_per_message)]
    logger.info('Splitting %d bibcodes into %d batches of up to %d',
                len(bibcodes_to_update), len(batches), max_bibcodes_per_message)

    # Use immutable signatures (.si) so each batch task ignores the previous
    # task's return value; the final file-generation step likewise takes no
    # arguments from the chain.
    manage_tasks = [tasks.task_manage_sitemap.si(batch, 'add') for batch in batches]
    workflow = chain(
        *manage_tasks,
        tasks.task_update_sitemap_files.si()
    )
    result = workflow.apply_async(priority=0)
    logger.info('Submitted sitemap workflow (%d batches): %s', len(batches), result.id)

    return result.id


Expand Down
Loading