Skip to content

Commit 5dcc16b

Browse files
authored
environmentd: pipe internal metadata around in http/ws router for self managed auth (#34328)
fixes MaterializeInc/database-issues#9680 <!-- Describe the contents of the PR briefly but completely. If you write detailed commit messages, it is acceptable to copy/paste them here, or write "see commit messages for details." If there is only one commit in the PR, GitHub will have already added its commit message above. --> ### Motivation <!-- Which of the following best describes the motivation behind this PR? * This PR fixes a recognized bug. [Ensure issue is linked somewhere.] * This PR adds a known-desirable feature. [Ensure issue is linked somewhere.] * This PR fixes a previously unreported bug. [Describe the bug in detail, as if you were filing a bug report.] * This PR adds a feature that has not yet been specified. [Write a brief specification for the feature, including justification for its inclusion in Materialize, as if you were writing the original feature specification.] * This PR refactors existing code. [Describe what was wrong with the existing code, if it is not obvious.] --> ### Tips for reviewer <!-- Leave some tips for your reviewer, like: * The diff is much smaller if viewed with whitespace hidden. * [Some function/module/file] deserves extra attention. * [Some function/module/file] is pure code movement and only needs a skim. Delete this section if no tips. --> ### Checklist - [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/)) - [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design. <!-- Reference the design in the description. --> - [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label. - [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)). <!-- Ask in #team-cloud on Slack if you need help preparing the cloud PR. --> - [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.
1 parent b24fdfd commit 5dcc16b

File tree

4 files changed

+235
-17
lines changed

4 files changed

+235
-17
lines changed

src/environmentd/src/http.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use mz_ore::metrics::MetricsRegistry;
5252
use mz_ore::now::{NowFn, SYSTEM_TIME, epoch_to_uuid_v7};
5353
use mz_ore::str::StrExt;
5454
use mz_pgwire_common::{ConnectionCounter, ConnectionHandle};
55-
use mz_repr::user::ExternalUserMetadata;
55+
use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
5656
use mz_server_core::listeners::{AllowedRoles, AuthenticatorKind, HttpRoutesEnabled};
5757
use mz_server_core::{Connection, ConnectionHandler, ReloadingSslContext, Server};
5858
use mz_sql::session::metadata::SessionMetadata;
@@ -529,9 +529,11 @@ async fn x_materialize_user_header_auth(mut req: Request, next: Next) -> impl In
529529
)));
530530
}
531531
};
532+
let superuser = matches!(username.as_str(), SYSTEM_USER_NAME);
532533
req.extensions_mut().insert(AuthedUser {
533534
name: username,
534535
external_metadata_rx: None,
536+
internal_metadata: Some(InternalUserMetadata { superuser }),
535537
});
536538
}
537539
Ok(next.run(req).await)
@@ -549,6 +551,7 @@ enum ConnProtocol {
549551
pub struct AuthedUser {
550552
name: String,
551553
external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
554+
internal_metadata: Option<InternalUserMetadata>,
552555
}
553556

554557
pub struct AuthedClient {
@@ -577,8 +580,7 @@ impl AuthedClient {
577580
user: user.name,
578581
client_ip: Some(peer_addr),
579582
external_metadata_rx: user.external_metadata_rx,
580-
//TODO(dov): Add support for internal user metadata when we support auth here
581-
internal_user_metadata: None,
583+
internal_user_metadata: user.internal_metadata,
582584
helm_chart_version,
583585
});
584586
let connection_guard = active_connection_counter.allocate_connection(session.user())?;
@@ -741,16 +743,22 @@ pub async fn handle_login(
741743
let Ok(adapter_client) = adapter_client_rx.clone().await else {
742744
return StatusCode::INTERNAL_SERVER_ERROR;
743745
};
744-
if let Err(err) = adapter_client.authenticate(&username, &password).await {
745-
warn!(?err, "HTTP login failed authentication");
746-
return StatusCode::UNAUTHORIZED;
746+
let auth_response = match adapter_client.authenticate(&username, &password).await {
747+
Ok(auth_response) => auth_response,
748+
Err(err) => {
749+
warn!(?err, "HTTP login failed authentication");
750+
return StatusCode::UNAUTHORIZED;
751+
}
747752
};
748753

749754
// Create session data
750755
let session_data = TowerSessionData {
751756
username,
752757
created_at: SystemTime::now(),
753758
last_activity: SystemTime::now(),
759+
internal_metadata: InternalUserMetadata {
760+
superuser: auth_response.superuser,
761+
},
754762
};
755763
// Store session data
756764
let session = session.and_then(|Extension(session)| Some(session));
@@ -807,6 +815,7 @@ async fn http_auth(
807815
req.extensions_mut().insert(AuthedUser {
808816
name: session_data.username,
809817
external_metadata_rx: None,
818+
internal_metadata: Some(session_data.internal_metadata),
810819
});
811820
return Ok(next.run(req).await);
812821
}
@@ -966,22 +975,21 @@ async fn auth(
966975
allowed_roles: AllowedRoles,
967976
include_www_authenticate_header: bool,
968977
) -> Result<AuthedUser, AuthError> {
969-
// TODO pass session data here?
970-
let (name, external_metadata_rx) = match authenticator {
978+
let (name, external_metadata_rx, internal_metadata) = match authenticator {
971979
Authenticator::Frontegg(frontegg) => match creds {
972980
Some(Credentials::Password { username, password }) => {
973981
let auth_session = frontegg.authenticate(&username, &password.0).await?;
974982
let name = auth_session.user().into();
975983
let external_metadata_rx = Some(auth_session.external_metadata_rx());
976-
(name, external_metadata_rx)
984+
(name, external_metadata_rx, None)
977985
}
978986
Some(Credentials::Token { token }) => {
979987
let claims = frontegg.validate_access_token(&token, None)?;
980988
let (_, external_metadata_rx) = watch::channel(ExternalUserMetadata {
981989
user_id: claims.user_id,
982990
admin: claims.is_admin,
983991
});
984-
(claims.user, Some(external_metadata_rx))
992+
(claims.user, Some(external_metadata_rx), None)
985993
}
986994
None => {
987995
return Err(AuthError::MissingHttpAuthentication {
@@ -991,10 +999,14 @@ async fn auth(
991999
},
9921000
Authenticator::Password(adapter_client) => match creds {
9931001
Some(Credentials::Password { username, password }) => {
994-
if let Err(_) = adapter_client.authenticate(&username, &password).await {
995-
return Err(AuthError::InvalidCredentials);
996-
}
997-
(username, None)
1002+
let auth_response = adapter_client
1003+
.authenticate(&username, &password)
1004+
.await
1005+
.map_err(|_| AuthError::InvalidCredentials)?;
1006+
let internal_metadata = InternalUserMetadata {
1007+
superuser: auth_response.superuser,
1008+
};
1009+
(username, None, Some(internal_metadata))
9981010
}
9991011
_ => {
10001012
return Err(AuthError::MissingHttpAuthentication {
@@ -1018,7 +1030,7 @@ async fn auth(
10181030
Some(Credentials::Password { username, .. }) => username,
10191031
_ => HTTP_DEFAULT_USER.name.to_owned(),
10201032
};
1021-
(name, None)
1033+
(name, None, None)
10221034
}
10231035
};
10241036

@@ -1027,6 +1039,7 @@ async fn auth(
10271039
Ok(AuthedUser {
10281040
name,
10291041
external_metadata_rx,
1042+
internal_metadata,
10301043
})
10311044
}
10321045

@@ -1093,6 +1106,7 @@ pub struct TowerSessionData {
10931106
username: String,
10941107
created_at: SystemTime,
10951108
last_activity: SystemTime,
1109+
internal_metadata: InternalUserMetadata,
10961110
}
10971111

10981112
#[cfg(test)]

src/environmentd/src/http/sql.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@ pub async fn handle_sql_ws(
311311
Some(AuthedUser {
312312
name: session_data.username,
313313
external_metadata_rx: None,
314+
internal_metadata: Some(session_data.internal_metadata),
314315
})
315316
} else {
316317
None

src/environmentd/tests/auth.rs

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3631,3 +3631,206 @@ async fn test_password_auth_http() {
36313631
}
36323632
}
36333633
}
3634+
3635+
/// Tests that the superuser flag is correctly propagated through HTTP/WebSocket authentication.
3636+
/// This is a regression test for a bug where WebSocket connections always had superuser=false
3637+
/// because internal_user_metadata was hardcoded to None.
3638+
#[mz_ore::test(tokio::test(flavor = "multi_thread", worker_threads = 1))]
3639+
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `OPENSSL_init_ssl` on OS `linux`
3640+
async fn test_password_auth_http_superuser() {
3641+
let metrics_registry = MetricsRegistry::new();
3642+
3643+
let server = test_util::TestHarness::default()
3644+
.with_system_parameter_default(
3645+
"log_filter".to_string(),
3646+
"mz_frontegg_auth=debug,info".to_string(),
3647+
)
3648+
.with_system_parameter_default("enable_password_auth".to_string(), "true".to_string())
3649+
.with_password_auth(Password("mz_system_password".to_owned()))
3650+
.with_metrics_registry(metrics_registry)
3651+
.start()
3652+
.await;
3653+
3654+
let mz_system_client = server
3655+
.connect()
3656+
.no_tls()
3657+
.user("mz_system")
3658+
.password("mz_system_password")
3659+
.await
3660+
.unwrap();
3661+
mz_system_client
3662+
.execute(
3663+
"CREATE ROLE superuser_role WITH LOGIN SUPERUSER PASSWORD 'super_pass'",
3664+
&[],
3665+
)
3666+
.await
3667+
.unwrap();
3668+
mz_system_client
3669+
.execute(
3670+
"CREATE ROLE normal_role WITH LOGIN PASSWORD 'normal_pass'",
3671+
&[],
3672+
)
3673+
.await
3674+
.unwrap();
3675+
3676+
let ws_url: Uri = format!("ws://{}/api/experimental/sql", server.http_local_addr())
3677+
.parse()
3678+
.unwrap();
3679+
let login_url: Uri = format!("http://{}/api/login", server.http_local_addr())
3680+
.parse()
3681+
.unwrap();
3682+
3683+
let http_client = hyper_util::client::legacy::Client::builder(TokioExecutor::new())
3684+
.pool_idle_timeout(Duration::from_secs(10))
3685+
.build_http();
3686+
3687+
fn check_superuser_via_ws_basic(ws_url: &Uri, user: &str, password: &str) -> bool {
3688+
let ws_request = ClientRequestBuilder::new(ws_url.clone());
3689+
let (mut ws, _resp) = tungstenite::connect(ws_request).unwrap();
3690+
3691+
let auth = WebSocketAuth::Basic {
3692+
user: user.to_string(),
3693+
password: Password(password.to_string()),
3694+
options: BTreeMap::default(),
3695+
};
3696+
ws.send(Message::Text(serde_json::to_string(&auth).unwrap()))
3697+
.unwrap();
3698+
3699+
loop {
3700+
let resp = ws.read().unwrap();
3701+
if let Message::Text(msg) = resp {
3702+
let msg: WebSocketResponse = serde_json::from_str(&msg).unwrap();
3703+
if matches!(msg, WebSocketResponse::ReadyForQuery(_)) {
3704+
break;
3705+
}
3706+
}
3707+
}
3708+
3709+
ws.send(Message::Text(
3710+
r#"{"query": "SHOW is_superuser"}"#.to_owned(),
3711+
))
3712+
.unwrap();
3713+
3714+
loop {
3715+
let resp = ws.read().unwrap();
3716+
if let Message::Text(msg) = resp {
3717+
let msg: WebSocketResponse = serde_json::from_str(&msg).unwrap();
3718+
if let WebSocketResponse::Row(row) = msg {
3719+
let value = row[0].as_str().unwrap();
3720+
return value == "on";
3721+
}
3722+
}
3723+
}
3724+
}
3725+
3726+
async fn check_superuser_via_ws_session(
3727+
http_client: &hyper_util::client::legacy::Client<
3728+
hyper_util::client::legacy::connect::HttpConnector,
3729+
String,
3730+
>,
3731+
login_url: &Uri,
3732+
ws_url: &Uri,
3733+
user: &str,
3734+
password: &str,
3735+
) -> bool {
3736+
let login_body = format!(r#"{{"username":"{}","password":"{}"}}"#, user, password);
3737+
let login_response = http_client
3738+
.request(
3739+
Request::post(login_url.clone())
3740+
.header("Content-Type", "application/json")
3741+
.body(login_body)
3742+
.unwrap(),
3743+
)
3744+
.await
3745+
.unwrap();
3746+
3747+
assert_eq!(login_response.status(), StatusCode::OK);
3748+
3749+
let session_cookie = login_response
3750+
.headers()
3751+
.get(SET_COOKIE)
3752+
.unwrap()
3753+
.to_str()
3754+
.unwrap()
3755+
.split("; ")
3756+
.find(|v| v.starts_with("mz_session="))
3757+
.unwrap();
3758+
3759+
let ws_request =
3760+
ClientRequestBuilder::new(ws_url.clone()).with_header("Cookie", session_cookie);
3761+
let (mut ws, _resp) = tungstenite::connect(ws_request).unwrap();
3762+
3763+
ws.send(Message::Text(r#"{"options": {}}"#.to_owned()))
3764+
.unwrap();
3765+
3766+
loop {
3767+
let resp = ws.read().unwrap();
3768+
if let Message::Text(msg) = resp {
3769+
let msg: WebSocketResponse = serde_json::from_str(&msg).unwrap();
3770+
if matches!(msg, WebSocketResponse::ReadyForQuery(_)) {
3771+
break;
3772+
}
3773+
}
3774+
}
3775+
3776+
ws.send(Message::Text(
3777+
r#"{"query": "SHOW is_superuser"}"#.to_owned(),
3778+
))
3779+
.unwrap();
3780+
3781+
loop {
3782+
let resp = ws.read().unwrap();
3783+
if let Message::Text(msg) = resp {
3784+
let msg: WebSocketResponse = serde_json::from_str(&msg).unwrap();
3785+
if let WebSocketResponse::Row(row) = msg {
3786+
let value = row[0].as_str().unwrap();
3787+
return value == "on";
3788+
}
3789+
}
3790+
}
3791+
}
3792+
3793+
// Superuser via WebSocket with Basic auth should have is_superuser=on
3794+
assert!(
3795+
check_superuser_via_ws_basic(&ws_url, "superuser_role", "super_pass"),
3796+
"superuser_role should have is_superuser=on via WebSocket Basic auth"
3797+
);
3798+
3799+
// Non-superuser via WebSocket with Basic auth should have is_superuser=off
3800+
assert!(
3801+
!check_superuser_via_ws_basic(&ws_url, "normal_role", "normal_pass"),
3802+
"normal_role should have is_superuser=off via WebSocket Basic auth"
3803+
);
3804+
3805+
// Superuser via WebSocket with session cookie should have is_superuser=on
3806+
assert!(
3807+
check_superuser_via_ws_session(
3808+
&http_client,
3809+
&login_url,
3810+
&ws_url,
3811+
"superuser_role",
3812+
"super_pass"
3813+
)
3814+
.await,
3815+
"superuser_role should have is_superuser=on via WebSocket session auth"
3816+
);
3817+
3818+
// Non-superuser via WebSocket with session cookie should have is_superuser=off
3819+
assert!(
3820+
!check_superuser_via_ws_session(
3821+
&http_client,
3822+
&login_url,
3823+
&ws_url,
3824+
"normal_role",
3825+
"normal_pass"
3826+
)
3827+
.await,
3828+
"normal_role should have is_superuser=off via WebSocket session auth"
3829+
);
3830+
3831+
// mz_system (internal user) via Basic auth should have is_superuser=on
3832+
assert!(
3833+
check_superuser_via_ws_basic(&ws_url, "mz_system", "mz_system_password"),
3834+
"mz_system should have is_superuser=on via WebSocket Basic auth"
3835+
);
3836+
}

src/repr/src/user.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
// the Business Source License, use of this software will be governed
88
// by the Apache License, Version 2.0.
99

10-
use serde::Serialize;
10+
use serde::{Deserialize, Serialize};
1111
use uuid::Uuid;
1212

1313
/// Metadata about a user in an external system.
@@ -19,7 +19,7 @@ pub struct ExternalUserMetadata {
1919
pub admin: bool,
2020
}
2121

22-
#[derive(Debug, Clone, Serialize)]
22+
#[derive(Debug, Clone, Serialize, Deserialize)]
2323
pub struct InternalUserMetadata {
2424
pub superuser: bool,
2525
}

0 commit comments

Comments
 (0)