100 changes: 99 additions & 1 deletion CONTRIBUTING.md
@@ -15,6 +15,22 @@ hatch run hatch-test.py3.11:setup-profile
hatch run hatch-test.py3.11:unit-tests
```

To run the integration tests, the Airflow services must be started first.

```bash
# 1. Start PostgreSQL
hatch run hatch-test.py3.11:start-psql-service

# 2. Create AiiDA profile and databases
hatch run hatch-test.py3.11:setup-profile

# 3. Start Airflow services
hatch run hatch-test.py3.11:daemon-start

# 4. Run integration tests
hatch run hatch-test.py3.11:integration-tests
```

## Unit tests

### Setup test environment
@@ -84,6 +100,22 @@ postgres (admin)
The profile will be located at `.pytest/.aiida/test/` (or `$AIIDA_PATH/.aiida/test/` if `AIIDA_PATH` is set) with the following structure:
```
.pytest/.aiida/test/
├── daemon/ # Daemon process management
│ ├── services/ # Per-service directories
│ │ ├── scheduler/
│ │ │ ├── state.json # PID, state, timestamps
│ │ │ ├── stdout.log # Service output
│ │ │ └── stderr.log # Service errors
│ │ ├── triggerer/
│ │ │ ├── state.json
│ │ │ ├── stdout.log
│ │ │ └── stderr.log
│ │ ├── dag-processor/
│ │ │ └── ...
│ │ └── api-server/
│ │ └── ...
│ ├── daemon.pid # Daemon PID file
│ └── daemon.log # Daemon output (background mode)
└── airflow/ # Airflow files
├── dags/ # DAG files
└── airflow.cfg # Airflow configuration
@@ -113,6 +145,66 @@ hatch test -- -m 'not integration'

Integration tests require running Airflow services (scheduler, triggerer, dag-processor, api-server). These tests are marked with `@pytest.mark.integration`.
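A test carrying this marker looks like the following (an illustrative sketch; only the marker name `integration` is taken from the configuration above, the test body and name are hypothetical):

```python
import pytest


@pytest.mark.integration
def test_roundtrip_through_scheduler():
    """Illustrative only: a real integration test would submit work to the
    running scheduler/triggerer and assert on the result."""
    assert True
```

Running `hatch test -- -m 'not integration'` deselects such tests, while the `integration-tests` script selects exactly these.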


### Start Airflow in background with daemon

Use the daemon manager to start all services at once:

```bash
# Start daemon in background (default)
hatch run hatch-test.py3.11:daemon-start
```

The daemon will:
- Check if daemon is already running
- Start all Airflow services (scheduler, triggerer, dag-processor, api-server)
- Run a health monitor thread that tracks service status every 5 seconds
- Detach and run in background
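The health-monitor idea can be sketched as follows (a minimal illustration, not the actual `AirflowDaemon` implementation; probing liveness via `os.kill(pid, 0)` is an assumption about how such a monitor might work):

```python
import os


def is_alive(pid: int) -> bool:
    """Liveness probe: signal 0 checks for the process without affecting it."""
    try:
        os.kill(pid, 0)
        return True
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # process exists but belongs to another user


def poll_services(pids: dict) -> dict:
    """One monitoring cycle: map each service name to whether it is running."""
    return {name: is_alive(pid) for name, pid in pids.items()}

# In a daemon this would run in a monitor thread, sleeping 5 seconds
# between cycles and counting consecutive failures per service.
```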

**Background mode** (default):
- Daemon detaches and runs in background
- Services continue running after terminal closes
- Use `daemon-stop` to stop all services
- Use `daemon-status` to check service status

**Foreground mode** (for debugging):
```bash
# Run daemon in foreground with --foreground flag
python scripts/cmd_daemon_start.py --profile-name test --foreground
```
- Keeps daemon running in your terminal
- Press Ctrl+C to gracefully stop all services
- Useful for interactive testing and debugging

Check service status in another terminal:

```bash
hatch run hatch-test.py3.11:daemon-status
```

Expected output:
```
=== Airflow Test Services Status ===

Health monitor daemon: RUNNING (PID: 12345)

Service Status:
--------------------------------------------------------------------------------
Service State PID Uptime Last Check Failures
--------------------------------------------------------------------------------
scheduler ✓ RUNNING 12346 2m 15s 3s ago 0
triggerer ✓ RUNNING 12347 2m 15s 3s ago 0
dag-processor ✓ RUNNING 12348 2m 15s 3s ago 0
api-server ✓ RUNNING 12349 2m 15s 3s ago 0
--------------------------------------------------------------------------------

Airflow home: .pytest/.aiida/test/airflow
DAGs folder: .pytest/.aiida/test/airflow/dags
Logs: .pytest/.aiida/test/airflow/logs
```



### Start Airflow services in foreground (recommended for debugging)

If you prefer to start services manually in separate terminals:
Expand Down Expand Up @@ -157,10 +249,16 @@ hatch test

Be sure that all Airflow services and the Docker service have been stopped.
The Docker service can be stopped with:

```bash
hatch run hatch-test.py3.11:stop-psql-service
```

The Airflow services can be stopped with:
```bash
hatch run hatch-test.py3.11:daemon-stop
```

To clean test artifacts, including the PostgreSQL database cluster as well as the AiiDA and Airflow configuration:
```bash
hatch run hatch-test.py3.11:clean
19 changes: 16 additions & 3 deletions pyproject.toml
@@ -37,6 +37,9 @@ Source = "..." # TODO
file = "README.md"
content-type = "text/markdown"

[project.scripts]
airflow-provider-aiida-triggerer-service = "airflow_provider_aiida.aiida_core.engine.daemon.triggerer_service:main"

[project.entry-points.apache_airflow_provider]
provider_info = "airflow_provider_aiida.__init__:get_provider_info"

Expand All @@ -54,6 +57,7 @@ python = "3.11"

[tool.hatch.envs.hatch-test]
default-args = []
installer = "uv"
dependencies = [
"pytest>=7.0",
"pytest-mock>=3.10",
@@ -83,9 +87,6 @@ AIRFLOW_PROVIDER_AIIDA__TESTS__AIRFLOW_POSTGRES_PASSWORD = "password"
setup-profile = [
"python scripts/cmd_profile_create.py --postgres-host {env:POSTGRES_HOST} --postgres-port {env:POSTGRES_HOST_PORT} --postgres-user {env:POSTGRES_USER} --postgres-password {env:POSTGRES_PASSWORD} --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE} --aiida-password {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIRFLOW_POSTGRES_PASSWORD}",
]
reserialize = [
"python scripts/cmd_airflow.py --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE} dags reserialize --bundle-name aiida_dags",
]

teardown-profile = [
"python scripts/cmd_profile_delete.py --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE} --postgres-user {env:POSTGRES_USER} --postgres-password {env:POSTGRES_PASSWORD}"
@@ -96,6 +97,11 @@ start-psql-service = "docker compose -f docker-compose.test.yml up -d"
stop-psql-service = "docker compose -f docker-compose.test.yml down -v"
status-psql-service = "docker ps"

# Daemon commands
daemon-start = "python scripts/cmd_daemon_start.py --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE}"
daemon-stop = "python scripts/cmd_daemon_stop.py --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE}"
daemon-status = "python scripts/cmd_daemon_status.py --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE}"

# Start individual Airflow services (for manual testing)
scheduler = "python scripts/cmd_airflow.py --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE} scheduler"
api-server = "python scripts/cmd_airflow.py --profile-name {env:AIRFLOW_PROVIDER_AIIDA__TESTS__AIIDA_PROFILE} api-server"
@@ -108,6 +114,13 @@ integration-tests = "pytest -m 'integration' {args:{root}/tests}"
# Default test runner - excludes integration tests, only runs tests/ directory
run = "pytest {args:{root}/tests}"

kill-zombies = [
"pkill -f 'airflow dag-processor'",
"pkill -f 'airflow api-server'",
"pkill -f 'airflow scheduler'",
"pkill -f 'airflow triggerer'",
]

# Clean test artifacts
# NOTE: Only use after services have been stopped
clean = [
1 change: 0 additions & 1 deletion scripts/cmd_airflow.py
@@ -13,7 +13,6 @@
"""

import sys
import os
import subprocess
from airflow_provider_aiida.aiida_core import load_profile

106 changes: 106 additions & 0 deletions scripts/cmd_daemon_start.py
@@ -0,0 +1,106 @@
#!/usr/bin/env python
"""
Start Airflow services for testing using the new Airflow Daemon.

This script is designed to be run via hatch:
    hatch run hatch-test.py3.11:daemon-start
"""

import sys
import os
import argparse


def start_services(profile_name: str, num_sync_workers: int, num_async_workers: int, foreground: bool):
"""Start all Airflow services using the daemon."""
from airflow_provider_aiida.aiida_core.engine.daemon.airflow_daemon import AirflowDaemon

print("=== Starting Airflow Test Services ===\n")
print(f"Sync workers (scheduler): {num_sync_workers}")
print(f"Async workers (triggerer): {num_async_workers}\n")

try:
# Load AiiDA profile
from airflow_provider_aiida.aiida_core import load_profile
aiida_profile = load_profile(profile_name)
AirflowDaemon(aiida_profile).start(
num_workers=num_sync_workers,
num_triggerers=num_async_workers,
foreground=foreground
)

return 0

except Exception as e:
print(f"\n✗ Error: {e}")
import traceback
traceback.print_exc()
return 1


def main():
"""Start Airflow services."""
parser = argparse.ArgumentParser(
description="Start Airflow test services using daemon architecture",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Start with default (1 sync worker, 1 async worker)
python cmd_daemon_start.py

# Start with 2 sync workers and 3 async workers
python cmd_daemon_start.py 2 3

# Start with 4 sync and 4 async workers, with specific profile
python cmd_daemon_start.py 4 4 --profile-name myprofile

# Start in foreground mode with defaults
python cmd_daemon_start.py --foreground
"""
)

parser.add_argument(
'num_sync_workers',
type=int,
nargs='?',
default=1,
help='Number of sync workers (scheduler parallelism, default: 1)'
)

parser.add_argument(
'num_async_workers',
type=int,
nargs='?',
default=1,
help='Number of async workers (triggerer instances, default: 1)'
)

parser.add_argument(
'--foreground', '-f',
action='store_true',
help='Run daemon in foreground (default: background)'
)

parser.add_argument(
'--profile-name',
default=os.getenv('AIIDA_PROFILE'),
help='AiiDA profile name (default: from AIIDA_PROFILE env)'
)

args = parser.parse_args()

if args.num_sync_workers < 1:
parser.error("Number of sync workers must be at least 1")
if args.num_async_workers < 1:
parser.error("Number of async workers must be at least 1")

return start_services(
args.profile_name,
args.num_sync_workers,
args.num_async_workers,
args.foreground
)


if __name__ == "__main__":
sys.exit(main())