Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
860 changes: 845 additions & 15 deletions deploy/helm/airflow-operator/crds/crds.yaml

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion rust/operator-binary/src/airflow_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,10 @@ fn build_server_rolegroup_statefulset(
AirflowRole::Scheduler => {
"OrderedReady" // Scheduler pods should start after another, since part of their startup phase is initializing the database, see crd/src/lib.rs
}
AirflowRole::Webserver | AirflowRole::Worker => "Parallel",
AirflowRole::Webserver
| AirflowRole::Worker
| AirflowRole::DagProcessor
| AirflowRole::Triggerer => "Parallel",
}
.to_string(),
),
Expand Down
103 changes: 95 additions & 8 deletions rust/operator-binary/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,25 @@ pub mod versioned {
#[serde(default)]
pub cluster_operation: ClusterOperation,

/// The `webserver` role provides the main UI for user interaction.
/// The `webservers` role provides the main UI for user interaction.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub webservers: Option<Role<AirflowConfigFragment, v1alpha1::WebserverRoleConfig>>,

/// The `scheduler` is responsible for triggering jobs and persisting their metadata to the backend database.
/// The `schedulers` role is responsible for triggering jobs and persisting their metadata to the backend database.
/// Jobs are scheduled on the workers/executors.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub schedulers: Option<Role<AirflowConfigFragment>>,

#[serde(flatten)]
pub executor: AirflowExecutor,

/// The `dagProcessors` role runs the DAG processor routine for DAG preparation.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub dag_processors: Option<Role<AirflowConfigFragment>>,

/// The `triggerers` role runs the triggerer process for use with deferrable DAG operators.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub triggerers: Option<Role<AirflowConfigFragment>>,
}

#[derive(Clone, Deserialize, Debug, JsonSchema, PartialEq, Serialize)]
Expand Down Expand Up @@ -342,7 +350,10 @@ impl v1alpha1::AirflowCluster {
pub fn group_listener_name(&self, role: &AirflowRole) -> Option<String> {
match role {
AirflowRole::Webserver => Some(role_service_name(&self.name_any(), &role.to_string())),
AirflowRole::Scheduler | AirflowRole::Worker => None,
AirflowRole::Scheduler
| AirflowRole::Worker
| AirflowRole::DagProcessor
| AirflowRole::Triggerer => None,
}
}

Expand All @@ -356,6 +367,8 @@ impl v1alpha1::AirflowCluster {
.to_owned()
.map(extract_role_from_webserver_config),
AirflowRole::Scheduler => self.spec.schedulers.to_owned(),
AirflowRole::DagProcessor => self.spec.dag_processors.to_owned(),
AirflowRole::Triggerer => self.spec.triggerers.to_owned(),
AirflowRole::Worker => {
if let AirflowExecutor::CeleryExecutor { config } = &self.spec.executor {
Some(config.clone())
Expand Down Expand Up @@ -416,6 +429,24 @@ impl v1alpha1::AirflowCluster {
roles: AirflowRole::roles(),
})?
}
AirflowRole::DagProcessor => {
self.spec
.dag_processors
.as_ref()
.context(UnknownAirflowRoleSnafu {
role: role.to_string(),
roles: AirflowRole::roles(),
})?
}
AirflowRole::Triggerer => {
self.spec
.triggerers
.as_ref()
.context(UnknownAirflowRoleSnafu {
role: role.to_string(),
roles: AirflowRole::roles(),
})?
}
};

// Retrieve role resource config
Expand Down Expand Up @@ -564,6 +595,12 @@ pub enum AirflowRole {

#[strum(serialize = "worker")]
Worker,

#[strum(serialize = "dagprocessor")]
DagProcessor,

#[strum(serialize = "triggerer")]
Triggerer,
}

impl AirflowRole {
Expand Down Expand Up @@ -625,10 +662,27 @@ impl AirflowRole {
command.extend(vec![
"prepare_signal_handlers".to_string(),
container_debug_command(),
"airflow dag-processor &".to_string(),
"airflow scheduler &".to_string(),
]);
if airflow.spec.dag_processors.is_none() {
// If no dag_processors role has been specified, the
// process needs to be included with the scheduler
// (with 3.x there is no longer the possibility of
// starting it as a subprocess, so it has to be
// explicitly started *somewhere*)
command.extend(vec!["airflow dag-processor &".to_string()]);
}
}
AirflowRole::DagProcessor => command.extend(vec![
"prepare_signal_handlers".to_string(),
container_debug_command(),
"airflow dag-processor &".to_string(),
]),
AirflowRole::Triggerer => command.extend(vec![
"prepare_signal_handlers".to_string(),
container_debug_command(),
"airflow triggerer &".to_string(),
]),
AirflowRole::Worker => command.extend(vec![
"prepare_signal_handlers".to_string(),
container_debug_command(),
Expand Down Expand Up @@ -671,6 +725,16 @@ impl AirflowRole {
"airflow scheduler &".to_string(),
]);
}
AirflowRole::DagProcessor => command.extend(vec![
"prepare_signal_handlers".to_string(),
container_debug_command(),
"airflow dag-processor &".to_string(),
]),
AirflowRole::Triggerer => command.extend(vec![
"prepare_signal_handlers".to_string(),
container_debug_command(),
"airflow triggerer &".to_string(),
]),
AirflowRole::Worker => command.extend(vec![
"prepare_signal_handlers".to_string(),
container_debug_command(),
Expand Down Expand Up @@ -727,6 +791,8 @@ impl AirflowRole {
AirflowRole::Webserver => Some(HTTP_PORT),
AirflowRole::Scheduler => None,
AirflowRole::Worker => None,
AirflowRole::DagProcessor => None,
AirflowRole::Triggerer => None,
}
}

Expand All @@ -745,7 +811,7 @@ impl AirflowRole {
.webservers
.to_owned()
.map(|webserver| webserver.role_config.listener_class),
Self::Worker | Self::Scheduler => None,
Self::Worker | Self::Scheduler | Self::DagProcessor | Self::Triggerer => None,
}
}
}
Expand Down Expand Up @@ -881,9 +947,10 @@ impl AirflowConfig {
logging: product_logging::spec::default_logging(),
affinity: get_affinity(cluster_name, role),
graceful_shutdown_timeout: Some(match role {
AirflowRole::Webserver | AirflowRole::Scheduler => {
DEFAULT_AIRFLOW_GRACEFUL_SHUTDOWN_TIMEOUT
}
AirflowRole::Webserver
| AirflowRole::Scheduler
| AirflowRole::DagProcessor
| AirflowRole::Triggerer => DEFAULT_AIRFLOW_GRACEFUL_SHUTDOWN_TIMEOUT,
AirflowRole::Worker => DEFAULT_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT,
}),
}
Expand Down Expand Up @@ -956,6 +1023,26 @@ fn default_resources(role: &AirflowRole) -> ResourcesFragment<AirflowStorageConf
runtime_limits: NoRuntimeLimitsFragment {},
},
),
AirflowRole::DagProcessor => (
CpuLimitsFragment {
min: Some(Quantity("1".to_owned())),
max: Some(Quantity("2".to_owned())),
},
MemoryLimitsFragment {
limit: Some(Quantity("1Gi".to_owned())),
runtime_limits: NoRuntimeLimitsFragment {},
},
),
AirflowRole::Triggerer => (
CpuLimitsFragment {
min: Some(Quantity("1".to_owned())),
max: Some(Quantity("2".to_owned())),
},
MemoryLimitsFragment {
limit: Some(Quantity("1Gi".to_owned())),
runtime_limits: NoRuntimeLimitsFragment {},
},
),
};

