Skip to content

Commit 01d4ac0

Browse files
authored
fix: OPA authorization for Airflow 3 (#668)
* fix: OPA authorization for Airflow 3 * changelog * changelog * Re-add AuhtZ test * changelog
1 parent 4d35655 commit 01d4ac0

File tree

8 files changed

+193
-54
lines changed

8 files changed

+193
-54
lines changed

CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
### Fixed
66

77
- Don't panic on invalid authorization config. Previously, a missing OPA ConfigMap would crash the operator ([#667]).
8-
- getting_started: Add a 120 second timeout before trying to enable the DAG ([#665]).
8+
- Fix OPA authorization for Airflow 3. Airflow 3 needs to be configured via env variables, the operator now does this correctly ([#668]).
99

10-
[#665]: https://github.com/stackabletech/airflow-operator/pull/665
1110
[#667]: https://github.com/stackabletech/airflow-operator/pull/667
11+
[#668]: https://github.com/stackabletech/airflow-operator/pull/668
1212

1313
## [25.7.0] - 2025-07-23
1414

rust/operator-binary/src/airflow_controller.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -723,8 +723,13 @@ fn build_rolegroup_config_map(
723723
let mut config: BTreeMap<String, String> = BTreeMap::new();
724724

725725
// this will call default values from AirflowClientAuthenticationDetails
726-
config::add_airflow_config(&mut config, authentication_config, authorization_config)
727-
.context(ConstructConfigSnafu)?;
726+
config::add_airflow_config(
727+
&mut config,
728+
authentication_config,
729+
authorization_config,
730+
&resolved_product_image.product_version,
731+
)
732+
.context(ConstructConfigSnafu)?;
728733

729734
tracing::debug!(
730735
"Default config for {}: {:?}",

rust/operator-binary/src/config.rs

Lines changed: 55 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::crd::{
1313
AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved,
1414
DEFAULT_OIDC_PROVIDER, FlaskRolesSyncMoment,
1515
},
16-
authorization::{AirflowAuthorizationResolved, OpaConfigResolved},
16+
authorization::AirflowAuthorizationResolved,
1717
};
1818

1919
pub const PYTHON_IMPORTS: &[&str] = &[
@@ -41,6 +41,7 @@ pub fn add_airflow_config(
4141
config: &mut BTreeMap<String, String>,
4242
authentication_config: &AirflowClientAuthenticationDetailsResolved,
4343
authorization_config: &AirflowAuthorizationResolved,
44+
product_version: &str,
4445
) -> Result<()> {
4546
if !config.contains_key(&*AirflowConfigOptions::AuthType.to_string()) {
4647
config.insert(
@@ -51,7 +52,7 @@ pub fn add_airflow_config(
5152
}
5253

5354
append_authentication_config(config, authentication_config)?;
54-
append_authorization_config(config, authorization_config)?;
55+
append_authorization_config(config, authorization_config, product_version);
5556

5657
Ok(())
5758
}
@@ -275,32 +276,30 @@ fn append_oidc_config(
275276
fn append_authorization_config(
276277
config: &mut BTreeMap<String, String>,
277278
authorization_config: &AirflowAuthorizationResolved,
278-
) -> Result<(), Error> {
279-
if let Some(opa_config) = &authorization_config.opa {
280-
append_opa_config(config, opa_config)?;
279+
product_version: &str,
280+
) {
281+
// See `env_vars::authorization_env_vars` for why we only care about Airflow 2
282+
if !product_version.starts_with("2.") {
283+
return;
281284
}
285+
let Some(opa_config) = &authorization_config.opa else {
286+
return;
287+
};
282288

283-
Ok(())
284-
}
285-
286-
fn append_opa_config(
287-
config: &mut BTreeMap<String, String>,
288-
opa_config: &OpaConfigResolved,
289-
) -> Result<(), Error> {
290-
config.insert(
291-
AirflowConfigOptions::AuthOpaRequestUrl.to_string(),
292-
opa_config.connection_string.to_owned(),
293-
);
294-
config.insert(
295-
AirflowConfigOptions::AuthOpaCacheTtlInSec.to_string(),
296-
opa_config.cache_entry_time_to_live.as_secs().to_string(),
297-
);
298-
config.insert(
299-
AirflowConfigOptions::AuthOpaCacheMaxsize.to_string(),
300-
opa_config.cache_max_entries.to_string(),
301-
);
302-
303-
Ok(())
289+
config.extend([
290+
(
291+
AirflowConfigOptions::AuthOpaRequestUrl.to_string(),
292+
opa_config.connection_string.to_owned(),
293+
),
294+
(
295+
AirflowConfigOptions::AuthOpaCacheTtlInSec.to_string(),
296+
opa_config.cache_entry_time_to_live.as_secs().to_string(),
297+
),
298+
(
299+
AirflowConfigOptions::AuthOpaCacheMaxsize.to_string(),
300+
opa_config.cache_max_entries.to_string(),
301+
),
302+
]);
304303
}
305304

306305
#[cfg(test)]
@@ -325,6 +324,8 @@ mod tests {
325324
},
326325
};
327326

327+
const TEST_AIRFLOW_VERSION: &str = "3.0.1";
328+
328329
#[test]
329330
fn test_auth_db_config() {
330331
let authentication_config = AirflowClientAuthenticationDetailsResolved {
@@ -337,7 +338,13 @@ mod tests {
337338
let authorization_config = AirflowAuthorizationResolved { opa: None };
338339

339340
let mut result = BTreeMap::new();
340-
add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok");
341+
add_airflow_config(
342+
&mut result,
343+
&authentication_config,
344+
&authorization_config,
345+
TEST_AIRFLOW_VERSION,
346+
)
347+
.expect("Ok");
341348

342349
assert_eq!(
343350
BTreeMap::from([
@@ -382,7 +389,13 @@ mod tests {
382389
let authorization_config = AirflowAuthorizationResolved { opa: None };
383390

384391
let mut result = BTreeMap::new();
385-
add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok");
392+
add_airflow_config(
393+
&mut result,
394+
&authentication_config,
395+
&authorization_config,
396+
TEST_AIRFLOW_VERSION,
397+
)
398+
.expect("Ok");
386399

387400
assert_eq!(BTreeMap::from([
388401
("AUTH_LDAP_ALLOW_SELF_SIGNED".into(), "false".into()),
@@ -468,7 +481,13 @@ mod tests {
468481
let authorization_config = AirflowAuthorizationResolved { opa: None };
469482

470483
let mut result = BTreeMap::new();
471-
add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok");
484+
add_airflow_config(
485+
&mut result,
486+
&authentication_config,
487+
&authorization_config,
488+
TEST_AIRFLOW_VERSION,
489+
)
490+
.expect("Ok");
472491

473492
assert_eq!(
474493
BTreeMap::from([
@@ -532,16 +551,16 @@ mod tests {
532551
};
533552

534553
let mut result = BTreeMap::new();
535-
add_airflow_config(&mut result, &authentication_config, &authorization_config).expect("Ok");
554+
add_airflow_config(
555+
&mut result,
556+
&authentication_config,
557+
&authorization_config,
558+
TEST_AIRFLOW_VERSION,
559+
)
560+
.expect("Ok");
536561

537562
assert_eq!(
538563
BTreeMap::from([
539-
("AUTH_OPA_CACHE_MAXSIZE".into(), "1000".into()),
540-
("AUTH_OPA_CACHE_TTL_IN_SEC".into(), "30".into()),
541-
(
542-
"AUTH_OPA_REQUEST_URL".into(),
543-
"http://opa:8081/v1/data/airflow".into()
544-
),
545564
("AUTH_ROLES_SYNC_AT_LOGIN".into(), "false".into()),
546565
("AUTH_TYPE".into(), "AUTH_DB".into()),
547566
("AUTH_USER_REGISTRATION".into(), "true".into()),

rust/operator-binary/src/crd/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ pub enum AirflowConfigOptions {
128128
AuthLdapTlsKeyfile,
129129
AuthLdapTlsCacertfile,
130130
AuthLdapAllowSelfSigned,
131+
// OPA configs for Airflow 2
132+
// Airflow 3 configs need to be passed via env variables!
133+
// See `env_vars::authorization_env_vars` for details
131134
AuthOpaCacheMaxsize,
132135
AuthOpaCacheTtlInSec,
133136
AuthOpaRequestUrl,

rust/operator-binary/src/env_vars.rs

Lines changed: 118 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ use crate::{
2727
};
2828

2929
const AIRFLOW_CORE_AUTH_MANAGER: &str = "AIRFLOW__CORE__AUTH_MANAGER";
30+
// Airflow 3 envs
31+
const AIRFLOW_CORE_AUTH_OPA_REQUEST_URL: &str = "AIRFLOW__CORE__AUTH_OPA_REQUEST_URL";
32+
const AIRFLOW_CORE_AUTH_OPA_CACHE_TTL_IN_SEC: &str = "AIRFLOW__CORE__AUTH_OPA_CACHE_TTL_IN_SEC";
33+
const AIRFLOW_CORE_AUTH_OPA_CACHE_MAXSIZE: &str = "AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE";
34+
3035
const AIRFLOW_LOGGING_LOGGING_CONFIG_CLASS: &str = "AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS";
3136
const AIRFLOW_METRICS_STATSD_ON: &str = "AIRFLOW__METRICS__STATSD_ON";
3237
const AIRFLOW_METRICS_STATSD_HOST: &str = "AIRFLOW__METRICS__STATSD_HOST";
@@ -231,7 +236,10 @@ pub fn build_airflow_statefulset_envs(
231236
}
232237
AirflowRole::Webserver => {
233238
let mut vars = authentication_env_vars(auth_config);
234-
vars.extend(authorization_env_vars(authorization_config));
239+
vars.extend(authorization_env_vars(
240+
authorization_config,
241+
&resolved_product_image.product_version,
242+
));
235243
env.extend(vars.into_iter().map(|var| (var.name.to_owned(), var)));
236244
}
237245
_ => {}
@@ -554,15 +562,45 @@ fn authentication_env_vars(
554562
.collect()
555563
}
556564

557-
fn authorization_env_vars(authorization_config: &AirflowAuthorizationResolved) -> Vec<EnvVar> {
558-
let mut env = vec![];
559-
560-
if authorization_config.opa.is_some() {
561-
env.push(EnvVar {
562-
name: AIRFLOW_CORE_AUTH_MANAGER.into(),
563-
value: Some("opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".to_string()),
564-
..Default::default()
565-
});
565+
/// Constructs the needed authorization env vars for the specific Airflow version.
566+
///
567+
/// `AIRFLOW__CORE__AUTH_MANAGER` always needs to be set as env var.
568+
///
569+
/// Airflow 2 needs to OPA settings in the `webserver_config.py` such as `AUTH_OPA_REQUEST_URL`.
570+
/// Airflow 3 needs to OPA settings as env variables such as `AIRFLOW__CORE__AUTH_OPA_REQUEST_URL`.
571+
fn authorization_env_vars(
572+
authorization_config: &AirflowAuthorizationResolved,
573+
product_version: &str,
574+
) -> Vec<EnvVar> {
575+
let Some(opa) = &authorization_config.opa else {
576+
return vec![];
577+
};
578+
579+
let mut env = vec![EnvVar {
580+
name: AIRFLOW_CORE_AUTH_MANAGER.into(),
581+
value: Some("opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".to_string()),
582+
..Default::default()
583+
}];
584+
if product_version.starts_with("2.") {
585+
// OPA config needs to go into `webserver_config.py`
586+
} else {
587+
env.extend([
588+
EnvVar {
589+
name: AIRFLOW_CORE_AUTH_OPA_REQUEST_URL.into(),
590+
value: Some(opa.connection_string.to_owned()),
591+
..Default::default()
592+
},
593+
EnvVar {
594+
name: AIRFLOW_CORE_AUTH_OPA_CACHE_TTL_IN_SEC.into(),
595+
value: Some(opa.cache_entry_time_to_live.as_secs().to_string()),
596+
..Default::default()
597+
},
598+
EnvVar {
599+
name: AIRFLOW_CORE_AUTH_OPA_CACHE_MAXSIZE.into(),
600+
value: Some(opa.cache_max_entries.to_string()),
601+
..Default::default()
602+
},
603+
]);
566604
}
567605

568606
env
@@ -600,3 +638,73 @@ fn execution_server_env_vars(airflow: &v1alpha1::AirflowCluster) -> BTreeMap<Str
600638

601639
env
602640
}
641+
642+
#[cfg(test)]
643+
mod tests {
644+
645+
use stackable_operator::time::Duration;
646+
647+
use super::*;
648+
use crate::crd::authorization::OpaConfigResolved;
649+
650+
#[test]
651+
fn test_airflow_2_authorization_env_vars() {
652+
let authorization_config = get_test_authorization_config();
653+
let authorization_env_vars = authorization_env_vars(&authorization_config, "2.10.5");
654+
let authorization_env_vars = authorization_env_vars
655+
.into_iter()
656+
.map(|env| (env.name, env.value.expect("env var value must be present")))
657+
.collect::<Vec<_>>();
658+
659+
assert_eq!(
660+
authorization_env_vars,
661+
[(
662+
"AIRFLOW__CORE__AUTH_MANAGER".into(),
663+
"opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".into()
664+
),]
665+
);
666+
}
667+
668+
#[test]
669+
fn test_airflow_3_authorization_env_vars() {
670+
let authorization_config = get_test_authorization_config();
671+
let authorization_env_vars = authorization_env_vars(&authorization_config, "3.0.1");
672+
let authorization_env_vars = authorization_env_vars
673+
.into_iter()
674+
.map(|env| (env.name, env.value.expect("env var value must be present")))
675+
.collect::<Vec<_>>();
676+
677+
assert_eq!(
678+
authorization_env_vars,
679+
[
680+
(
681+
"AIRFLOW__CORE__AUTH_MANAGER".into(),
682+
"opa_auth_manager.opa_fab_auth_manager.OpaFabAuthManager".into()
683+
),
684+
(
685+
"AIRFLOW__CORE__AUTH_OPA_REQUEST_URL".into(),
686+
"http://opa-server.default.svc.cluster.local:8081/v1/data/airflow".into()
687+
),
688+
(
689+
"AIRFLOW__CORE__AUTH_OPA_CACHE_TTL_IN_SEC".into(),
690+
"30".into()
691+
),
692+
(
693+
"AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE".into(),
694+
"1000".into()
695+
),
696+
]
697+
);
698+
}
699+
700+
fn get_test_authorization_config() -> AirflowAuthorizationResolved {
701+
AirflowAuthorizationResolved {
702+
opa: Some(OpaConfigResolved {
703+
connection_string:
704+
"http://opa-server.default.svc.cluster.local:8081/v1/data/airflow".to_string(),
705+
cache_entry_time_to_live: Duration::from_secs(30),
706+
cache_max_entries: 1000,
707+
}),
708+
}
709+
}
710+
}

tests/templates/kuttl/opa/20-assert.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ apiVersion: kuttl.dev/v1beta1
33
kind: TestAssert
44
timeout: 300
55
commands:
6-
- script: kubectl -n $NAMESPACE rollout status daemonset opa-server-default --timeout 300s
6+
- script: kubectl -n $NAMESPACE wait --for=condition=available --timeout=10m opacluster/test-opa

tests/templates/kuttl/opa/20-install-opa.yaml.j2

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ metadata:
77
apiVersion: opa.stackable.tech/v1alpha1
88
kind: OpaCluster
99
metadata:
10-
name: opa
10+
# The OpaCluster is intentionally not only called "opa" to ensure that our custom OPA URL is
11+
# used and not some default value of "opa".
12+
name: test-opa
1113
spec:
1214
image:
1315
{% if test_scenario['values']['opa-latest'].find(",") > 0 %}

tests/templates/kuttl/opa/30-install-airflow.yaml.j2

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ spec:
3434
clusterConfig:
3535
authorization:
3636
opa:
37-
configMapName: opa
37+
configMapName: test-opa
3838
package: airflow
3939
cache:
4040
entryTimeToLive: 5s
@@ -54,7 +54,9 @@ spec:
5454
configOverrides:
5555
webserver_config.py:
5656
WTF_CSRF_ENABLED: "False" # Allow "POST /login/" without CSRF token
57-
AUTH_OPA_CACHE_MAXSIZE_DEFAULT: "0" # disable decision caching for easy debugging
57+
AUTH_OPA_CACHE_MAXSIZE: "0" # Airflow 2: Disable decision caching for easy debugging
58+
envOverrides:
59+
AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE: "0" # Airflow 3: Disable decision caching for easy debugging
5860
roleGroups:
5961
default:
6062
replicas: 1

0 commit comments

Comments
 (0)