
Commit 4e1bb07: Implementation of supervisor service infrastructure
1 parent 80b37d8

File tree: 10 files changed, +1873 −5 lines changed


CONTRIBUTING.md

Lines changed: 99 additions & 1 deletion
@@ -15,6 +15,22 @@ hatch run hatch-test.py3.11:setup-profile
 hatch run hatch-test.py3.11:unit-tests
 ```
+
+Before running the integration tests, start the Airflow services:
+
+```bash
+# 1. Start PostgreSQL
+hatch run hatch-test.py3.11:start-psql-service
+
+# 2. Create AiiDA profile and databases
+hatch run hatch-test.py3.11:setup-profile
+
+# 3. Start Airflow services
+hatch run hatch-test.py3.11:daemon-start
+
+# 4. Run integration tests
+hatch run hatch-test.py3.11:integration-tests
+```
+
 ## Unit tests
 
 ### Setup test environment
@@ -84,6 +100,22 @@ postgres (admin)
 The profile will be located at `.pytest/.aiida/test/` (or `$AIIDA_PATH/.aiida/test/` if `AIIDA_PATH` is set) with the following structure:
 ```
 .pytest/.aiida/test/
+├── daemon/                  # Daemon process management
+│   ├── services/            # Per-service directories
+│   │   ├── scheduler/
+│   │   │   ├── state.json   # PID, state, timestamps
+│   │   │   ├── stdout.log   # Service output
+│   │   │   └── stderr.log   # Service errors
+│   │   ├── triggerer/
+│   │   │   ├── state.json
+│   │   │   ├── stdout.log
+│   │   │   └── stderr.log
+│   │   ├── dag-processor/
+│   │   │   └── ...
+│   │   └── api-server/
+│   │       └── ...
+│   ├── daemon.pid           # Daemon PID file
+│   └── daemon.log           # Daemon output (background mode)
 └── airflow/                 # Airflow files
     ├── dags/                # DAG files
     └── airflow.cfg          # Airflow configuration
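The per-service `state.json` files make this layout easy to script against. A minimal sketch of reading one follows; `read_service_state` is a hypothetical helper, and only the fields named above (PID, state) are assumed — the exact schema is not taken from the provider code:

```python
import json
import tempfile
from pathlib import Path

def read_service_state(services_dir: Path, service: str) -> dict:
    """Parse the state.json of one managed service, e.g. 'scheduler'."""
    return json.loads((services_dir / service / "state.json").read_text())

# Demo against a throwaway directory that mimics the layout above
services = Path(tempfile.mkdtemp())
(services / "scheduler").mkdir()
(services / "scheduler" / "state.json").write_text(
    json.dumps({"pid": 12346, "state": "RUNNING"})  # hypothetical field names
)

state = read_service_state(services, "scheduler")
print(state["state"])  # RUNNING
```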
@@ -113,6 +145,66 @@ hatch test -- -m 'not integration'
 
 Integration tests require running Airflow services (scheduler, triggerer, dag-processor). These tests are marked with `@pytest.mark.integration`.
 
+
+### Start Airflow in background with daemon
+
+Use the daemon manager to start all services at once:
+
+```bash
+# Start daemon in background (default)
+hatch run hatch-test.py3.11:daemon-start
+```
+
+The daemon will:
+- Check if a daemon is already running
+- Start all Airflow services (scheduler, triggerer, dag-processor, api-server)
+- Run a health-monitor thread that checks service status every 5 seconds
+- Detach and run in the background
+
+**Background mode** (default):
+- The daemon detaches and runs in the background
+- Services continue running after the terminal closes
+- Use `daemon-stop` to stop all services
+- Use `daemon-status` to check service status
+
+**Foreground mode** (for debugging):
+```bash
+# Run daemon in foreground with the --foreground flag
+python scripts/cmd_daemon_start.py --profile-name test --foreground
+```
+- Keeps the daemon running in your terminal
+- Press Ctrl+C to gracefully stop all services
+- Useful for interactive testing and debugging
+
+Check the service status from another terminal:
+
+```bash
+hatch run hatch-test.py3.11:daemon-status
+```
+
+Expected output:
+```
+=== Airflow Test Services Status ===
+
+Health monitor daemon: RUNNING (PID: 12345)
+
+Service Status:
+--------------------------------------------------------------------------------
+Service         State       PID     Uptime   Last Check   Failures
+--------------------------------------------------------------------------------
+scheduler       ✓ RUNNING   12346   2m 15s   3s ago       0
+triggerer       ✓ RUNNING   12347   2m 15s   3s ago       0
+dag-processor   ✓ RUNNING   12348   2m 15s   3s ago       0
+api-server      ✓ RUNNING   12349   2m 15s   3s ago       0
+--------------------------------------------------------------------------------
+
+Airflow home: .pytest/.aiida/test/airflow
+DAGs folder:  .pytest/.aiida/test/airflow/dags
+Logs:         .pytest/.aiida/test/airflow/logs
+```
+
+
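The health monitor can be pictured as a daemon thread that probes each service PID on a fixed interval. The sketch below illustrates the idea only; the class and attribute names are hypothetical, and this is not the provider's `AirflowDaemon` implementation:

```python
import os
import threading
import time

def pid_alive(pid: int) -> bool:
    """Probe a PID with signal 0, which checks existence without killing."""
    try:
        os.kill(pid, 0)
        return True
    except ProcessLookupError:
        return False

class HealthMonitor(threading.Thread):
    """Poll a {service: pid} map every `interval` seconds, counting failures."""

    def __init__(self, pids: dict[str, int], interval: float = 5.0):
        super().__init__(daemon=True)
        self.pids = pids
        self.interval = interval
        self.failures = {name: 0 for name in pids}
        self._stop_event = threading.Event()

    def run(self):
        # wait() doubles as an interruptible sleep between health checks
        while not self._stop_event.wait(self.interval):
            for name, pid in self.pids.items():
                if not pid_alive(pid):
                    self.failures[name] += 1

    def stop(self):
        self._stop_event.set()

# Demo: monitor our own (certainly alive) PID at a short interval
monitor = HealthMonitor({"scheduler": os.getpid()}, interval=0.05)
monitor.start()
time.sleep(0.2)
monitor.stop()
monitor.join(timeout=1.0)
```

A real monitor would also restart failed services and persist its observations to the `state.json` files; this sketch only shows the polling loop.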
 ### Start Airflow services in foreground (recommended for debugging)
 
 If you prefer to start services manually in separate terminals:
@@ -157,10 +249,16 @@ hatch test
 
 Be sure that all Airflow services and the Docker service have been stopped.
 The Docker service can be stopped with:
-```
+
+```bash
 hatch run hatch-test.py3.11:stop-psql-service
 ```
 
+The Airflow services can be stopped with:
+```bash
+hatch run hatch-test.py3.11:daemon-stop
+```
+
 To clean test artifacts, including the PostgreSQL database cluster as well as the AiiDA and Airflow configuration:
 ```bash
 hatch run hatch-test.py3.11:clean

pyproject.toml

Lines changed: 16 additions & 3 deletions
@@ -37,6 +37,9 @@ Source = "..." # TODO
 file = "README.md"
 content-type = "text/markdown"
 
+[project.scripts]
+airflow-provider-aiida-triggerer-service = "airflow_provider_aiida.aiida_core.engine.daemon.triggerer_service:main"
+
 [project.entry-points.apache_airflow_provider]
 provider_info = "airflow_provider_aiida.__init__:get_provider_info"

@@ -54,6 +57,7 @@ python = "3.11"
 
 [tool.hatch.envs.hatch-test]
 default-args = []
+installer = "uv"
 dependencies = [
   "pytest>=7.0",
   "pytest-mock>=3.10",
@@ -83,9 +87,6 @@ AIRFLOW_PROVIDER_AIIDA__TESTS__AIRFLOW_POSTGRES_PASSWORD = "password"
 setup-profile = [
   "python scripts/cmd_profile_create.py --postgres-host {env:POSTGRES_HOST} --postgres-port {env:POSTGRES_HOST_PORT} --postgres-user {env:POSTGRES_USER} --postgres-password {env:POSTGRES_PASSWORD} --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE} --aiida-password {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIRFLOW_POSTGRES_PASSWORD}",
 ]
-reserialize = [
-  "python scripts/cmd_airflow.py --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE} dags reserialize --bundle-name aiida_dags",
-]
 
 teardown-profile = [
   "python scripts/cmd_profile_delete.py --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE} --postgres-user {env:POSTGRES_USER} --postgres-password {env:POSTGRES_PASSWORD}"
@@ -96,6 +97,11 @@ start-psql-service = "docker compose -f docker-compose.test.yml up -d"
 stop-psql-service = "docker compose -f docker-compose.test.yml down -v"
 status-psql-service = "docker ps"
 
+# Daemon commands
+daemon-start = "python scripts/cmd_daemon_start.py --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE}"
+daemon-stop = "python scripts/cmd_daemon_stop.py --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE}"
+daemon-status = "python scripts/cmd_daemon_status.py --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE}"
+
 # Start individual Airflow services (for manual testing)
 scheduler = "python scripts/cmd_airflow.py --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE} scheduler"
 api-server = "python scripts/cmd_airflow.py --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE} api-server"
@@ -108,6 +114,13 @@ integration-tests = "pytest -m 'integration' {args:{root}/tests}"
 # Default test runner - excludes integration tests, only runs tests/ directory
 run = "pytest {args:{root}/tests}"
 
+kill-zombies = [
+  "pkill -f 'airflow dag-processor'",
+  "pkill -f 'airflow api-server'",
+  "pkill -f 'airflow scheduler'",
+  "pkill -f 'airflow triggerer'",
+]
+
 # Clean test artifacts
 # NOTE: Only use after services have been stopped to
 clean = [
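The `kill-zombies` task is a blunt `pkill` fallback; a daemon normally attempts a graceful shutdown first. Below is a sketch of the usual escalation (SIGTERM, wait, then SIGKILL), written for illustration only and not taken from the provider's daemon code:

```python
import os
import signal
import subprocess
import sys
import time

def terminate(pid: int, timeout: float = 5.0) -> None:
    """Ask a process to exit with SIGTERM; escalate to SIGKILL after `timeout`."""
    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        return  # already gone
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            os.kill(pid, 0)  # probe: does the PID still exist?
        except ProcessLookupError:
            return  # exited cleanly
        time.sleep(0.1)
    os.kill(pid, signal.SIGKILL)  # did not exit in time

# Demo on a throwaway child process. Note: because the parent has not
# wait()ed yet, the probe may keep seeing a zombie until the timeout;
# a daemon that reaps its children does not have this caveat.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
terminate(child.pid, timeout=2.0)
returncode = child.wait()
```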

scripts/cmd_airflow.py

Lines changed: 0 additions & 1 deletion
@@ -13,7 +13,6 @@
 """
 
 import sys
-import os
 import subprocess
 from airflow_provider_aiida.aiida_core import load_profile
 
scripts/cmd_daemon_start.py

Lines changed: 106 additions & 0 deletions
@@ -0,0 +1,106 @@
+#!/usr/bin/env python
+"""
+Start Airflow services for testing using the new Airflow Daemon.
+
+This script is designed to be run via hatch:
+    hatch run hatch-test.py3.11:daemon-start
+"""
+
+import sys
+import os
+import argparse
+
+
+def start_services(profile_name: str, num_sync_workers: int, num_async_workers: int, foreground: bool):
+    """Start all Airflow services using the daemon."""
+    from airflow_provider_aiida.aiida_core.engine.daemon.airflow_daemon import AirflowDaemon
+
+    print("=== Starting Airflow Test Services ===\n")
+    print(f"Sync workers (scheduler): {num_sync_workers}")
+    print(f"Async workers (triggerer): {num_async_workers}\n")
+
+    try:
+        # Load AiiDA profile
+        from airflow_provider_aiida.aiida_core import load_profile
+        aiida_profile = load_profile(profile_name)
+        AirflowDaemon(aiida_profile).start(
+            num_workers=num_sync_workers,
+            num_triggerers=num_async_workers,
+            foreground=foreground
+        )
+
+        return 0
+
+    except Exception as e:
+        print(f"\n✗ Error: {e}")
+        import traceback
+        traceback.print_exc()
+        return 1
+
+
+def main():
+    """Start Airflow services."""
+    parser = argparse.ArgumentParser(
+        description="Start Airflow test services using daemon architecture",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Examples:
+  # Start with defaults (1 sync worker, 1 async worker)
+  python cmd_daemon_start.py
+
+  # Start with 2 sync workers and 3 async workers
+  python cmd_daemon_start.py 2 3
+
+  # Start with 4 sync and 4 async workers, with a specific profile
+  python cmd_daemon_start.py 4 4 --profile-name myprofile
+
+  # Start in foreground mode with defaults
+  python cmd_daemon_start.py --foreground
+"""
+    )
+
+    parser.add_argument(
+        'num_sync_workers',
+        type=int,
+        nargs='?',
+        default=1,
+        help='Number of sync workers (scheduler parallelism, default: 1)'
+    )
+
+    parser.add_argument(
+        'num_async_workers',
+        type=int,
+        nargs='?',
+        default=1,
+        help='Number of async workers (triggerer instances, default: 1)'
+    )
+
+    parser.add_argument(
+        '--foreground', '-f',
+        action='store_true',
+        help='Run daemon in foreground (default: background)'
+    )
+
+    parser.add_argument(
+        '--profile-name',
+        default=os.getenv('AIIDA_PROFILE'),
+        help='AiiDA profile name (default: from AIIDA_PROFILE env)'
+    )
+
+    args = parser.parse_args()
+
+    if args.num_sync_workers < 1:
+        parser.error("Number of sync workers must be at least 1")
+    if args.num_async_workers < 1:
+        parser.error("Number of async workers must be at least 1")
+
+    return start_services(
+        args.profile_name,
+        args.num_sync_workers,
+        args.num_async_workers,
+        args.foreground
+    )


+if __name__ == "__main__":
+    sys.exit(main())
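The two worker counts in the script above are optional positionals via `nargs='?'`, so `cmd_daemon_start.py`, `cmd_daemon_start.py 2 3`, and flag-only invocations all parse. A standalone demonstration of that parsing pattern (a reduced parser, not the script itself):

```python
import argparse

# Same argument shapes as cmd_daemon_start.py, reduced to the parsing behavior
parser = argparse.ArgumentParser()
parser.add_argument("num_sync_workers", type=int, nargs="?", default=1)
parser.add_argument("num_async_workers", type=int, nargs="?", default=1)
parser.add_argument("--foreground", "-f", action="store_true")

defaults = parser.parse_args([])               # no positionals: both fall back to 1
custom = parser.parse_args(["2", "3", "-f"])   # both counts given, foreground enabled
print(defaults.num_sync_workers, custom.num_async_workers)  # 1 3
```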
