diff --git a/README.md b/README.md
index fefcfd8..ebeca58 100644
--- a/README.md
+++ b/README.md
@@ -2,8 +2,16 @@
[](https://github.com/bcdev/gaiaflow/actions/workflows/unittest.yml)
[](https://github.com/charliermarsh/ruff)
+[](https://bcdev.github.io/gaiaflow/)
+
+
+
+
+
+

+
(Image created using ChatGPT)
The word `GaiaFlow` is a combination of `Gaia` (the Greek goddess of Earth, symbolizing our planet)
@@ -98,7 +106,9 @@ Any files or folders marked with `^` can be extended, but carefully.
├── utils.py * # Utility function to get the minikube gateway IP required for testing.
├── docker_config.py * # Utility function to get the docker image name based on your project.
├── kube_config_inline * # This file is needed for Airflow to communicate with Minikube when testing locally in a prod env.
-└── dockerfiles/ * # Dockerfiles and compose files
+├── airflow_test.cfg * # This file is needed for testing your Airflow DAGs.
+├── Dockerfile ^ # Dockerfile for your package.
+└── dockerfiles/ * # Dockerfiles required by Docker compose
```
@@ -245,7 +255,7 @@ Once the pre-requisites are done, you can go ahead with the project creation:
When prompted for input, enter the details requested. If you don't provide any
input for a given choice, the first choice from the list is taken as the default.
-Once the project is created, please read the README.md from that.
+Once the project is created, please read the [user guide](https://bcdev.github.io/gaiaflow/dev/).
## Troubleshooting
@@ -322,3 +332,16 @@ If you face any other problems not mentioned above, please reach out to us.
- [Minio](https://min.io/docs/minio/container/index.html)
- [JupyterLab](https://jupyterlab.readthedocs.io/)
- [Minikube](https://minikube.sigs.k8s.io/docs/)
+
+
+### TODO:
+
+Make ECR work. How to add credentials?
+
+S3 credentials access?
+
+Add sensor-based DAGs
+
+Make the CI unit tests use conda instead
+
+Update CI to use ECR credentials.
\ No newline at end of file
diff --git a/docs/dev.md b/docs/dev.md
index 367edbc..8498ff1 100644
--- a/docs/dev.md
+++ b/docs/dev.md
@@ -43,6 +43,7 @@ Any files or folders marked with `^` can be extended, but carefully.
├── utils.py * # Utility function to get the minikube gateway IP required for testing.
├── docker_config.py * # Utility function to get the docker image name based on your project.
├── kube_config_inline * # This file is needed for Airflow to communicate with Minikube when testing locally in a prod env.
+├── airflow_test.cfg * # This file is needed for testing your Airflow DAGs.
├── Dockerfile ^ # Dockerfile for your package.
└── dockerfiles/ * # Dockerfiles required by Docker compose
```
@@ -125,10 +126,11 @@ Then start the MLOps services using:
python mlops_manager.py --start -b
```
-NOTE: When you run this for the first time, make sure you use the `-b` flag as
-it builds the images for the first time as shown above.
-Next time when you start it again, you start it without the flag as it saves
-time by not building the same images again:
+
+**NOTE**: The `-b` flag only needs to be used the first time, as it builds the
+Docker images. For subsequent starts and restarts, use the same command as
+above but without the `-b` flag.
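+
+For example, a later start of the same services (using the `mlops_manager.py`
+script shown above) is simply:
+
+```
+python mlops_manager.py --start
+```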
### 3. Accessing the services
@@ -275,6 +277,9 @@ mlflow logging. You can literally just add a `#` to the
the files, empty files have 0 bytes of content and that
creates issues with the urllib3 upload to S3 (this
happens inside MLFlow)
+- If you run into any errors while using the Minikube manager, try restarting
+it with `python minikube_manager.py --restart` followed by
+`python mlops_manager.py --restart` to make sure that the changes are synced.
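+
+For example, a full restart of both managers looks like this:
+
+```
+python minikube_manager.py --restart
+python mlops_manager.py --restart
+```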
diff --git a/docs/prod.md b/docs/prod.md
index 0d9fc95..ac55b0a 100644
--- a/docs/prod.md
+++ b/docs/prod.md
@@ -173,26 +173,28 @@ per project by any team member of that project)
- Make sure you change the `task_factory` mode to `prod`.
- Contact the CDR maintainers and ask them to add your repository as a
`submodule` (They will know what to do!)
+- Create a `PYPI_API_TOKEN` from the PyPI website and add it as a secret to the
+repository.
+- Do the same with a `CODECOV_TOKEN` from the Codecov website.
+- Create a Personal Access Token from your account (make sure your account is a
+member of the bcdev org for this to work).
- - Create a Personal Access Token from your account (make sure your account is a
- member of the bcdev org for this to work).
-
- Do as follows:
+ Do as follows:
- - Go to your Github account Settings
- - Navigate to `<> Developer Settings` (the last button on the page)
- - Create a `Personal access token (classic)`
- - See the image below for reference:
+    - Go to your GitHub account settings
+ - Navigate to `<> Developer Settings` (the last button on the page)
+ - Create a `Personal access token (classic)`
+ - See the image below for reference:
- - Only click on the `repo` permissions
- - Create the token
- - Copy and keep it safe somewhere (maybe on KeePassXC)
- - Now, navigate to your newly created project, create a new Secret
- - Go to Repository settings
- - Navigate to `Secrets and Variables -> Actions -> New repository secret`
- - In the `Name` field, insert `CDR_PAT`
- - In the `Secret` field, insert your token that generated a few moments ago
- - Click on `Add Secret`
+      - Select only the `repo` scope
+      - Create the token
+      - Copy it and keep it somewhere safe (for example, in KeePassXC)
+      - Now, navigate to your newly created project and create a new secret:
+      - Go to the repository settings
+      - Navigate to `Secrets and Variables -> Actions -> New repository secret`
+      - In the `Name` field, insert `CDR_PAT`
+      - In the `Secret` field, insert the token you generated a few moments ago
+      - Click on `Add secret` (or use the GitHub CLI command shown after this list)
- Now you are ready to deploy your dags to the production Airflow.
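+
+Alternatively, the `CDR_PAT` secret can be added from the command line with the
+GitHub CLI. This is only a sketch: it assumes `gh` is installed and
+authenticated, and `<your-repo>` is a placeholder for your repository name.
+
+```
+gh secret set CDR_PAT --repo bcdev/<your-repo>
+```
+
+The command will prompt you to paste the token value.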
diff --git a/env.yml b/env.yml
deleted file mode 100644
index 744028a..0000000
--- a/env.yml
+++ /dev/null
@@ -1,13 +0,0 @@
-name: mkdocs-env
-channels:
- - conda-forge
- - defaults
-dependencies:
- - python=3.12
- - pip
- - pip:
- - mkdocs
- - mkdocs-material
- - mkdocstrings
- - mkdocstrings-python
- - mkdocs-autorefs
diff --git a/tests/test_cookiecutter.py b/tests/test_cookiecutter.py
index 9818c95..b5f7438 100644
--- a/tests/test_cookiecutter.py
+++ b/tests/test_cookiecutter.py
@@ -23,7 +23,7 @@
"minikube_manager.py",
"kube_config_inline",
"utils.py",
- "docker_config.py",
+ "docker_image_name_generator.py",
"docker-compose.yml",
"README.md",
".env",
diff --git a/{{ cookiecutter.folder_name }}/.env b/{{ cookiecutter.folder_name }}/.env
index 727e990..932e49c 100644
--- a/{{ cookiecutter.folder_name }}/.env
+++ b/{{ cookiecutter.folder_name }}/.env
@@ -52,3 +52,5 @@ POSTGRES_AIRFLOW_DB=airflow
J_MLFLOW_TRACKING_URI="http://localhost:5000"
J_MLFLOW_S3_ENDPOINT_URL="http://localhost:9000"
+
+
diff --git a/{{ cookiecutter.folder_name }}/.github/workflows/publish.yml b/{{ cookiecutter.folder_name }}/.github/workflows/publish.yml
index 73bae37..ec9dfe6 100644
--- a/{{ cookiecutter.folder_name }}/.github/workflows/publish.yml
+++ b/{{ cookiecutter.folder_name }}/.github/workflows/publish.yml
@@ -17,24 +17,26 @@ jobs:
- name: checkout
uses: actions/checkout@v4
{% raw %}
- - name: Set up Python ${{ matrix.python-version }}
- uses: actions/setup-python@v3
+ - name: Set up MicroMamba and install dependencies with Python ${{ matrix.python-version }}
+ uses: mamba-org/setup-micromamba@v1
with:
- python-version: ${{ matrix.python-version }}
+ environment-file: environment.yml
+ create-args: >-
+ python=${{ matrix.python-version }}
{% endraw %}
- - name: Install dependencies
- run: |
- python -m pip install --upgrade pip
- pip install .[dev,lint,test]
-
- name: Lint with ruff
run: |
ruff check
- name: Run unit tests
shell: bash -l {0}
run: |
- pytest --cov={{ cookiecutter.package_name }} --cov-branch --cov-report=xml
+ pytest -m "not gaiaflow" --cov={{ cookiecutter.package_name }} --cov-branch --cov-report=xml
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v4
diff --git a/{{ cookiecutter.folder_name }}/.github/workflows/unittest.yml b/{{ cookiecutter.folder_name }}/.github/workflows/unittest.yml
index 33e7c0b..0a2d500 100644
--- a/{{ cookiecutter.folder_name }}/.github/workflows/unittest.yml
+++ b/{{ cookiecutter.folder_name }}/.github/workflows/unittest.yml
@@ -16,25 +16,27 @@ jobs:
steps:
- name: checkout
uses: actions/checkout@v4
- {% raw %}
- - name: Set up Python ${{ matrix.python-version }}
- uses: actions/setup-python@v3
+ {% raw %}
+ - name: Set up MicroMamba and install dependencies with Python ${{ matrix.python-version }}
+ uses: mamba-org/setup-micromamba@v1
with:
- python-version: ${{ matrix.python-version }}
+ environment-file: environment.yml
+ create-args: >-
+ python=${{ matrix.python-version }}
{% endraw %}
- - name: Install dependencies
- run: |
- python -m pip install --upgrade pip
- pip install .[dev,lint,test]
-
- name: Lint with ruff
run: |
ruff check
- name: Run unit tests
shell: bash -l {0}
run: |
- pytest --cov={{ cookiecutter.package_name }} --cov-branch --cov-report=xml
+ pytest -m "not gaiaflow" --cov={{ cookiecutter.package_name }} --cov-branch --cov-report=xml
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v4
diff --git a/{{ cookiecutter.folder_name }}/README.md b/{{ cookiecutter.folder_name }}/README.md
index 8ae364c..75a0003 100644
--- a/{{ cookiecutter.folder_name }}/README.md
+++ b/{{ cookiecutter.folder_name }}/README.md
@@ -13,7 +13,7 @@ Add License Badge
{{ cookiecutter.project_name|capitalize }}
-Please have a look at the documentation to get started!
+Please have a look at the [documentation](https://bcdev.github.io/gaiaflow/getting_started/) to get started!
Feel free to update this README as your own.
To add a license, please choose which license you want for your project
diff --git a/{{ cookiecutter.folder_name }}/airflow_test.cfg b/{{ cookiecutter.folder_name }}/airflow_test.cfg
new file mode 100644
index 0000000..f252796
--- /dev/null
+++ b/{{ cookiecutter.folder_name }}/airflow_test.cfg
@@ -0,0 +1,3 @@
+[core]
+dags_folder = ./dags
+load_examples = False
\ No newline at end of file
diff --git a/{{ cookiecutter.folder_name }}/dags/README.md b/{{ cookiecutter.folder_name }}/dags/README.md
index 56ac717..6506abb 100644
--- a/{{ cookiecutter.folder_name }}/dags/README.md
+++ b/{{ cookiecutter.folder_name }}/dags/README.md
@@ -18,8 +18,6 @@ DAGs.
- For testing purposes, you can trigger them manually. If you would like to also manually trigger them for your workflow
you can!
- But if you want your DAG to run periodically, setting the start_date and schedule is important.
-- NOTE: By default, if you set a `start_date` in the past, Airflow will try to backfill all those runs. To avoid that,
-use catchup=False inside the dag definitions.
## Common parameters used while defining a DAG
diff --git a/{{ cookiecutter.folder_name }}/dags/change_me_task_factory_dag.py b/{{ cookiecutter.folder_name }}/dags/change_me_task_factory_dag.py
index a66283a..18974c3 100644
--- a/{{ cookiecutter.folder_name }}/dags/change_me_task_factory_dag.py
+++ b/{{ cookiecutter.folder_name }}/dags/change_me_task_factory_dag.py
@@ -112,7 +112,7 @@
# create a docker image to run your package with all the dependencies included.
# Please update the image name below:
# TODO: Talk with Tejas to align on image naming.
- image="my-local-image/my-package:0.0.1",
+ image="",
# TODO: Discuss with Tejas about a process for creating secrets
secrets=["my-minio-creds"],
@@ -149,7 +149,7 @@
},
},
- image="my-local-image/my-package:0.0.1",
+ image="",
secrets=["my-minio-creds"],
env_vars={
"MLFLOW_TRACKING_URI": f"http://{MINIKUBE_GATEWAY}:5000",
@@ -174,7 +174,7 @@
"key": "return_value",
},
},
- image="my-local-image/my-package:0.0.1",
+ image="",
secrets=["my-minio-creds"],
env_vars={
"MLFLOW_TRACKING_URI": f"http://{MINIKUBE_GATEWAY}:5000",
@@ -187,35 +187,4 @@
trainer >> predictor
-# TODO:
-# [DONE] Update ti.xcom code with simple return dict statements.
-# [DONE] Update the cookiecutter so that it allows using Airflow standalone (
-# without
-# MLOps) for projects requiring only Airflow.
-# Make ECR work. How to add credentials?
-# [DONE]Make sure task factory works out of the box when new projects are
-# created.
-# [DONE]Add tests for Airflow dags.
-# [DONE]Update the documentation stating that we should only return simple
-# objects from the
-# main function that airflow needs to execute.
-# [DONE]Update documentation providing best practices while working with
-# Docker (
-# cleanup images on registry, local etc.)
-# S3 credentials access?
-# Add sensor based DAGs
-# [DONE] Add version.py in package
-# [DONE] Improve change_me_train.py and other files.
-# Make CI unittest using conda instead
-# Update CI to use ECR credentials.
-# Run ruff, isort.
-# [done] Update documentation also including, restarting airflow service after
-# env update. now possible using --restart
-# [done] after starting prod, restart airflow containers.
-# [done] on windows, run pytest --ignore=logs and before that run set
-# [done] AIRFLOW_CONFIG=%cd%\airflow_test.cfg
-# check jupyter notebooks if they work to be sure.
-# [DONE] add task_factory tutorial
-# [DONE] write up about the architecture
-# [DONE] check all files and readmes once more.
-# [DONE] update the architecture diagram in main README
\ No newline at end of file
+
diff --git a/{{ cookiecutter.folder_name }}/dags/example_task_factory_dag.py b/{{ cookiecutter.folder_name }}/dags/example_task_factory_dag.py
index 05a0248..bc20cb9 100644
--- a/{{ cookiecutter.folder_name }}/dags/example_task_factory_dag.py
+++ b/{{ cookiecutter.folder_name }}/dags/example_task_factory_dag.py
@@ -62,10 +62,10 @@
func_path="{{ cookiecutter.package_name }}.example_preprocess",
func_kwargs={"dummy_arg": "hello world"},
- # For prod_local and prod mode only
- # When you run the ./prod_local_setup.sh as shown above, it will also
- # create a docker image from your package with your environment.yml.
- # Please put the image name below
+    # For prod_local and prod mode only.
+    # You must run `python minikube_manager.py --build-only`, which will
+    # create a docker image to run your package with all the dependencies included.
+    # Please update the image name below:
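+    # For example, image="my-local-image/my-package:0.0.1" (placeholder only;
+    # use the name of the image built by the step above, which is expected to
+    # match DOCKER_IMAGE_NAME in docker_image_name_generator.py).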
image="",
secrets=["my-minio-creds"],
env_vars={
diff --git a/{{ cookiecutter.folder_name }}/docker-compose.yml b/{{ cookiecutter.folder_name }}/docker-compose.yml
index 66ac099..c8e1030 100644
--- a/{{ cookiecutter.folder_name }}/docker-compose.yml
+++ b/{{ cookiecutter.folder_name }}/docker-compose.yml
@@ -6,9 +6,17 @@ x-airflow-common:
context: .
dockerfile: ${AIRFLOW_DOCKERFILE}
environment:
+ &airflow-common-env
+ AIRFLOW__CORE__AUTH_MANAGER: "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager"
+ AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
+ AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/'
AIRFLOW__CORE__EXECUTOR: ${CORE_EXECUTOR}
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: ${AIRFLOW_DB_CONN}
+ AIRFLOW__CORE__FERNET_KEY: ''
+    # NOTE: The following secret can be committed to git as it is only used for local development.
+ AIRFLOW__API_AUTH__JWT_SECRET: 'dkWAUJ756yP9oJhTXhG0nj-wTvRJMoSTioGFHECuO0k'
AIRFLOW__CORE__LOAD_EXAMPLES: ${LOAD_EXAMPLES}
+ AIRFLOW__LOGGING__LOGGING_LEVEL: 'DEBUG'
+ AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: ${AIRFLOW_DB_CONN}
MLFLOW_TRACKING_URI: http://mlflow:${MLFLOW_HOST_PORT}
AWS_ACCESS_KEY_ID: ${AWS_ACCESS_KEY_ID}
AWS_SECRET_ACCESS_KEY: ${AWS_SECRET_ACCESS_KEY}
@@ -18,6 +26,7 @@ x-airflow-common:
LD_LIBRARY_PATH: /home/airflow/.local/share/mamba/envs/{{ cookiecutter.folder_name }}/lib:/lib/x86_64-linux-gnu:${LD_LIBRARY_PATH}
AIRFLOW__CORE__DEFAULT_PYTHON_INTERPRETER: /home/airflow/.local/share/mamba/envs/{{ cookiecutter.folder_name }}/bin/python
PYTHONNOUSERSITE: 1
+ user: "${AIRFLOW_UID:-50000}:0"
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
@@ -95,17 +104,32 @@ services:
networks:
- ml-network
- airflow-webserver:
- container_name: airflow-webserver
+ airflow-apiserver:
<<: *airflow-common
- command: webserver
+ command: api-server
ports:
- ${AIRFLOW_WEBSERVER_PORT:-8080}:8080
healthcheck:
- test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
- interval: 10s
+ test: [ "CMD", "curl", "--fail", "http://localhost:8080/api/v2/version" ]
+ interval: 30s
timeout: 10s
retries: 5
+ start_period: 30s
+ restart: always
+ depends_on:
+ <<: *airflow-common-depends-on
+ airflow-init:
+ condition: service_completed_successfully
+
+ airflow-dag-processor:
+ <<: *airflow-common
+ command: dag-processor
+ healthcheck:
+ test: [ "CMD-SHELL", 'airflow jobs check --job-type DagProcessorJob --hostname "$${HOSTNAME}"' ]
+ interval: 30s
+ timeout: 10s
+ retries: 5
+ start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
@@ -132,13 +156,112 @@ services:
airflow-init:
container_name: airflow-init
<<: *airflow-common
+ entrypoint: /bin/bash
+ # yamllint disable rule:line-length
command:
- - "bash"
- "-c"
- |
- airflow db init &&
- airflow db migrate &&
- airflow users create --username admin --password admin --firstname Admin --lastname User --role Admin --email admin@example.org
+ echo "=== FORCING USER CREATION ==="
+ echo "Airflow's auth manager check is broken on windows, creating user manually..."
+
+ # Create the user directly, bypassing the broken auth manager check
+ /entrypoint airflow users create \
+ --username "${_AIRFLOW_WWW_USER_USERNAME:-admin}" \
+ --firstname "Admin" \
+ --lastname "User" \
+ --role "Admin" \
+ --email "admin@example.com" \
+ --password "${_AIRFLOW_WWW_USER_PASSWORD:-admin}" \
+ --verbose || echo "User creation failed or user already exists"
+
+ echo "=== USER CREATION COMPLETED ==="
+ python -c "import airflow.providers.fab.auth_manager.fab_auth_manager; print('FAB provider imported successfully')"
+ python -c "from airflow.configuration import conf; print('Detected auth manager:', conf.get('core', 'auth_manager'))"
+ if [[ -z "${AIRFLOW_UID}" ]]; then
+ echo
+ echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
+ echo "If you are on Linux, you SHOULD follow the instructions below to set "
+ echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
+ echo "For other operating systems you can get rid of the warning with manually created .env file:"
+ echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
+ echo
+ export AIRFLOW_UID=$(id -u)
+ fi
+ one_meg=1048576
+ mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
+ cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
+ disk_available=$$(df / | tail -1 | awk '{print $$4}')
+ warning_resources="false"
+ if (( mem_available < 4000 )) ; then
+ echo
+ echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
+ echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
+ echo
+ warning_resources="true"
+ fi
+ if (( cpus_available < 2 )); then
+ echo
+ echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
+ echo "At least 2 CPUs recommended. You have $${cpus_available}"
+ echo
+ warning_resources="true"
+ fi
+ if (( disk_available < one_meg * 10 )); then
+ echo
+ echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
+ echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
+ echo
+ warning_resources="true"
+ fi
+ if [[ $${warning_resources} == "true" ]]; then
+ echo
+ echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
+ echo "Please follow the instructions to increase amount of resources available:"
+ echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
+ echo
+ fi
+ echo
+ echo "Creating missing opt dirs if missing:"
+ echo
+ mkdir -v -p /opt/airflow/{logs,dags,plugins,config}
+ echo
+ echo "Airflow version:"
+ /entrypoint airflow version
+ echo
+ echo "Files in shared volumes:"
+ echo
+ ls -la /opt/airflow/{logs,dags,plugins,config}
+ echo
+ echo "Running airflow config list to create default config file if missing."
+ echo
+ /entrypoint airflow config list >/dev/null
+ echo
+ echo "Files in shared volumes:"
+ echo
+ ls -la /opt/airflow/{logs,dags,plugins,config}
+ echo
+ echo "Change ownership of files in /opt/airflow to ${AIRFLOW_UID}:0"
+ echo
+ chown -R "${AIRFLOW_UID}:0" /opt/airflow/
+ echo
+ echo "Change ownership of files in shared volumes to ${AIRFLOW_UID}:0"
+ echo
+ chown -v -R "${AIRFLOW_UID}:0" /opt/airflow/{logs,dags,plugins,config}
+ echo
+ echo "Files in shared volumes:"
+ echo
+ ls -la /opt/airflow/{logs,dags,plugins,config}
+
+ # yamllint enable rule:line-length
+ environment:
+ <<: *airflow-common-env
+ _AIRFLOW_DB_MIGRATE: 'true'
+ _AIRFLOW_WWW_USER_CREATE: 'true'
+ _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-admin}
+ _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-admin}
+ _PIP_ADDITIONAL_REQUIREMENTS: ''
+ AIRFLOW__CORE__AUTH_MANAGER: "airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager"
+ user: "0:0"
restart: no
minio:
diff --git a/{{ cookiecutter.folder_name }}/docker_config.py b/{{ cookiecutter.folder_name }}/docker_image_name_generator.py
similarity index 100%
rename from {{ cookiecutter.folder_name }}/docker_config.py
rename to {{ cookiecutter.folder_name }}/docker_image_name_generator.py
diff --git a/{{ cookiecutter.folder_name }}/dockerfiles/airflow/Dockerfile b/{{ cookiecutter.folder_name }}/dockerfiles/airflow/Dockerfile
index ae14e6f..61131a2 100644
--- a/{{ cookiecutter.folder_name }}/dockerfiles/airflow/Dockerfile
+++ b/{{ cookiecutter.folder_name }}/dockerfiles/airflow/Dockerfile
@@ -1,6 +1,6 @@
# 🚨 WARNING: Do not modify this Dockerfile without consulting the team! 🚨
-FROM apache/airflow:2.10.4-python3.12
+FROM apache/airflow:3.0.2-python3.12
USER root
WORKDIR /opt/airflow
diff --git a/{{ cookiecutter.folder_name }}/environment.yml b/{{ cookiecutter.folder_name }}/environment.yml
index a0c9460..e34b3a9 100644
--- a/{{ cookiecutter.folder_name }}/environment.yml
+++ b/{{ cookiecutter.folder_name }}/environment.yml
@@ -6,15 +6,17 @@ dependencies:
- python >3.10,<3.13
- mlflow
- psycopg2-binary
+ - asyncpg
- boto3
- - airflow < 3.0
+ - airflow
- apache-airflow-providers-cncf-kubernetes
+ - apache-airflow-providers-fab
- jupyter
- python-dotenv
- git
- fsspec
{% if cookiecutter.show_examples == "yes" %}
- # This is only needed for an exmaple. If you dont need this, please feel free to remove it.
+ # This is only needed for the examples. If you don't need this, please feel free to remove it.
- keras
- tensorflow
- numpy =2.0.2
diff --git a/{{ cookiecutter.folder_name }}/minikube_manager.py b/{{ cookiecutter.folder_name }}/minikube_manager.py
index 62fae8e..656aeba 100644
--- a/{{ cookiecutter.folder_name }}/minikube_manager.py
+++ b/{{ cookiecutter.folder_name }}/minikube_manager.py
@@ -9,7 +9,7 @@
from typing import Any
import yaml
-from docker_config import DOCKER_IMAGE_NAME
+from docker_image_name_generator import DOCKER_IMAGE_NAME
class MinikubeManager:
@@ -42,7 +42,7 @@ def error(message):
def run(self, command: list, error: str, env=None):
try:
- return subprocess.check_call(command, env=env)
+ subprocess.call(command, env=env)
except subprocess.CalledProcessError:
self.error(error)
@@ -128,7 +128,7 @@ def create_kube_config_inline(self):
self.log("Creating kube config inline file...")
with open(filename, "w") as f:
- subprocess.check_call(
+ subprocess.call(
[
"minikube",
"kubectl",
diff --git a/{{ cookiecutter.folder_name }}/mlops_manager.py b/{{ cookiecutter.folder_name }}/mlops_manager.py
index c9364ba..b30799f 100644
--- a/{{ cookiecutter.folder_name }}/mlops_manager.py
+++ b/{{ cookiecutter.folder_name }}/mlops_manager.py
@@ -1,5 +1,6 @@
import argparse
import os
+import platform
import socket
import subprocess
import sys
@@ -16,6 +17,7 @@ def __init__(self, action=None, service=None, cache=False, jupyter_port=8895, de
self.jupyter_port = jupyter_port
self.delete_volume = delete_volume
self.docker_build = docker_build
+ self.os_type = platform.system().lower()
if action == "stop":
self.cleanup()
@@ -37,8 +39,8 @@ def error(message: str):
def run(self, command: list, error: str):
try:
- return subprocess.check_call(command)
- except subprocess.CalledProcessError:
+ subprocess.call(command)
+ except Exception:
self.log(error)
raise
@@ -83,8 +85,10 @@ def stop_jupyter(self):
@staticmethod
def docker_services_for(component):
services = {
- "airflow": ["airflow-webserver", "airflow-scheduler", "airflow-init", "postgres-airflow"],
- "mlflow": ["mlflow", "postgres-mlflow", "minio", "minio_client"],
+ "airflow": ["airflow-apiserver", "airflow-scheduler",
+ "airflow-init", "airflow-dag-processor",
+ "postgres-airflow", "minio", "minio_client"],
+ "mlflow": ["mlflow", "postgres-mlflow"],
}
return services.get(component, [])
@@ -119,28 +123,55 @@ def start_jupyter(self):
cmd = ["jupyter", "lab", "--ip=0.0.0.0", f"--port={self.jupyter_port}"]
subprocess.Popen(cmd)
+ def update_env_file_with_airflow_uid(self, env_path=".env"):
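+        # Keeps AIRFLOW_UID in .env in sync with the host user on Linux so that
+        # files created by the Airflow containers (e.g. logs and dags) are not
+        # owned by root; other operating systems fall back to the Airflow
+        # default UID of 50000.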
+ if self.os_type == "linux":
+ uid = str(os.getuid())
+ else:
+ uid = 50000
+
+ lines = []
+ if os.path.exists(env_path):
+ with open(env_path, "r") as f:
+ lines = f.readlines()
+
+ key_found = False
+ new_lines = []
+ for line in lines:
+ if line.strip().startswith("AIRFLOW_UID="):
+ new_lines.append(f"AIRFLOW_UID={uid}\n")
+ key_found = True
+ else:
+ new_lines.append(line)
+
+ if not key_found:
+ new_lines.append(f"AIRFLOW_UID={uid}\n")
+
+ with open(env_path, "w") as f:
+ f.writelines(new_lines)
+
+ print(f"Set AIRFLOW_UID={uid} in {env_path}")
+
def start(self):
self.log("Setting up directories...")
self.create_directory("logs")
self.create_directory("data")
+ self.update_env_file_with_airflow_uid()
if self.service == "jupyter":
self.check_port()
if self.docker_build:
- build_cmd = ["docker", "compose", "build"]
+ build_cmd = ["build"]
if not self.cache:
build_cmd.append("--no-cache")
self.log("Building Docker images")
- self.run(build_cmd, "Error during Docker build")
+ self.docker_compose_action(build_cmd, self.service)
if self.service == "jupyter":
self.start_jupyter()
else:
self.docker_compose_action(["up", "-d"], service=self.service)
- self.log("MLOps service started!")
-
def main():
parser = argparse.ArgumentParser(description="Gaiaflow: MLOps Environment Launcher")
diff --git a/{{ cookiecutter.folder_name }}/tests/conftest.py b/{{ cookiecutter.folder_name }}/tests/conftest.py
index 9bd41a8..e44c7f6 100644
--- a/{{ cookiecutter.folder_name }}/tests/conftest.py
+++ b/{{ cookiecutter.folder_name }}/tests/conftest.py
@@ -1,5 +1,4 @@
import os
-
def pytest_configure():
os.environ['AIRFLOW_CONFIG'] = os.path.join(os.getcwd(), 'airflow_test.cfg')
\ No newline at end of file
diff --git a/{{ cookiecutter.folder_name }}/tests/dags/test_dag_integrity.py b/{{ cookiecutter.folder_name }}/tests/dags/test_dag_integrity.py
index fa1f975..3b059e9 100644
--- a/{{ cookiecutter.folder_name }}/tests/dags/test_dag_integrity.py
+++ b/{{ cookiecutter.folder_name }}/tests/dags/test_dag_integrity.py
@@ -20,11 +20,11 @@ def test_dag_in_detail(dag_id, dag):
assert dag.tags, f"DAG '{dag_id}' has no tags."
assert dag.catchup is False, f"DAG '{dag_id}' has catchup enabled."
try:
- assert dag.schedule_interval is not None, (
- f"DAG '{dag_id}' has no schedule_interval."
+ assert dag.schedule is not None, (
+ f"DAG '{dag_id}' has no schedule."
)
except AssertionError:
- logging.warning(f"DAG '{dag_id}' has no schedule_interval.")
+ logging.warning(f"DAG '{dag_id}' has no schedule.")
assert len(dag.tasks) > 0, f"DAG '{dag_id}' has no tasks."
task_ids = [task.task_id for task in dag.tasks]
diff --git a/{{ cookiecutter.folder_name }}/tests/test_minikube_manager.py b/{{ cookiecutter.folder_name }}/tests/test_minikube_manager.py
index 4f51bd0..19b9401 100644
--- a/{{ cookiecutter.folder_name }}/tests/test_minikube_manager.py
+++ b/{{ cookiecutter.folder_name }}/tests/test_minikube_manager.py
@@ -1,7 +1,7 @@
import subprocess
import unittest
from pathlib import Path
-from unittest.mock import ANY, mock_open, patch
+from unittest.mock import ANY, mock_open, patch, call
import pytest
from minikube_manager import MinikubeManager, main
@@ -42,7 +42,7 @@ def test_init_with_create_config_only(self, mock_cfg):
@patch.object(MinikubeManager, "error")
def test_run_with_exception_calls_error(self, mock_error):
- with patch("subprocess.check_call", side_effect=subprocess.CalledProcessError(1, ["fake"])):
+ with patch("subprocess.call", side_effect=subprocess.CalledProcessError(1, ["fake"])):
self.manager.run(["fake"], "Failed!")
mock_error.assert_called_once_with("Failed!")
@@ -65,26 +65,26 @@ def test_main_with_build_only_flag(self, mock_init):
mock_init.assert_called_once()
self.assertTrue(mock_init.call_args.kwargs["build_only"])
- @patch('subprocess.check_call')
- def test_run_success(self, mock_check_call):
+ @patch('subprocess.call')
+ def test_run_success(self, mock_call):
manager = MinikubeManager()
- mock_check_call.return_value = 0
+ mock_call.return_value = 0
manager.run(["echo", "hello"], "Error executing command")
- mock_check_call.assert_called_with(["echo", "hello"], env=None)
+ mock_call.assert_called_with(["echo", "hello"], env=None)
- @patch('subprocess.check_call')
- def test_run_error(self, mock_check_call):
+ @patch('subprocess.call')
+ def test_run_error(self, mock_call):
manager = MinikubeManager()
- mock_check_call.side_effect = subprocess.CalledProcessError(1, "echo")
+ mock_call.side_effect = subprocess.CalledProcessError(1, "echo")
with self.assertRaises(SystemExit):
manager.run(["echo", "hello"], "Error executing command")
- @patch('subprocess.check_call')
+ @patch('subprocess.call')
@patch('subprocess.run')
- def test_start(self, mock_subprocess_run, mock_check_call):
+ def test_start(self, mock_subprocess_run, mock_call):
manager = MinikubeManager()
- mock_check_call.return_value = None
+ mock_call.return_value = None
mock_subprocess_run.return_value.returncode = 0
with patch.object(manager, 'start_minikube') as mock_start_minikube, \
@@ -126,35 +126,41 @@ def test_start_minikube_not_running(self, mock_subprocess_run):
"Error starting minikube profile [airflow]"
)
- @patch('subprocess.check_call')
- def test_stop_minikube(self, mock_check_call):
+ @patch('subprocess.call')
+ def test_stop_minikube(self, mock_call):
manager = MinikubeManager()
manager.stop_minikube()
- mock_check_call.assert_called_with(
- ["minikube", "stop", "--profile", manager.minikube_profile],
- env=ANY
+ call_args_list = mock_call.call_args_list
+ self.assertEqual(
+ call_args_list[0],
+ call(['minikube', 'stop', '--profile', 'airflow'], env=None)
+ )
+ self.assertEqual(
+ call_args_list[1],
+ call(["minikube", "delete", "--profile", "airflow"], env=None),
)
@patch('subprocess.run')
- @patch('subprocess.check_call')
- def test_build_docker_image(self, mock_check_call, mock_subprocess_run):
+ @patch('subprocess.call')
+ def test_build_docker_image(self, mock_call, mock_subprocess_run):
manager = MinikubeManager()
mock_subprocess_run.return_value.stdout = b"export DOCKER_TLS_VERIFY=\"1\"\nexport DOCKER_HOST=\"tcp://127.0.0.1:2376\""
manager.build_docker_image()
- mock_check_call.assert_called_with(
+ mock_call.assert_called_with(
["docker", "build", "-t", manager.docker_image_name, "."],
env=ANY
)
- @patch('subprocess.check_call')
+ @patch('subprocess.call')
@patch('subprocess.check_output')
- def test_create_kube_config_inline(self, mock_check_output, mock_check_call):
+ def test_create_kube_config_inline(self, mock_check_output, mock_call):
manager = MinikubeManager()
mock_check_output.return_value = b"minikube kubeconfig"
with patch('pathlib.Path.home', return_value=Path("C:/Users/test")):
with patch('builtins.open', mock_open(read_data='{"dummy config"}')):
manager.create_kube_config_inline()
- mock_check_call.assert_called_with(
+ mock_call.assert_called_with(
[
"minikube", "kubectl", "--", "config", "view", "--flatten", "--minify", "--raw"
],
diff --git a/{{ cookiecutter.folder_name }}/tests/test_mlops_manager.py b/{{ cookiecutter.folder_name }}/tests/test_mlops_manager.py
index 49ad455..00cb9be 100644
--- a/{{ cookiecutter.folder_name }}/tests/test_mlops_manager.py
+++ b/{{ cookiecutter.folder_name }}/tests/test_mlops_manager.py
@@ -85,28 +85,30 @@ def test_stop_jupyter(self, mock_process_iter):
def test_docker_services_for(self):
self.assertEqual(
MlopsManager.docker_services_for("airflow"),
- ["airflow-webserver", "airflow-scheduler", "airflow-init", "postgres-airflow"],
+ ["airflow-apiserver", "airflow-scheduler", "airflow-init",
+ "airflow-dag-processor", "postgres-airflow", "minio",
+ "minio_client"],
)
self.assertEqual(MlopsManager.docker_services_for("none"), [])
- @patch("subprocess.check_call")
- def test_docker_compose_action_all(self, mock_check_call):
+ @patch("subprocess.call")
+ def test_docker_compose_action_all(self, mock_call):
self.manager.docker_compose_action(["up", "-d"])
- mock_check_call.assert_called_once()
+ mock_call.assert_called_once()
- @patch("subprocess.check_call")
- def test_docker_compose_action_with_service(self, mock_check_call):
+ @patch("subprocess.call")
+ def test_docker_compose_action_with_service(self, mock_call):
self.manager.service = "mlflow"
self.manager.docker_compose_action(["up", "-d"], "mlflow")
- self.assertTrue(mock_check_call.called)
+ self.assertTrue(mock_call.called)
def test_docker_compose_action_with_invalid_service(self):
with self.assertRaises(SystemExit):
self.manager.docker_compose_action(["up", "-d"], "unknown_service")
- @patch("subprocess.check_call")
+ @patch("subprocess.call")
@patch("psutil.process_iter", return_value=[])
- def test_cleanup_jupyter(self, mock_iter, mock_check_call):
+ def test_cleanup_jupyter(self, mock_iter, mock_call):
manager = MlopsManager(
action=None,
service="jupyter",
@@ -117,60 +119,60 @@ def test_cleanup_jupyter(self, mock_iter, mock_check_call):
)
manager.cleanup()
self.assertTrue(mock_iter.called)
- mock_check_call.assert_not_called()
+ mock_call.assert_not_called()
- @patch("subprocess.check_call")
- def test_cleanup_with_volume(self, mock_check_call):
+ @patch("subprocess.call")
+ def test_cleanup_with_volume(self, mock_call):
self.manager.service = "mlflow"
self.manager.delete_volume = True
self.manager.cleanup()
- mock_check_call.assert_called_with(
- ["docker", "compose", "down", "-v", "mlflow", 'postgres-mlflow', "minio", "minio_client"]
+ mock_call.assert_called_with(
+ ["docker", "compose", "down", "-v", "mlflow", 'postgres-mlflow']
)
- @patch("subprocess.check_call")
- def test_cleanup_without_volume(self, mock_check_call):
+ @patch("subprocess.call")
+ def test_cleanup_without_volume(self, mock_call):
self.manager.service = "mlflow"
self.manager.delete_volume = False
self.manager.cleanup()
- mock_check_call.assert_called_with(
- ["docker", "compose", "down", "mlflow", 'postgres-mlflow', "minio", "minio_client"]
+ mock_call.assert_called_with(
+ ["docker", "compose", "down", "mlflow", 'postgres-mlflow']
)
@patch("subprocess.Popen")
- @patch("subprocess.check_call")
+ @patch("subprocess.call")
@patch("socket.socket.connect_ex", return_value=1)
- def test_start_jupyter(self, mock_connect, mock_check_call, mock_popen):
+ def test_start_jupyter(self, mock_connect, mock_call, mock_popen):
self.manager.service = "jupyter"
self.manager.start()
mock_popen.assert_called_once()
- @patch("subprocess.check_call")
+ @patch("subprocess.call")
@patch("subprocess.Popen")
- def test_start_service_without_build(self, mock_popen, mock_check_call):
+ def test_start_service_without_build(self, mock_popen, mock_call):
self.manager.service = "mlflow"
self.manager.docker_build = False
self.manager.start()
- self.assertTrue(mock_check_call.called)
+ self.assertTrue(mock_call.called)
mock_popen.assert_not_called()
- @patch("subprocess.check_call")
- def test_start_with_build_and_cache(self, mock_check_call):
+ @patch("subprocess.call")
+ def test_start_with_build_and_cache(self, mock_call):
self.manager.docker_build = True
self.manager.cache = True
self.manager.service = "mlflow"
self.manager.start()
- mock_check_call.assert_any_call(["docker", "compose", "build", "mlflow", "postgres-mlflow", "minio", "minio_client"])
- mock_check_call.assert_any_call(["docker", "compose", "up", "-d", "mlflow", "postgres-mlflow", "minio", "minio_client"])
+ mock_call.assert_any_call(["docker", "compose", "build", "mlflow", "postgres-mlflow"])
+ mock_call.assert_any_call(["docker", "compose", "up", "-d", "mlflow", "postgres-mlflow"])
- @patch("subprocess.check_call")
- def test_start_with_build_no_cache(self, mock_check_call):
+ @patch("subprocess.call")
+ def test_start_with_build_no_cache(self, mock_call):
self.manager.docker_build = True
self.manager.cache = False
self.manager.service = "mlflow"
self.manager.start()
- mock_check_call.assert_any_call(["docker", "compose", "build", "--no-cache", "mlflow", "postgres-mlflow", "minio", "minio_client"])
- mock_check_call.assert_any_call(["docker", "compose", "up", "-d", "mlflow", "postgres-mlflow", "minio", "minio_client"])
+ mock_call.assert_any_call(["docker", "compose", "build", "--no-cache", "mlflow", "postgres-mlflow"])
+ mock_call.assert_any_call(["docker", "compose", "up", "-d", "mlflow", "postgres-mlflow"])
@patch("sys.argv", ["mlops_manager.py", "--start"])
@patch.object(MlopsManager, "__init__", return_value=None)
diff --git a/{{ cookiecutter.folder_name }}/tests/train/test_train.py b/{{ cookiecutter.folder_name }}/tests/train/test_train.py
index 0244151..b80a60c 100644
--- a/{{ cookiecutter.folder_name }}/tests/train/test_train.py
+++ b/{{ cookiecutter.folder_name }}/tests/train/test_train.py
@@ -1,7 +1,7 @@
# Hi, I am a test file. Please update me in the required places after you
# have updated your package.
-from unittest.mock import MagicMock, patch
+from unittest.mock import MagicMock
from {{cookiecutter.package_name}}.train.change_me_train import Trainer
@@ -25,8 +25,7 @@ def test_trainer_initialization():
assert trainer.hyperparams == hyperparams
assert trainer.trained_model_path == model_path
-@patch('{{ cookiecutter.package_name }}.train.change_me_train.mlflow.pyfunc.log_model')
-def test_training_process(mock_log_model):
+def test_training_process():
model = MagicMock()
model.fit = MagicMock()
model.predict = MagicMock(return_value=[0, 1, 0])
@@ -45,15 +44,5 @@ def test_training_process(mock_log_model):
trainer.train()
- # Enable these assertions once you have them in your code
- # model.fit.assert_called_once_with(("X_train", "y_train"))
- #
- # mock_log_model.assert_called_once()
- # _, kwargs = mock_log_model.call_args
- #
- # assert isinstance(kwargs['python_model'], mlflow.pyfunc.PythonModel)
- # assert 'code_paths' in kwargs
- # assert 'extra_pip_requirements' in kwargs
-
# Add specific assertions for your training such as predict assertions,
# mlflow logging assertions etc.
\ No newline at end of file