Skip to content

Commit f05dda1

Browse files
authored
fix: Prevent queue_test from overwriting completed test status
2 parents 6b211b8 + d7e3045 commit f05dda1

File tree

9 files changed

+682
-6
lines changed

9 files changed

+682
-6
lines changed

celery_app.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
"""Celery application factory and configuration for Sample Platform."""
2+
3+
import os
4+
5+
from celery import Celery
6+
from celery.schedules import crontab
7+
8+
# Load configuration - use empty dict for testing when config.py doesn't exist
9+
try:
10+
from config_parser import parse_config
11+
config = parse_config('config')
12+
except Exception:
13+
# In test environment, config.py may not exist
14+
config = {}
15+
16+
17+
def make_celery(app=None):
    """
    Create a Celery application configured for the Sample Platform.

    :param app: Optional Flask application for context binding
    :return: Configured Celery application
    """
    # Broker and result backend default to a local Redis instance.
    celery_app = Celery(
        'sample_platform',
        broker=config.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
        backend=config.get('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'),
        include=['mod_ci.tasks']
    )

    # Worker behaviour settings; each one can be overridden from config.py
    # and otherwise uses the fallback given here.
    overridable = {
        'task_serializer': ('CELERY_TASK_SERIALIZER', 'json'),
        'result_serializer': ('CELERY_RESULT_SERIALIZER', 'json'),
        'accept_content': ('CELERY_ACCEPT_CONTENT', ['json']),
        'timezone': ('CELERY_TIMEZONE', 'UTC'),
        'enable_utc': ('CELERY_ENABLE_UTC', True),
        'task_acks_late': ('CELERY_TASK_ACKS_LATE', True),
        'worker_prefetch_multiplier': ('CELERY_WORKER_PREFETCH_MULTIPLIER', 1),
        'task_reject_on_worker_lost': ('CELERY_TASK_REJECT_ON_WORKER_LOST', True),
        'task_soft_time_limit': ('CELERY_TASK_SOFT_TIME_LIMIT', 3600),
        'task_time_limit': ('CELERY_TASK_TIME_LIMIT', 3900),
    }
    celery_app.conf.update({option: config.get(key, fallback)
                            for option, (key, fallback) in overridable.items()})

    # Periodic (beat) tasks: expired-instance cleanup every five minutes,
    # pending-test processing every minute.
    celery_app.conf.beat_schedule = {
        'check-expired-instances-every-5-minutes': {
            'task': 'mod_ci.tasks.check_expired_instances_task',
            'schedule': crontab(minute='*/5'),
            'options': {'queue': 'maintenance'}
        },
        'process-pending-tests-every-minute': {
            'task': 'mod_ci.tasks.process_pending_tests_task',
            'schedule': crontab(minute='*'),
            'options': {'queue': 'default'}
        },
    }

    # Route each task type to its dedicated queue.
    celery_app.conf.task_routes = {
        'mod_ci.tasks.start_test_task': {'queue': 'test_execution'},
        'mod_ci.tasks.check_expired_instances_task': {'queue': 'maintenance'},
        'mod_ci.tasks.process_pending_tests_task': {'queue': 'default'},
    }

    if app is not None:
        # Wrap every task invocation in the Flask application context so
        # tasks can use app-bound resources such as the database session.
        class ContextTask(celery_app.Task):
            """Task base class that maintains Flask application context."""

            def __call__(self, *args, **kwargs):
                with app.app_context():
                    return self.run(*args, **kwargs)

        celery_app.Task = ContextTask

    return celery_app


# Default celery instance, used when the worker is started standalone
# (e.g. `celery -A celery_app.celery worker`).
celery = make_celery()

config_sample.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,21 @@
3737
GCP_INSTANCE_MAX_RUNTIME = 120 # In minutes
3838
GCS_BUCKET_NAME = 'spdev'
3939
GCS_SIGNED_URL_EXPIRY_LIMIT = 720 # In minutes
40+
41+
42+
# CELERY TASK QUEUE CONFIG
# Redis connection strings for the message broker and the result store.
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# Payload handling: accept and emit JSON only.
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# Time handling: keep everything in UTC.
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = 'UTC'