ResourcesFragment {
Expand Down
81 changes: 45 additions & 36 deletions rust/operator-binary/src/env_vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use stackable_operator::{

use crate::{
crd::{
AirflowConfig, AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR,
STACKABLE_LOG_DIR, TEMPLATE_LOCATION, TEMPLATE_NAME,
AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR, STACKABLE_LOG_DIR,
TEMPLATE_LOCATION, TEMPLATE_NAME,
authentication::{
AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved,
},
Expand Down Expand Up @@ -86,57 +86,53 @@ pub fn build_airflow_statefulset_envs(
resolved_product_image: &ResolvedProductImage,
) -> Result<Vec<EnvVar>, Error> {
let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();
let secret = airflow.spec.cluster_config.credentials_secret.as_str();

env.extend(static_envs(git_sync_resources));

add_version_specific_env_vars(airflow, airflow_role, resolved_product_image, &mut env);

// environment variables
let env_vars = rolegroup_config.get(&PropertyNameKind::Env);

let secret_prop =
env_vars.and_then(|vars| vars.get(AirflowConfig::CREDENTIALS_SECRET_PROPERTY));
add_version_specific_env_vars(airflow, airflow_role, resolved_product_image, &mut env);

env.insert(
AIRFLOW_WEBSERVER_SECRET_KEY.into(),
// The secret key is used to run the webserver flask app and also used to authorize
// requests to Celery workers when logs are retrieved.
env_var_from_secret(
AIRFLOW_WEBSERVER_SECRET_KEY,
secret,
"connections.secretKey",
),
);
env.insert(
AIRFLOW_DATABASE_SQL_ALCHEMY_CONN.into(),
env_var_from_secret(
AIRFLOW_DATABASE_SQL_ALCHEMY_CONN,
secret,
"connections.sqlalchemyDatabaseUri",
),
);

if let Some(secret) = secret_prop {
// Redis is only needed when celery executors are used
// see https://github.com/stackabletech/airflow-operator/issues/424 for details
if matches!(executor, AirflowExecutor::CeleryExecutor { .. }) {
env.insert(
AIRFLOW_WEBSERVER_SECRET_KEY.into(),
// The secret key is used to run the webserver flask app and also used to authorize
// requests to Celery workers when logs are retrieved.
AIRFLOW_CELERY_RESULT_BACKEND.into(),
env_var_from_secret(
AIRFLOW_WEBSERVER_SECRET_KEY,
AIRFLOW_CELERY_RESULT_BACKEND,
secret,
"connections.secretKey",
"connections.celeryResultBackend",
),
);
env.insert(
AIRFLOW_DATABASE_SQL_ALCHEMY_CONN.into(),
AIRFLOW_CELERY_BROKER_URL.into(),
env_var_from_secret(
AIRFLOW_DATABASE_SQL_ALCHEMY_CONN,
AIRFLOW_CELERY_BROKER_URL,
secret,
"connections.sqlalchemyDatabaseUri",
"connections.celeryBrokerUrl",
),
);

// Redis is only needed when celery executors are used
// see https://github.com/stackabletech/airflow-operator/issues/424 for details
if matches!(executor, AirflowExecutor::CeleryExecutor { .. }) {
env.insert(
AIRFLOW_CELERY_RESULT_BACKEND.into(),
env_var_from_secret(
AIRFLOW_CELERY_RESULT_BACKEND,
secret,
"connections.celeryResultBackend",
),
);
env.insert(
AIRFLOW_CELERY_BROKER_URL.into(),
env_var_from_secret(
AIRFLOW_CELERY_BROKER_URL,
secret,
"connections.celeryBrokerUrl",
),
);
}
}

let dags_folder = get_dags_folder(git_sync_resources);
Expand Down Expand Up @@ -527,6 +523,19 @@ fn add_version_specific_env_vars(
..Default::default()
},
);
if airflow.spec.dag_processors.is_some() {
// In airflow 2.x the dag-processor can optionally be started as a
// standalone process (rather than as a scheduler subprocess),
// accompanied by this env-var being set to True.
env.insert(
"AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR".into(),
EnvVar {
name: "AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR".into(),
value: Some("True".into()),
..Default::default()
},
);
}
}
}

Expand Down
10 changes: 10 additions & 0 deletions rust/operator-binary/src/operations/pdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub async fn add_pdbs(
let max_unavailable = pdb.max_unavailable.unwrap_or(match role {
AirflowRole::Scheduler => max_unavailable_schedulers(),
AirflowRole::Webserver => max_unavailable_webservers(),
AirflowRole::DagProcessor => max_unavailable_dag_processors(),
AirflowRole::Triggerer => max_unavailable_triggerers(),
AirflowRole::Worker => match airflow.spec.executor {
AirflowExecutor::CeleryExecutor { .. } => max_unavailable_workers(),
AirflowExecutor::KubernetesExecutor { .. } => {
Expand Down Expand Up @@ -77,3 +79,11 @@ fn max_unavailable_workers() -> u16 {
/// Default maximum number of unavailable webserver pods used for the
/// PodDisruptionBudget when the user does not set `maxUnavailable` explicitly
/// (consumed via `pdb.max_unavailable.unwrap_or(...)` in `add_pdbs`).
fn max_unavailable_webservers() -> u16 {
1
}

/// Default maximum number of unavailable DAG-processor pods used for the
/// PodDisruptionBudget when the user does not set `maxUnavailable` explicitly
/// (consumed via `pdb.max_unavailable.unwrap_or(...)` in `add_pdbs`).
fn max_unavailable_dag_processors() -> u16 {
1
}

/// Default maximum number of unavailable triggerer pods used for the
/// PodDisruptionBudget when the user does not set `maxUnavailable` explicitly
/// (consumed via `pdb.max_unavailable.unwrap_or(...)` in `add_pdbs`).
fn max_unavailable_triggerers() -> u16 {
1
}
42 changes: 42 additions & 0 deletions tests/templates/kuttl/smoke/40-assert.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,30 @@ status:
readyReplicas: 1
replicas: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: airflow-dagprocessor-default
spec:
template:
spec:
terminationGracePeriodSeconds: 120
status:
readyReplicas: 1
replicas: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: airflow-triggerer-default
spec:
template:
spec:
terminationGracePeriodSeconds: 120
status:
readyReplicas: 1
replicas: 1
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
Expand Down Expand Up @@ -79,3 +103,21 @@ status:
expectedPods: 1
currentHealthy: 1
disruptionsAllowed: 1
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: airflow-dagprocessor
status:
expectedPods: 1
currentHealthy: 1
disruptionsAllowed: 1
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: airflow-triggerer
status:
expectedPods: 1
currentHealthy: 1
disruptionsAllowed: 1
Loading
Loading