diff --git a/celery_app.py b/celery_app.py deleted file mode 100644 index 1d232165..00000000 --- a/celery_app.py +++ /dev/null @@ -1,81 +0,0 @@ -"""Celery application factory and configuration for Sample Platform.""" - -import os - -from celery import Celery -from celery.schedules import crontab - -# Load configuration - use empty dict for testing when config.py doesn't exist -try: - from config_parser import parse_config - config = parse_config('config') -except Exception: - # In test environment, config.py may not exist - config = {} - - -def make_celery(app=None): - """ - Create a Celery application configured for the Sample Platform. - - :param app: Optional Flask application for context binding - :return: Configured Celery application - """ - celery_app = Celery( - 'sample_platform', - broker=config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'), - backend=config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'), - include=['mod_ci.tasks'] - ) - - # Apply configuration from config.py - celery_app.conf.update( - task_serializer=config.get('CELERY_TASK_SERIALIZER', 'json'), - result_serializer=config.get('CELERY_RESULT_SERIALIZER', 'json'), - accept_content=config.get('CELERY_ACCEPT_CONTENT', ['json']), - timezone=config.get('CELERY_TIMEZONE', 'UTC'), - enable_utc=config.get('CELERY_ENABLE_UTC', True), - task_acks_late=config.get('CELERY_TASK_ACKS_LATE', True), - worker_prefetch_multiplier=config.get('CELERY_WORKER_PREFETCH_MULTIPLIER', 1), - task_reject_on_worker_lost=config.get('CELERY_TASK_REJECT_ON_WORKER_LOST', True), - task_soft_time_limit=config.get('CELERY_TASK_SOFT_TIME_LIMIT', 3600), - task_time_limit=config.get('CELERY_TASK_TIME_LIMIT', 3900), - ) - - # Beat schedule for periodic tasks - celery_app.conf.beat_schedule = { - 'check-expired-instances-every-5-minutes': { - 'task': 'mod_ci.tasks.check_expired_instances_task', - 'schedule': crontab(minute='*/5'), - 'options': {'queue': 'maintenance'} - }, - 'process-pending-tests-every-minute': { - 'task': 'mod_ci.tasks.process_pending_tests_task', - 'schedule': crontab(minute='*'), - 'options': {'queue': 'default'} - }, - } - - # Queue routing - celery_app.conf.task_routes = { - 'mod_ci.tasks.start_test_task': {'queue': 'test_execution'}, - 'mod_ci.tasks.check_expired_instances_task': {'queue': 'maintenance'}, - 'mod_ci.tasks.process_pending_tests_task': {'queue': 'default'}, - } - - # If Flask app is provided, bind tasks to its context - if app is not None: - class ContextTask(celery_app.Task): - """Task base class that maintains Flask application context.""" - - def __call__(self, *args, **kwargs): - with app.app_context(): - return self.run(*args, **kwargs) - - celery_app.Task = ContextTask - - return celery_app - - -# Create the default celery instance (used by worker when started standalone) -celery = make_celery() diff --git a/config_sample.py b/config_sample.py index 2b2fd64f..34f30a46 100755 --- a/config_sample.py +++ b/config_sample.py @@ -37,21 +37,3 @@ GCP_INSTANCE_MAX_RUNTIME = 120 # In minutes GCS_BUCKET_NAME = 'spdev' GCS_SIGNED_URL_EXPIRY_LIMIT = 720 # In minutes - - -# CELERY TASK QUEUE CONFIG -CELERY_BROKER_URL = 'redis://localhost:6379/0' -CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' -CELERY_TASK_SERIALIZER = 'json' -CELERY_RESULT_SERIALIZER = 'json' -CELERY_ACCEPT_CONTENT = ['json'] -CELERY_TIMEZONE = 'UTC' -CELERY_ENABLE_UTC = True -CELERY_TASK_ACKS_LATE = True # Task acknowledged after completion -CELERY_WORKER_PREFETCH_MULTIPLIER = 1 # One task at a time per worker -CELERY_TASK_REJECT_ON_WORKER_LOST = True # Requeue tasks if worker dies -CELERY_TASK_SOFT_TIME_LIMIT = 3600 # 1 hour soft limit -CELERY_TASK_TIME_LIMIT = 3900 # 1 hour 5 minutes hard limit - -# Feature flag for gradual migration (set to True to enable Celery, False for cron fallback) -USE_CELERY_TASKS = False diff --git a/install/celery-beat.service b/install/celery-beat.service deleted file mode 100644 index 81fce68b..00000000 --- a/install/celery-beat.service +++ /dev/null @@ -1,23 +0,0 @@ -[Unit] -Description=Sample Platform Celery Beat Scheduler -After=network.target redis.service celery-worker.service -Requires=redis.service - -[Service] -Type=simple -User=www-data -Group=www-data -WorkingDirectory=/var/www/sample-platform -Environment="PATH=/var/www/sample-platform/venv/bin" -ExecStart=/var/www/sample-platform/venv/bin/celery \ - -A celery_app.celery beat \ - --pidfile=/var/run/celery/beat.pid \ - --logfile=/var/www/sample-platform/logs/celery/beat.log \ - --loglevel=INFO \ - --schedule=/var/www/sample-platform/celerybeat-schedule -RuntimeDirectory=celery -Restart=always -RestartSec=10 - -[Install] -WantedBy=multi-user.target diff --git a/install/celery-worker.service b/install/celery-worker.service deleted file mode 100644 index d9c1cfaa..00000000 --- a/install/celery-worker.service +++ /dev/null @@ -1,30 +0,0 @@ -[Unit] -Description=Sample Platform Celery Worker -After=network.target redis.service mysql.service -Requires=redis.service - -[Service] -Type=forking -User=www-data -Group=www-data -WorkingDirectory=/var/www/sample-platform -Environment="PATH=/var/www/sample-platform/venv/bin" -ExecStart=/var/www/sample-platform/venv/bin/celery \ - -A celery_app.celery multi start worker \ - --pidfile=/var/run/celery/%n.pid \ - --logfile=/var/www/sample-platform/logs/celery/%n%I.log \ - --loglevel=INFO \ - -Q default,test_execution,maintenance \ - --concurrency=2 -ExecStop=/var/www/sample-platform/venv/bin/celery \ - -A celery_app.celery multi stopwait worker \ - --pidfile=/var/run/celery/%n.pid -ExecReload=/var/www/sample-platform/venv/bin/celery \ - -A celery_app.celery multi restart worker \ - --pidfile=/var/run/celery/%n.pid -RuntimeDirectory=celery -Restart=always -RestartSec=10 - -[Install] -WantedBy=multi-user.target diff --git a/install/installation.md b/install/installation.md index 839bcac0..0b577cec 100644 --- a/install/installation.md +++ b/install/installation.md @@ -217,107 +217,6 @@ The file `mod_ci/cron.py` is to be run in periodic intervals. To setup a cron jo ``` Change the `/var/www/sample-plaform` directory, if you have installed the platform in a different directory. -## Optional: Setting up Celery Task Queue - -As an alternative to cron-based polling, you can use Celery with Redis for event-driven test processing. This provides faster test execution, better retry handling, and parallel processing. - -### Installing Redis - -```bash -sudo apt update -sudo apt install redis-server - -# Configure Redis -sudo nano /etc/redis/redis.conf -# Set: supervised systemd -# Set: bind 127.0.0.1 ::1 - -# Enable and start Redis -sudo systemctl enable redis-server -sudo systemctl start redis-server - -# Verify Redis is running -redis-cli ping # Should return PONG -``` - -### Configuring Celery - -Add the following to your `config.py`: - -```python -# Celery Configuration -CELERY_BROKER_URL = 'redis://localhost:6379/0' -CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' -USE_CELERY_TASKS = True # Set to False to use cron instead -``` - -### Installing Celery Services - -```bash -# Create log directory -sudo mkdir -p /var/www/sample-platform/logs/celery -sudo chown -R www-data:www-data /var/www/sample-platform/logs/celery - -# Create runtime directory -sudo mkdir -p /var/run/celery -sudo chown www-data:www-data /var/run/celery - -# Install systemd services -sudo cp /var/www/sample-platform/install/celery-worker.service /etc/systemd/system/ -sudo cp /var/www/sample-platform/install/celery-beat.service /etc/systemd/system/ - -# Reload systemd and enable services -sudo systemctl daemon-reload -sudo systemctl enable celery-worker celery-beat - -# Start the services -sudo systemctl start celery-worker -sudo systemctl start celery-beat -``` - -### Monitoring Celery - -```bash -# Check worker status -celery -A celery_app.celery inspect active - -# Check queue depth -redis-cli LLEN celery - -# View logs -tail -f /var/www/sample-platform/logs/celery/*.log - -# Optional: Install Flower for web-based monitoring -pip install flower -celery -A celery_app.celery flower --port=5555 -``` - -### Gradual Migration - -For a safe transition from cron to Celery: - -1. **Stage 1**: Set `USE_CELERY_TASKS = False` and keep cron running. Start Celery services and verify they work correctly in logs. - -2. **Stage 2**: Set `USE_CELERY_TASKS = True`. Reduce cron frequency to every 30 minutes as a fallback. - -3. **Stage 3**: Disable cron entirely once you're confident Celery is working correctly. - -### Rollback to Cron - -If you need to disable Celery: - -```bash -# Stop Celery services -sudo systemctl stop celery-beat celery-worker - -# Edit config.py and set USE_CELERY_TASKS = False - -# Restart platform -sudo systemctl restart platform - -# Ensure cron is running every 10 minutes -``` - ## GCS configuration to serve file downloads using Signed URLs To serve file downloads directly from the private GCS bucket, Signed download URLs have been used. diff --git a/mod_ci/controllers.py b/mod_ci/controllers.py index 1448cc36..2b5b1169 100755 --- a/mod_ci/controllers.py +++ b/mod_ci/controllers.py @@ -1124,7 +1124,7 @@ def save_xml_to_file(xml_node, folder_name, file_name) -> None: ) -def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> list: +def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> None: """ Add test details entry into Test model for each platform. @@ -1140,8 +1140,8 @@ def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> list: :type branch: str :param pr_nr: Pull Request number, if applicable. :type pr_nr: int - :return: List of created test IDs - :rtype: list + :return: Nothing + :rtype: None """ from run import log @@ -1149,7 +1149,7 @@ def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> list: # Based on issue identified by NexionisJake in PR #937 if not is_valid_commit_hash(commit): log.error(f"Invalid commit hash '{commit}' - skipping test entry creation") - return [] + return fork_url = f"%/{g.github['repository_owner']}/{g.github['repository']}.git" fork = Fork.query.filter(Fork.github.like(fork_url)).first() @@ -1158,59 +1158,12 @@ def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> list: log.debug('pull request test type detected') branch = "pull_request" - test_ids = [] linux_test = Test(TestPlatform.linux, test_type, fork.id, branch, commit, pr_nr) db.add(linux_test) - db.flush() # Get ID before commit - test_ids.append(linux_test.id) - windows_test = Test(TestPlatform.windows, test_type, fork.id, branch, commit, pr_nr) db.add(windows_test) - db.flush() # Get ID before commit - test_ids.append(windows_test.id) - if not safe_db_commit(db, f"adding test entries for commit {commit[:7]}"): log.error(f"Failed to add test entries for commit {commit}") - return [] - - return test_ids - - -def trigger_test_tasks(test_ids: list, bot_token: str) -> None: - """ - Optionally trigger Celery tasks for newly created tests. - - Only triggers if USE_CELERY_TASKS is True in config. - Falls back to waiting for cron/periodic task otherwise. - - :param test_ids: List of Test IDs to queue - :type test_ids: list - :param bot_token: GitHub bot token - :type bot_token: str - """ - from run import config, log - - if not config.get('USE_CELERY_TASKS', False): - log.debug("Celery tasks disabled, tests will be picked up by cron/periodic task") - return - - if not test_ids: - return - - try: - from mod_ci.tasks import start_test_task - - for test_id in test_ids: - start_test_task.apply_async( - args=[test_id, bot_token], - queue='test_execution', - countdown=30 # 30 second delay for artifact upload to complete - ) - log.info(f"Queued test {test_id} via Celery") - except ImportError: - log.warning("Celery tasks module not available, falling back to cron") - except Exception as e: - log.error(f"Failed to queue Celery tasks: {e}, tests will be picked up by cron") def schedule_test(gh_commit: Commit.Commit) -> None: @@ -1352,14 +1305,6 @@ def queue_test(gh_commit: Commit.Commit, commit, test_type, platform, branch="ma add_customized_regression_tests(platform_test.id) if gh_commit is not None: - # Check if test already has progress (started or completed) - # If so, don't overwrite the GitHub status with "Tests queued" - # This prevents the bug where a completed test gets its status overwritten - # when a later webhook triggers queue_test for the same commit - if len(platform_test.progress) > 0: - log.info(f"Test {platform_test.id} already has progress, not posting 'Tests queued' status") - return - target_url = url_for('test.by_id', test_id=platform_test.id, _external=True) status_context = f"CI - {platform_test.platform.value}" update_status_on_github(gh_commit, Status.PENDING, "Tests queued", status_context, target_url) @@ -1484,8 +1429,7 @@ def start_ci(): last_commit.value = ref.object.sha if not safe_db_commit(g.db, "updating last commit"): return 'ERROR' - test_ids = add_test_entry(g.db, commit_hash, TestType.commit) - trigger_test_tasks(test_ids, g.github['bot_token']) + add_test_entry(g.db, commit_hash, TestType.commit) else: g.log.warning('Unknown push type! Dumping payload for analysis') g.log.warning(payload) @@ -1515,8 +1459,7 @@ def start_ci(): try: pr = retry_with_backoff(lambda: repository.get_pull(number=pr_nr)) if pr.mergeable is not False: - test_ids = add_test_entry(g.db, commit_hash, TestType.pull_request, pr_nr=pr_nr) - trigger_test_tasks(test_ids, g.github['bot_token']) + add_test_entry(g.db, commit_hash, TestType.pull_request, pr_nr=pr_nr) except GithubException as e: g.log.error(f"Failed to get PR {pr_nr} after retries: {e}") diff --git a/mod_ci/tasks.py b/mod_ci/tasks.py deleted file mode 100644 index 2b9a079c..00000000 --- a/mod_ci/tasks.py +++ /dev/null @@ -1,236 +0,0 @@ -"""Celery tasks for CI platform operations.""" - -from celery.exceptions import SoftTimeLimitExceeded -from celery.utils.log import get_task_logger -from github import Auth, Github, GithubException - -from celery_app import celery - -logger = get_task_logger(__name__) - - -@celery.task( - bind=True, - max_retries=3, - default_retry_delay=60, - autoretry_for=(GithubException,), - retry_backoff=True, - retry_backoff_max=300, - acks_late=True -) -def start_test_task(self, test_id: int, bot_token: str): - """ - Execute a single test by creating a GCP VM instance. - - This task wraps the existing start_test() function with Celery's - retry mechanisms and proper error handling. - - :param test_id: The ID of the Test to execute - :param bot_token: GitHub bot token for artifact download - :return: Dict with status and message - """ - # Import inside task to avoid circular imports and ensure fresh Flask context - from database import create_session - from mod_ci.controllers import (get_compute_service_object, - mark_test_failed, start_test) - from mod_ci.models import GcpInstance - from mod_test.models import Test - from run import app, config - - with app.app_context(): - db = create_session(config['DATABASE_URI']) - - try: - # Fetch the test - test = Test.query.get(test_id) - if test is None: - logger.error(f"Test {test_id} not found") - return {'status': 'error', 'message': 'Test not found'} - - # Check if test is already finished - if test.finished: - logger.info(f"Test {test_id} already finished, skipping") - return {'status': 'skipped', 'message': 'Test already finished'} - - # Check if test already has a GCP instance (prevent duplicates) - existing_instance = GcpInstance.query.filter( - GcpInstance.test_id == test_id - ).first() - if existing_instance is not None: - logger.info(f"Test {test_id} already has GCP instance, skipping") - return {'status': 'skipped', 'message': 'Test already has instance'} - - # Get GitHub repository - gh = Github(auth=Auth.Token(bot_token)) - repository = gh.get_repo( - f"{config['GITHUB_OWNER']}/{config['GITHUB_REPOSITORY']}" - ) - - # Execute the test - compute = get_compute_service_object() - start_test(compute, app, db, repository, test, bot_token) - - logger.info(f"Test {test_id} started successfully") - return {'status': 'success', 'test_id': test_id} - - except SoftTimeLimitExceeded: - logger.error(f"Test {test_id} exceeded time limit") - try: - test = Test.query.get(test_id) - if test and not test.finished: - gh = Github(auth=Auth.Token(bot_token)) - repository = gh.get_repo( - f"{config['GITHUB_OWNER']}/{config['GITHUB_REPOSITORY']}" - ) - mark_test_failed(db, test, repository, "Task timed out") - except Exception as mark_error: - logger.error(f"Failed to mark test {test_id} as failed: {mark_error}") - raise - - except Exception as e: - logger.exception(f"Error starting test {test_id}: {e}") - # Retry on transient failures - if self.request.retries < self.max_retries: - raise self.retry(exc=e) - # Final failure - mark test as failed - try: - test = Test.query.get(test_id) - if test and not test.finished: - gh = Github(auth=Auth.Token(bot_token)) - repository = gh.get_repo( - f"{config['GITHUB_OWNER']}/{config['GITHUB_REPOSITORY']}" - ) - mark_test_failed(db, test, repository, f"Task failed: {str(e)[:100]}") - except Exception as mark_error: - logger.error(f"Failed to mark test {test_id} as failed: {mark_error}") - raise - - finally: - db.remove() - - -@celery.task(bind=True, acks_late=True) -def check_expired_instances_task(self): - """ - Periodic task to clean up expired GCP instances. - - This wraps delete_expired_instances() for Celery scheduling. - - :return: Dict with status and message - """ - from github import Auth, Github - - from database import create_session - from mod_ci.controllers import (delete_expired_instances, - get_compute_service_object) - from run import app, config - - with app.app_context(): - db = create_session(config['DATABASE_URI']) - - try: - vm_max_runtime = config.get('GCP_INSTANCE_MAX_RUNTIME', 120) - zone = config.get('ZONE', '') - project = config.get('PROJECT_NAME', '') - - if not zone or not project: - logger.error('GCP zone or project not configured') - return {'status': 'error', 'message': 'GCP not configured'} - - # Get GitHub repository - github_token = config.get('GITHUB_TOKEN', '') - if not github_token: - logger.error('GitHub token not configured') - return {'status': 'error', 'message': 'GitHub token missing'} - - gh = Github(auth=Auth.Token(github_token)) - repository = gh.get_repo( - f"{config['GITHUB_OWNER']}/{config['GITHUB_REPOSITORY']}" - ) - - compute = get_compute_service_object() - delete_expired_instances( - compute, vm_max_runtime, project, zone, db, repository - ) - - logger.info('Expired instances check completed') - return {'status': 'success'} - - except Exception as e: - logger.exception(f"Error checking expired instances: {e}") - return {'status': 'error', 'message': str(e)} - - finally: - db.remove() - - -@celery.task(bind=True, acks_late=True) -def process_pending_tests_task(self): - """ - Periodic task to find and queue pending tests for execution. - - This replaces the cron-based approach by finding pending tests - and dispatching individual start_test_task for each. - - :return: Dict with status and count of queued tests - """ - from database import create_session - from mod_ci.models import GcpInstance, MaintenanceMode - from mod_test.models import Test, TestPlatform, TestProgress, TestStatus - from run import app, config - - with app.app_context(): - db = create_session(config['DATABASE_URI']) - - try: - github_token = config.get('GITHUB_TOKEN', '') - if not github_token: - logger.error('GitHub token not configured') - return {'status': 'error', 'message': 'GitHub token missing'} - - bot_token = github_token - queued_count = 0 - - # Find pending tests for each platform - for platform in [TestPlatform.linux, TestPlatform.windows]: - # Check maintenance mode - maintenance_mode = MaintenanceMode.query.filter( - MaintenanceMode.platform == platform - ).first() - if maintenance_mode is not None and maintenance_mode.disabled: - logger.debug(f'[{platform.value}] In maintenance mode, skipping') - continue - - # Get tests with progress (finished or in progress) - finished_tests = db.query(TestProgress.test_id).filter( - TestProgress.status.in_([TestStatus.canceled, TestStatus.completed]) - ) - - # Get tests with GCP instances (currently running) - running_tests = db.query(GcpInstance.test_id) - - # Find pending tests (limit to 5 per platform per run) - pending_tests = Test.query.filter( - Test.id.notin_(finished_tests), - Test.id.notin_(running_tests), - Test.platform == platform - ).order_by(Test.id.asc()).limit(5).all() - - for test in pending_tests: - # Queue each test as a separate task - start_test_task.apply_async( - args=[test.id, bot_token], - queue='test_execution', - countdown=1 # Small delay between tasks - ) - queued_count += 1 - logger.info(f'Queued test {test.id} for {platform.value}') - - return {'status': 'success', 'queued_count': queued_count} - - except Exception as e: - logger.exception(f"Error processing pending tests: {e}") - return {'status': 'error', 'message': str(e)} - - finally: - db.remove() diff --git a/requirements.txt b/requirements.txt index 10112768..db49fccb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,5 +26,3 @@ cffi==2.0.0 PyGithub==2.8.1 blinker==1.9.0 click==8.1.7 -celery[redis]==5.3.6 -redis==5.0.1 diff --git a/tests/test_ci/test_tasks.py b/tests/test_ci/test_tasks.py deleted file mode 100644 index 7190e227..00000000 --- a/tests/test_ci/test_tasks.py +++ /dev/null @@ -1,128 +0,0 @@ -"""Unit tests for Celery tasks and related controller functions.""" - -import unittest -from unittest import mock -from unittest.mock import MagicMock, patch - -from flask import g - -from mod_test.models import TestType -from tests.base import BaseTestCase - - -class TestTriggerTestTasks(BaseTestCase): - """Test cases for trigger_test_tasks function.""" - - def test_trigger_test_tasks_disabled_by_default(self): - """Test that tasks are not triggered when USE_CELERY_TASKS is False (default).""" - from mod_ci.controllers import trigger_test_tasks - - # By default, USE_CELERY_TASKS is False - with self.app.app_context(): - # Should not raise exception and should log debug message - trigger_test_tasks([1, 2], 'fake_token') - # Function returns silently when disabled - - def test_trigger_test_tasks_empty_list(self): - """Test that empty list doesn't trigger any tasks.""" - from mod_ci.controllers import trigger_test_tasks - - with patch.dict(self.app.config, {'USE_CELERY_TASKS': True}): - with self.app.app_context(): - # Should return early without error - trigger_test_tasks([], 'fake_token') - - def test_trigger_test_tasks_handles_import_error(self): - """Test graceful handling when Celery module is not available.""" - from mod_ci.controllers import trigger_test_tasks - - with patch.dict(self.app.config, {'USE_CELERY_TASKS': True}): - # Simulate import error by patching the import - with patch.dict('sys.modules', {'mod_ci.tasks': None}): - with self.app.app_context(): - # Should not raise exception - trigger_test_tasks([1, 2], 'fake_token') - - -class TestAddTestEntryReturnsIds(BaseTestCase): - """Test that add_test_entry returns test IDs correctly.""" - - @mock.patch('mod_ci.controllers.g') - @mock.patch('mod_ci.controllers.safe_db_commit') - @mock.patch('mod_ci.controllers.Fork') - @mock.patch('mod_ci.controllers.Test') - def test_add_test_entry_returns_test_ids(self, mock_test, mock_fork, mock_commit, mock_g): - """Test that add_test_entry returns list of created test IDs.""" - from mod_ci.controllers import add_test_entry - - # Setup mocks - mock_g.github = {'repository_owner': 'test', 'repository': 'test'} - mock_fork_obj = MagicMock() - mock_fork_obj.id = 1 - mock_fork.query.filter.return_value.first.return_value = mock_fork_obj - mock_commit.return_value = True - - # Setup mock Test objects to return IDs - mock_linux_test = MagicMock() - mock_linux_test.id = 100 - mock_windows_test = MagicMock() - mock_windows_test.id = 101 - mock_test.side_effect = [mock_linux_test, mock_windows_test] - - mock_db = MagicMock() - - # Call the function with a valid commit hash (40 hex chars) - test_ids = add_test_entry(mock_db, 'a' * 40, TestType.commit) - - # Verify we got a list with 2 IDs - self.assertIsInstance(test_ids, list) - self.assertEqual(len(test_ids), 2) - self.assertEqual(test_ids[0], 100) - self.assertEqual(test_ids[1], 101) - - def test_add_test_entry_invalid_commit_returns_empty(self): - """Test that invalid commit hash returns empty list.""" - from mod_ci.controllers import add_test_entry - - mock_db = MagicMock() - - # Call with invalid commit hash - test_ids = add_test_entry(mock_db, 'invalid_hash', TestType.commit) - - # Verify empty list for invalid commit - self.assertIsInstance(test_ids, list) - self.assertEqual(len(test_ids), 0) - - @mock.patch('mod_ci.controllers.g') - @mock.patch('mod_ci.controllers.safe_db_commit') - @mock.patch('mod_ci.controllers.Fork') - @mock.patch('mod_ci.controllers.Test') - def test_add_test_entry_db_failure_returns_empty(self, mock_test, mock_fork, mock_commit, mock_g): - """Test that db commit failure returns empty list.""" - from mod_ci.controllers import add_test_entry - - # Setup mocks - mock_g.github = {'repository_owner': 'test', 'repository': 'test'} - mock_fork_obj = MagicMock() - mock_fork_obj.id = 1 - mock_fork.query.filter.return_value.first.return_value = mock_fork_obj - mock_commit.return_value = False # Simulate commit failure - - mock_linux_test = MagicMock() - mock_linux_test.id = 100 - mock_windows_test = MagicMock() - mock_windows_test.id = 101 - mock_test.side_effect = [mock_linux_test, mock_windows_test] - - mock_db = MagicMock() - - # Call the function - test_ids = add_test_entry(mock_db, 'a' * 40, TestType.commit) - - # Verify empty list when commit fails - self.assertIsInstance(test_ids, list) - self.assertEqual(len(test_ids), 0) - - -if __name__ == '__main__': - unittest.main()