# Reliability: acknowledge tasks only after completion, hand workers one
# task at a time, and requeue work lost to a dead worker.
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_TASK_REJECT_ON_WORKER_LOST = True

# Runtime limits: soft limit of 1 hour, hard kill 5 minutes later.
CELERY_TASK_SOFT_TIME_LIMIT = 3600
CELERY_TASK_TIME_LIMIT = 3900

# Feature flag for gradual migration (set to True to enable Celery, False for cron fallback)
USE_CELERY_TASKS = False

install/celery-beat.service

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Systemd unit for the Celery beat scheduler, which enqueues the periodic
# tasks defined in celery_app.py (expired-instance cleanup, pending-test
# processing). Started after the worker so queued tasks have a consumer.
[Unit]
Description=Sample Platform Celery Beat Scheduler
After=network.target redis.service celery-worker.service
Requires=redis.service

[Service]
Type=simple
User=www-data
Group=www-data
WorkingDirectory=/var/www/sample-platform
Environment="PATH=/var/www/sample-platform/venv/bin"
# Run beat from the platform virtualenv; the schedule state is persisted
# in the celerybeat-schedule file given by --schedule.
ExecStart=/var/www/sample-platform/venv/bin/celery \
    -A celery_app.celery beat \
    --pidfile=/var/run/celery/beat.pid \
    --logfile=/var/www/sample-platform/logs/celery/beat.log \
    --loglevel=INFO \
    --schedule=/var/www/sample-platform/celerybeat-schedule
# systemd creates /run/celery for the pidfile.
RuntimeDirectory=celery
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

install/celery-worker.service

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Systemd unit for the Celery worker pool. Uses `celery multi`, which
# daemonizes, hence Type=forking. The worker consumes the three queues
# routed in celery_app.py (default, test_execution, maintenance).
[Unit]
Description=Sample Platform Celery Worker
After=network.target redis.service mysql.service
Requires=redis.service

[Service]
Type=forking
User=www-data
Group=www-data
WorkingDirectory=/var/www/sample-platform
Environment="PATH=/var/www/sample-platform/venv/bin"
# %n expands to the node name, %I to the log-file index.
ExecStart=/var/www/sample-platform/venv/bin/celery \
    -A celery_app.celery multi start worker \
    --pidfile=/var/run/celery/%n.pid \
    --logfile=/var/www/sample-platform/logs/celery/%n%I.log \
    --loglevel=INFO \
    -Q default,test_execution,maintenance \
    --concurrency=2
# stopwait lets in-flight tasks finish before shutdown.
ExecStop=/var/www/sample-platform/venv/bin/celery \
    -A celery_app.celery multi stopwait worker \
    --pidfile=/var/run/celery/%n.pid
ExecReload=/var/www/sample-platform/venv/bin/celery \
    -A celery_app.celery multi restart worker \
    --pidfile=/var/run/celery/%n.pid
# systemd creates /run/celery for the pidfiles.
RuntimeDirectory=celery
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

