Instead of using the Celery workers, you can let Airflow run the tasks using Kubernetes executors, where Pods are created dynamically as needed instead of jobs being routed through a Redis queue to the workers.
To achieve this, replace spec.celeryExecutors with spec.kubernetesExecutors.
For example, you would change the following
spec:
  celeryExecutors:
    # ...
    roleGroups:
      default:
        replicas: 2
    config:
      resources:
        # ...
to
spec:
  kubernetesExecutors:
    config:
      resources:
        # ...
While spawning a dedicated Pod for every task has many benefits, it also introduces some latency for starting that Pod. The shorter the actual task runs, the larger the share of the overall DAG runtime this latency takes up.
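To make the latency argument concrete, the following minimal sketch computes which fraction of a task's wall-clock time is spent waiting for its Pod. The 10 second startup time is a purely hypothetical figure chosen for illustration, not a measured value; real Pod startup depends on image pulls, scheduling and node capacity.

```python
# Minimal sketch: share of a task's wall-clock time spent on Pod startup.
# POD_STARTUP_SECONDS is a hypothetical assumption, not a measured Airflow/Kubernetes value.
POD_STARTUP_SECONDS = 10.0


def startup_overhead_fraction(task_runtime_s: float, startup_s: float = POD_STARTUP_SECONDS) -> float:
    """Return startup / (startup + runtime), i.e. the part of the total time spent waiting for the Pod."""
    if task_runtime_s < 0 or startup_s < 0:
        raise ValueError("durations must be non-negative")
    total = startup_s + task_runtime_s
    return startup_s / total if total > 0 else 0.0


for runtime in (5.0, 60.0, 600.0):
    share = startup_overhead_fraction(runtime)
    print(f"task runtime {runtime:>5.0f}s -> {share:.0%} of wall-clock time is Pod startup")
```

Under this assumption, a 5 second task spends about 67% of its wall-clock time waiting for its Pod, while a 10 minute task spends only about 2%.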
If your tasks are not computationally expensive (e.g. they only submit a query to a Trino cluster), you can run them locally on the scheduler instead of spawning a Pod, which reduces the DAG runtime.
To achieve this, enable the LocalExecutor in addition to the KubernetesExecutor in your Airflow stacklet:
spec:
  webservers:
    envOverrides: &envOverrides
      # We default our tasks to KubernetesExecutor, however, tasks can opt in to using the LocalExecutor
      # See https://docs.stackable.tech/home/stable/airflow/usage-guide/using-kubernetes-executors/
      AIRFLOW__CORE__EXECUTOR: KubernetesExecutor,LocalExecutor
  schedulers:
    envOverrides: *envOverrides
  kubernetesExecutors:
    envOverrides: *envOverrides
Afterwards, tasks can opt in to the LocalExecutor using:
from airflow.decorators import task

@task(executor="LocalExecutor")
def hello_world():
    print("hello world!")
As an alternative, if all tasks of your DAG should run locally, you can also configure this at the DAG level (individual tasks can still explicitly use the KubernetesExecutor):
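from airflow import DAG

with DAG(
    dag_id="hello_worlds",
    default_args={"executor": "LocalExecutor"},  # Applies to all tasks in the DAG
) as dag:
    ...  # Tasks defined here run on the LocalExecutor unless they set another executor
See the official Airflow documentation for details.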
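The precedence between the task-level and DAG-level settings can be summarized in a small model. This is a hypothetical sketch, not Airflow's actual code: it assumes the multiple-executor behaviour available since Airflow 2.10, where the first entry of AIRFLOW__CORE__EXECUTOR is the default, an explicit executor argument on a task overrides default_args, and default_args in turn override the default. The function names are made up for illustration, and real Airflow also accepts executor aliases and module paths, which this sketch ignores.

```python
# Hypothetical model of executor selection with several configured executors.
# It only illustrates precedence; it is not Airflow's implementation.
from typing import Optional


def configured_executors(core_executor_setting: str) -> list[str]:
    """Split a comma-separated AIRFLOW__CORE__EXECUTOR value into executor names."""
    return [name.strip() for name in core_executor_setting.split(",") if name.strip()]


def resolve_executor(
    core_executor_setting: str,
    task_executor: Optional[str] = None,
    dag_default_args: Optional[dict] = None,
) -> str:
    executors = configured_executors(core_executor_setting)
    if not executors:
        raise ValueError("no executor configured")
    # 1. An executor passed explicitly to the task wins.
    # 2. Otherwise the executor from the DAG's default_args applies.
    # 3. Otherwise the first configured executor is the default.
    chosen = task_executor or (dag_default_args or {}).get("executor") or executors[0]
    # This sketch rejects executors that are not enabled in the setting.
    if chosen not in executors:
        raise ValueError(f"{chosen} is not enabled in AIRFLOW__CORE__EXECUTOR")
    return chosen


setting = "KubernetesExecutor,LocalExecutor"
print(resolve_executor(setting))  # KubernetesExecutor
print(resolve_executor(setting, task_executor="LocalExecutor"))  # LocalExecutor
print(resolve_executor(setting, dag_default_args={"executor": "LocalExecutor"}))  # LocalExecutor
print(resolve_executor(setting, task_executor="KubernetesExecutor",
                       dag_default_args={"executor": "LocalExecutor"}))  # KubernetesExecutor
```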
Tip: You might need to increase the scheduler resources, as the scheduler now also runs the tasks that use the LocalExecutor.
Kubernetes executors and their respective Pods only live as long as the task they are executing. Afterwards, the Pod is terminated immediately, and its console output and logs are gone with it.
To persist task logs, Airflow can be configured to store its executor logs on disk (on a PersistentVolume) or, as described in the following section, in S3.
In the Airflow Web UI, click on Admin → Connections → Add a new record (the plus).
Then enter your MinIO host and credentials as follows.
The name (connection ID) is minio, the type is Amazon Web Services, and the AWS Access Key ID and AWS Secret Access Key fields are filled in with your S3 credentials.
The Extra field contains the endpoint URL, for example:
{
  "endpoint_url": "http://minio.default.svc.cluster.local:9000"
}
In order to configure the S3 logging, you need to add the following environment variables to the Airflow cluster definition:
link:example$example-airflow-kubernetes-executor-s3-logging.yaml[role=include]
Now you should be able to fetch and inspect logs in the Airflow Web UI from S3 for each DAG run.
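The contents of the included example file are not reproduced in this text. As a rough sketch of what such settings usually look like, the snippet below derives environment variable names from Airflow's standard [logging] options remote_logging, remote_base_log_folder and remote_log_conn_id, using Airflow's AIRFLOW__{SECTION}__{KEY} naming convention. The bucket path is a hypothetical placeholder, and the connection ID is the minio connection created above; compare against the example file before using any of it.

```python
# Sketch: build S3 remote-logging environment variables from Airflow config options.
# Airflow reads an option "key" in section "section" from AIRFLOW__{SECTION}__{KEY}.
# The bucket below is a hypothetical placeholder; "minio" is the connection ID created above.
def airflow_env_var(section: str, key: str) -> str:
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def s3_logging_env(base_log_folder: str, conn_id: str = "minio") -> dict[str, str]:
    options = {
        ("logging", "remote_logging"): "True",
        ("logging", "remote_base_log_folder"): base_log_folder,
        ("logging", "remote_log_conn_id"): conn_id,
    }
    return {airflow_env_var(section, key): value for (section, key), value in options.items()}


env = s3_logging_env("s3://airflow-task-logs/")  # hypothetical bucket
for name, value in env.items():
    print(f"{name}: {value}")
```

When pasting such values into envOverrides, quote values like "True" so that YAML keeps them as strings.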
By default, Airflow stores XCom (cross-communication) objects in the Airflow database (often PostgreSQL), which can cause the database to run out of disk space, especially if you have high XCom traffic. To prevent this, you can configure Airflow to exchange XComs via an object storage backend (such as S3) instead of the database. You can read the details in the official documentation.
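Whether a given XCom actually ends up in object storage depends on a size threshold. The following is a simplified model, as we understand the rule of the XComObjectStorageBackend from the apache-airflow-providers-common-io package: a value is offloaded only if an object storage path is configured and -1 < threshold < serialized size in bytes, so -1 keeps everything in the database and 0 offloads every non-empty value. The path format s3://conn_id@bucket/prefix and the bucket name are assumptions for illustration, and the size calculation only approximates the backend's own serialization.

```python
# Simplified model of the XCom offloading decision (not the provider's actual code).
import json
from typing import Any


def stored_in_object_storage(value: Any, threshold: int, path: str) -> bool:
    # Approximate the serialized size with plain JSON encoded as UTF-8.
    size = len(json.dumps(value).encode("utf-8"))
    # Offload only when a path is configured and the value exceeds the threshold;
    # a threshold of -1 never offloads.
    return bool(path) and -1 < threshold < size


path = "s3://minio@airflow-xcom/xcom"  # hypothetical bucket/prefix using the "minio" connection
small = {"rows": 3}
large = {"rows": list(range(1000))}
print(stored_in_object_storage(small, threshold=1024, path=path))  # False: stays in the database
print(stored_in_object_storage(large, threshold=1024, path=path))  # True: offloaded to S3
print(stored_in_object_storage(large, threshold=-1, path=path))    # False: -1 keeps everything in the database
```

With a moderate threshold, small control values stay in the database, where they are cheap to read, while large payloads no longer consume database disk space.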
Tip: Airflow stores the XCom objects in the Postgres table xcom. To determine, for example, the biggest XCom objects, you can use:
select
    length(value) as size,
    dag_run_id,
    task_id,
    map_index,
    key,
    dag_id,
    run_id,
    "timestamp"
from xcom
order by length(value) desc limit 20;
With due care, you can also clean up this table to save space in the database.
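To try the query and a careful cleanup without touching a production database, the sketch below runs both against an in-memory SQLite table. The table is a simplified stand-in containing only the queried columns, not Airflow's real xcom schema, and the rows and cutoff date are made up for illustration.

```python
# Sketch: the "biggest XCom objects" query and a cautious cleanup on an in-memory SQLite table.
# Simplified stand-in schema and made-up rows; in practice the query runs against Postgres.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """create table xcom (
        dag_run_id integer, task_id text, map_index integer, key text,
        dag_id text, run_id text, "timestamp" text, value blob)"""
)
rows = [
    (1, "extract", -1, "return_value", "etl", "run_1", "2024-01-01T00:00:00", b"x" * 10),
    (2, "extract", -1, "return_value", "etl", "run_2", "2024-03-01T00:00:00", b"x" * 5000),
    (3, "load", -1, "return_value", "etl", "run_3", "2024-06-01T00:00:00", b"x" * 200),
]
conn.executemany("insert into xcom values (?, ?, ?, ?, ?, ?, ?, ?)", rows)

# Same query as in the tip: length(value) is the size of the stored value in bytes.
biggest = conn.execute(
    """select length(value) as size, dag_run_id, task_id, map_index, key,
              dag_id, run_id, "timestamp"
       from xcom order by length(value) desc limit 20"""
).fetchall()
for row in biggest:
    print(row)

# Careful cleanup: count what would be deleted before deleting it.
cutoff = "2024-05-01T00:00:00"  # hypothetical retention boundary
to_delete = conn.execute('select count(*) from xcom where "timestamp" < ?', (cutoff,)).fetchone()[0]
print(f"{to_delete} XCom rows older than {cutoff}")
conn.execute('delete from xcom where "timestamp" < ?', (cutoff,))
conn.commit()
```

On a real Airflow database, take a backup first and consider Airflow's own airflow db clean command, which can be restricted to the xcom table, instead of hand-written delete statements.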
In our testing, running
Please add an S3 connection in the Airflow Web UI as described in the section Add S3 connection in Airflow Web UI.