install/installation.md

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,107 @@ The file `mod_ci/cron.py` is to be run in periodic intervals. To setup a cron jo
217217
```
218218
Change the `/var/www/sample-platform` directory, if you have installed the platform in a different directory.
219219
220+
## Optional: Setting up Celery Task Queue
221+
222+
As an alternative to cron-based polling, you can use Celery with Redis for event-driven test processing. This provides faster test execution, better retry handling, and parallel processing.
223+
224+
### Installing Redis
225+
226+
```bash
227+
sudo apt update
228+
sudo apt install redis-server
229+
230+
# Configure Redis
231+
sudo nano /etc/redis/redis.conf
232+
# Set: supervised systemd
233+
# Set: bind 127.0.0.1 ::1
234+
235+
# Enable and start Redis
236+
sudo systemctl enable redis-server
237+
sudo systemctl start redis-server
238+
239+
# Verify Redis is running
240+
redis-cli ping # Should return PONG
241+
```
242+
243+
### Configuring Celery
244+
245+
Add the following to your `config.py`:
246+
247+
```python
248+
# Celery Configuration
249+
CELERY_BROKER_URL = 'redis://localhost:6379/0'
250+
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
251+
USE_CELERY_TASKS = True # Set to False to use cron instead
252+
```
253+
254+
### Installing Celery Services
255+
256+
```bash
257+
# Create log directory
258+
sudo mkdir -p /var/www/sample-platform/logs/celery
259+
sudo chown -R www-data:www-data /var/www/sample-platform/logs/celery
260+
261+
# Create runtime directory
262+
sudo mkdir -p /var/run/celery
263+
sudo chown www-data:www-data /var/run/celery
264+
265+
# Install systemd services
266+
sudo cp /var/www/sample-platform/install/celery-worker.service /etc/systemd/system/
267+
sudo cp /var/www/sample-platform/install/celery-beat.service /etc/systemd/system/
268+
269+
# Reload systemd and enable services
270+
sudo systemctl daemon-reload
271+
sudo systemctl enable celery-worker celery-beat
272+
273+
# Start the services
274+
sudo systemctl start celery-worker
275+
sudo systemctl start celery-beat
276+
```
277+
278+
### Monitoring Celery
279+
280+
```bash
281+
# Check worker status
282+
celery -A celery_app.celery inspect active
283+
284+
# Check queue depth
285+
redis-cli LLEN default
redis-cli LLEN test_execution
redis-cli LLEN maintenance
286+
287+
# View logs
288+
tail -f /var/www/sample-platform/logs/celery/*.log
289+
290+
# Optional: Install Flower for web-based monitoring
291+
pip install flower
292+
celery -A celery_app.celery flower --port=5555
293+
```
294+
295+
### Gradual Migration
296+
297+
For a safe transition from cron to Celery:
298+
299+
1. **Stage 1**: Set `USE_CELERY_TASKS = False` and keep cron running. Start Celery services and verify they work correctly in logs.
300+
301+
2. **Stage 2**: Set `USE_CELERY_TASKS = True`. Reduce cron frequency to every 30 minutes as a fallback.
302+
303+
3. **Stage 3**: Disable cron entirely once you're confident Celery is working correctly.
304+
305+
### Rollback to Cron
306+
307+
If you need to disable Celery:
308+
309+
```bash
310+
# Stop Celery services
311+
sudo systemctl stop celery-beat celery-worker
312+
313+
# Edit config.py and set USE_CELERY_TASKS = False
314+
315+
# Restart platform
316+
sudo systemctl restart platform
317+
318+
# Ensure cron is running every 10 minutes
319+
```
320+
220321
## GCS configuration to serve file downloads using Signed URLs
221322

222323
To serve file downloads directly from the private GCS bucket, Signed download URLs have been used.

mod_ci/controllers.py

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1124,7 +1124,7 @@ def save_xml_to_file(xml_node, folder_name, file_name) -> None:
11241124
)
11251125

11261126

1127-
def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> None:
1127+
def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> list:
11281128
"""
11291129
Add test details entry into Test model for each platform.
11301130
@@ -1140,16 +1140,16 @@ def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> None:
11401140
:type branch: str
11411141
:param pr_nr: Pull Request number, if applicable.
11421142
:type pr_nr: int
1143-
:return: Nothing
1144-
:rtype: None
1143+
:return: List of created test IDs
1144+
:rtype: list
11451145
"""
11461146
from run import log
11471147

11481148
# Validate commit hash before creating test entries
11491149
# Based on issue identified by NexionisJake in PR #937
11501150
if not is_valid_commit_hash(commit):
11511151
log.error(f"Invalid commit hash '{commit}' - skipping test entry creation")
1152-
return
1152+
return []
11531153

11541154
fork_url = f"%/{g.github['repository_owner']}/{g.github['repository']}.git"
11551155
fork = Fork.query.filter(Fork.github.like(fork_url)).first()
@@ -1158,12 +1158,59 @@ def add_test_entry(db, commit, test_type, branch="master", pr_nr=0) -> None:
11581158
log.debug('pull request test type detected')
11591159
branch = "pull_request"
11601160

1161+
test_ids = []
11611162
linux_test = Test(TestPlatform.linux, test_type, fork.id, branch, commit, pr_nr)
11621163
db.add(linux_test)
1164+
db.flush() # Get ID before commit
1165+
test_ids.append(linux_test.id)
1166+
11631167
windows_test = Test(TestPlatform.windows, test_type, fork.id, branch, commit, pr_nr)
11641168
db.add(windows_test)
1169+
db.flush() # Get ID before commit
1170+
test_ids.append(windows_test.id)
1171+
11651172
if not safe_db_commit(db, f"adding test entries for commit {commit[:7]}"):
11661173
log.error(f"Failed to add test entries for commit {commit}")
1174+
return []
1175+
1176+
return test_ids
1177+
1178+
1179+
def trigger_test_tasks(test_ids: list, bot_token: str) -> None:
    """
    Optionally queue Celery tasks for newly created tests.

    This is a no-op unless USE_CELERY_TASKS is enabled in config; without
    it the tests are picked up later by the cron/periodic task. Failures
    here are logged but never raised, so webhook handling always continues.

    :param test_ids: List of Test IDs to queue
    :type test_ids: list
    :param bot_token: GitHub bot token
    :type bot_token: str
    """
    from run import config, log

    if not config.get('USE_CELERY_TASKS', False):
        log.debug("Celery tasks disabled, tests will be picked up by cron/periodic task")
        return

    if not test_ids:
        return

    try:
        from mod_ci.tasks import start_test_task

        # The 30 second countdown gives artifact uploads time to complete
        # before the test actually starts.
        task_options = {'queue': 'test_execution', 'countdown': 30}
        for test_id in test_ids:
            start_test_task.apply_async(args=[test_id, bot_token], **task_options)
            log.info(f"Queued test {test_id} via Celery")
    except ImportError:
        log.warning("Celery tasks module not available, falling back to cron")
    except Exception as e:
        log.error(f"Failed to queue Celery tasks: {e}, tests will be picked up by cron")
11671214

11681215

11691216
def schedule_test(gh_commit: Commit.Commit) -> None:
@@ -1305,6 +1352,14 @@ def queue_test(gh_commit: Commit.Commit, commit, test_type, platform, branch="ma
13051352
add_customized_regression_tests(platform_test.id)
13061353

13071354
if gh_commit is not None:
1355+
# Check if test already has progress (started or completed)
1356+
# If so, don't overwrite the GitHub status with "Tests queued"
1357+
# This prevents the bug where a completed test gets its status overwritten
1358+
# when a later webhook triggers queue_test for the same commit
1359+
if len(platform_test.progress) > 0:
1360+
log.info(f"Test {platform_test.id} already has progress, not posting 'Tests queued' status")
1361+
return
1362+
13081363
target_url = url_for('test.by_id', test_id=platform_test.id, _external=True)
13091364
status_context = f"CI - {platform_test.platform.value}"
13101365
update_status_on_github(gh_commit, Status.PENDING, "Tests queued", status_context, target_url)
@@ -1429,7 +1484,8 @@ def start_ci():
14291484
last_commit.value = ref.object.sha
14301485
if not safe_db_commit(g.db, "updating last commit"):
14311486
return 'ERROR'
1432-
add_test_entry(g.db, commit_hash, TestType.commit)
1487+
test_ids = add_test_entry(g.db, commit_hash, TestType.commit)
1488+
trigger_test_tasks(test_ids, g.github['bot_token'])
14331489
else:
14341490
g.log.warning('Unknown push type! Dumping payload for analysis')
14351491
g.log.warning(payload)
@@ -1459,7 +1515,8 @@ def start_ci():
14591515
try:
14601516
pr = retry_with_backoff(lambda: repository.get_pull(number=pr_nr))
14611517
if pr.mergeable is not False:
1462-
add_test_entry(g.db, commit_hash, TestType.pull_request, pr_nr=pr_nr)
1518+
test_ids = add_test_entry(g.db, commit_hash, TestType.pull_request, pr_nr=pr_nr)
1519+
trigger_test_tasks(test_ids, g.github['bot_token'])
14631520
except GithubException as e:
14641521
g.log.error(f"Failed to get PR {pr_nr} after retries: {e}")
14651522

0 commit comments

Comments
 (0)