diff --git a/Cargo.lock b/Cargo.lock index fe4dfa0cf6..5bf35b52dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6300,11 +6300,13 @@ dependencies = [ "async-dropper", "async-trait", "bon", + "bytemuck", "bytes", "dashmap", "flume 0.12.0", "futures", "futures-util", + "iggy_binary_protocol", "iggy_common", "mockall", "quinn", @@ -11328,6 +11330,7 @@ dependencies = [ "async_zip", "axum", "axum-server", + "bytemuck", "bytes", "chrono", "clap", diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml index cb5aabe613..8b00608fde 100644 --- a/core/common/Cargo.toml +++ b/core/common/Cargo.toml @@ -27,6 +27,9 @@ documentation = "https://iggy.apache.org/docs" repository = "https://github.com/apache/iggy" readme = "README.md" +[features] +vsr = [] + [dependencies] aes-gcm = { workspace = true } aligned-vec = { workspace = true } diff --git a/core/common/src/traits/binary_impls/personal_access_tokens.rs b/core/common/src/traits/binary_impls/personal_access_tokens.rs index f85f34d363..b7a46d84f3 100644 --- a/core/common/src/traits/binary_impls/personal_access_tokens.rs +++ b/core/common/src/traits/binary_impls/personal_access_tokens.rs @@ -25,17 +25,30 @@ use crate::{ }; use iggy_binary_protocol::WireName; use iggy_binary_protocol::codec::WireEncode; +#[cfg(feature = "vsr")] +use iggy_binary_protocol::codes::LOGIN_REGISTER_WITH_PAT_CODE; +#[cfg(not(feature = "vsr"))] +use iggy_binary_protocol::codes::LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE; use iggy_binary_protocol::codes::{ CREATE_PERSONAL_ACCESS_TOKEN_CODE, DELETE_PERSONAL_ACCESS_TOKEN_CODE, - GET_PERSONAL_ACCESS_TOKENS_CODE, LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, + GET_PERSONAL_ACCESS_TOKENS_CODE, }; +#[cfg(not(feature = "vsr"))] +use iggy_binary_protocol::requests::personal_access_tokens::LoginWithPersonalAccessTokenRequest; use iggy_binary_protocol::requests::personal_access_tokens::{ CreatePersonalAccessTokenRequest, DeletePersonalAccessTokenRequest, - GetPersonalAccessTokensRequest, LoginWithPersonalAccessTokenRequest, + GetPersonalAccessTokensRequest, }; +#[cfg(feature = "vsr")] +use iggy_binary_protocol::requests::users::LoginRegisterWithPatRequest; use iggy_binary_protocol::responses::personal_access_tokens::create_personal_access_token::RawPersonalAccessTokenResponse; use iggy_binary_protocol::responses::personal_access_tokens::get_personal_access_tokens::GetPersonalAccessTokensResponse; +#[cfg(feature = "vsr")] +use iggy_binary_protocol::responses::users::LoginRegisterResponse; +#[cfg(not(feature = "vsr"))] use iggy_binary_protocol::responses::users::login_user::IdentityResponse; +#[cfg(feature = "vsr")] +use secrecy::SecretString; #[async_trait::async_trait] impl PersonalAccessTokenClient for B { @@ -90,16 +103,63 @@ impl PersonalAccessTokenClient for B { &self, token: &str, ) -> Result { + #[cfg(feature = "vsr")] + { + let client_id = self.get_vsr_client_id().await?; + let response = match self + .send_raw_with_response( + LOGIN_REGISTER_WITH_PAT_CODE, + LoginRegisterWithPatRequest { + client_id, + token: SecretString::from(token.to_string()), + version: Some(env!("CARGO_PKG_VERSION").to_string()), + client_context: Some(String::new()), + } + .to_bytes(), + ) + .await + { + Ok(response) => response, + Err(error) => { + self.reset_vsr_session().await?; + return Err(error); + } + }; + let wire_resp = match super::decode_response::(&response) { + Ok(wire_resp) => wire_resp, + Err(error) => { + self.reset_vsr_session().await?; + return Err(error); + } + }; + if let Err(error) = self.bind_vsr_session(wire_resp.session).await { + self.reset_vsr_session().await?; + return Err(error); + } + self.set_state(ClientState::Authenticated).await; + self.publish_event(DiagnosticEvent::SignedIn).await; + return Ok(IdentityInfo { + user_id: wire_resp.user_id, + access_token: None, + }); + } + + #[cfg(not(feature = "vsr"))] let wire_token = WireName::new(token).map_err(|_| IggyError::InvalidFormat)?; + #[cfg(not(feature = "vsr"))] let response = self .send_raw_with_response( LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, LoginWithPersonalAccessTokenRequest { token: wire_token }.to_bytes(), ) .await?; + #[cfg(not(feature = "vsr"))] self.set_state(ClientState::Authenticated).await; + #[cfg(not(feature = "vsr"))] self.publish_event(DiagnosticEvent::SignedIn).await; + #[cfg(not(feature = "vsr"))] let wire_resp = super::decode_response::(&response)?; + #[cfg(not(feature = "vsr"))] Ok(IdentityInfo::from(wire_resp)) } } diff --git a/core/common/src/traits/binary_impls/users.rs b/core/common/src/traits/binary_impls/users.rs index 24a877a4c9..06ec4a7bc2 100644 --- a/core/common/src/traits/binary_impls/users.rs +++ b/core/common/src/traits/binary_impls/users.rs @@ -24,16 +24,29 @@ use crate::{ }; use iggy_binary_protocol::WireName; use iggy_binary_protocol::codec::WireEncode; +#[cfg(feature = "vsr")] +use iggy_binary_protocol::codes::LOGIN_REGISTER_CODE; use iggy_binary_protocol::codes::{ CHANGE_PASSWORD_CODE, CREATE_USER_CODE, DELETE_USER_CODE, GET_USER_CODE, GET_USERS_CODE, - LOGIN_USER_CODE, LOGOUT_USER_CODE, UPDATE_PERMISSIONS_CODE, UPDATE_USER_CODE, + UPDATE_PERMISSIONS_CODE, UPDATE_USER_CODE, }; +#[cfg(not(feature = "vsr"))] +use iggy_binary_protocol::codes::{LOGIN_USER_CODE, LOGOUT_USER_CODE}; +#[cfg(feature = "vsr")] +use iggy_binary_protocol::requests::users::LoginRegisterRequest; use iggy_binary_protocol::requests::users::{ ChangePasswordRequest, CreateUserRequest, DeleteUserRequest, GetUserRequest, GetUsersRequest, - LoginUserRequest, LogoutUserRequest, UpdatePermissionsRequest, UpdateUserRequest, + UpdatePermissionsRequest, UpdateUserRequest, }; +#[cfg(not(feature = "vsr"))] +use iggy_binary_protocol::requests::users::{LoginUserRequest, LogoutUserRequest}; +#[cfg(feature = "vsr")] +use iggy_binary_protocol::responses::users::LoginRegisterResponse; +#[cfg(not(feature = "vsr"))] use iggy_binary_protocol::responses::users::login_user::IdentityResponse; use iggy_binary_protocol::responses::users::{GetUsersResponse, UserDetailsResponse}; +#[cfg(feature = "vsr")] +use secrecy::SecretString; #[async_trait::async_trait] impl UserClient for B { @@ -169,7 +182,52 @@ impl UserClient for B { } async fn login_user(&self, username: &str, password: &str) -> Result { + #[cfg(feature = "vsr")] + { + let wire_name = WireName::new(username).map_err(|_| IggyError::InvalidFormat)?; + let client_id = self.get_vsr_client_id().await?; + let response = match self + .send_raw_with_response( + LOGIN_REGISTER_CODE, + LoginRegisterRequest { + client_id, + username: wire_name, + password: SecretString::from(password.to_string()), + version: Some(env!("CARGO_PKG_VERSION").to_string()), + client_context: Some(String::new()), + } + .to_bytes(), + ) + .await + { + Ok(response) => response, + Err(error) => { + self.reset_vsr_session().await?; + return Err(error); + } + }; + let wire_resp = match super::decode_response::(&response) { + Ok(wire_resp) => wire_resp, + Err(error) => { + self.reset_vsr_session().await?; + return Err(error); + } + }; + if let Err(error) = self.bind_vsr_session(wire_resp.session).await { + self.reset_vsr_session().await?; + return Err(error); + } + self.set_state(ClientState::Authenticated).await; + self.publish_event(DiagnosticEvent::SignedIn).await; + return Ok(IdentityInfo { + user_id: wire_resp.user_id, + access_token: None, + }); + } + + #[cfg(not(feature = "vsr"))] let wire_name = WireName::new(username).map_err(|_| IggyError::InvalidFormat)?; + #[cfg(not(feature = "vsr"))] let response = self .send_raw_with_response( LOGIN_USER_CODE, @@ -182,18 +240,32 @@ impl UserClient for B { .to_bytes(), ) .await?; + #[cfg(not(feature = "vsr"))] self.set_state(ClientState::Authenticated).await; + #[cfg(not(feature = "vsr"))] self.publish_event(DiagnosticEvent::SignedIn).await; + #[cfg(not(feature = "vsr"))] let wire_resp = super::decode_response::(&response)?; + #[cfg(not(feature = "vsr"))] Ok(IdentityInfo::from(wire_resp)) } async fn logout_user(&self) -> Result<(), IggyError> { + #[cfg(feature = "vsr")] + { + return Err(IggyError::FeatureUnavailable); + } + + #[cfg(not(feature = "vsr"))] fail_if_not_authenticated(self).await?; + #[cfg(not(feature = "vsr"))] self.send_raw_with_response(LOGOUT_USER_CODE, LogoutUserRequest.to_bytes()) .await?; + #[cfg(not(feature = "vsr"))] self.set_state(ClientState::Connected).await; + #[cfg(not(feature = "vsr"))] self.publish_event(DiagnosticEvent::SignedOut).await; + #[cfg(not(feature = "vsr"))] Ok(()) } } diff --git a/core/common/src/traits/binary_transport.rs b/core/common/src/traits/binary_transport.rs index 4f6af3ba8d..99dd733932 100644 --- a/core/common/src/traits/binary_transport.rs +++ b/core/common/src/traits/binary_transport.rs @@ -29,4 +29,19 @@ pub trait BinaryTransport { async fn publish_event(&self, event: DiagnosticEvent); async fn send_raw_with_response(&self, code: u32, payload: Bytes) -> Result; fn get_heartbeat_interval(&self) -> IggyDuration; + + #[cfg(feature = "vsr")] + async fn get_vsr_client_id(&self) -> Result { + Err(IggyError::FeatureUnavailable) + } + + #[cfg(feature = "vsr")] + async fn bind_vsr_session(&self, _session: u64) -> Result<(), IggyError> { + Err(IggyError::FeatureUnavailable) + } + + #[cfg(feature = "vsr")] + async fn reset_vsr_session(&self) -> Result<(), IggyError> { + Err(IggyError::FeatureUnavailable) + } } diff --git a/core/harness_derive/src/attrs.rs b/core/harness_derive/src/attrs.rs index 7d8e68f7d8..d1e8110256 100644 --- a/core/harness_derive/src/attrs.rs +++ b/core/harness_derive/src/attrs.rs @@ -182,6 +182,9 @@ pub struct ServerAttrs { /// Dynamic config overrides using dot-notation paths. pub config_overrides: Vec, + /// Executable path or bare cargo binary name to launch for the test server. + pub executable_path: Option, + /// Path to a TOML config file for the server. pub config_path: Option, @@ -440,6 +443,11 @@ fn parse_server_attrs(input: ParseStream) -> syn::Result { let lit: LitStr = input.parse()?; server.config_path = Some(lit.value()); } + "executable_path" => { + input.parse::()?; + let lit: LitStr = input.parse()?; + server.executable_path = Some(lit.value()); + } _ => { input.parse::()?; let value = parse_config_value(input)?; @@ -627,6 +635,15 @@ mod tests { assert!(matches!(&segment_size.value, ConfigValue::Static(s) if s == "1MiB")); } + #[test] + fn parse_server_executable_path() { + let attrs: IggyTestAttrs = syn::parse_quote!(server(executable_path = "iggy-server-ng")); + assert_eq!( + attrs.server.executable_path.as_deref(), + Some("iggy-server-ng") + ); + } + #[test] fn parse_server_matrix() { let attrs: IggyTestAttrs = syn::parse_quote!(server(segment.size = ["512B", "1MiB"])); diff --git a/core/harness_derive/src/codegen.rs b/core/harness_derive/src/codegen.rs index 7f6e23a5e4..4acbd8c259 100644 --- a/core/harness_derive/src/codegen.rs +++ b/core/harness_derive/src/codegen.rs @@ -424,6 +424,10 @@ fn generate_harness_setup( // Always add extra_envs (may be empty) server_builder_calls.push(quote!(.extra_envs(__extra_envs))); + if let Some(ref executable_path) = attrs.server.executable_path { + server_builder_calls.push(quote!(.executable_path(#executable_path))); + } + // If a config_path is specified, inject IGGY_CONFIG_PATH into extra_envs let config_path_setup = if let Some(ref config_path) = attrs.server.config_path { quote! { diff --git a/core/integration-vsr/src/lib.rs b/core/integration-vsr/src/lib.rs new file mode 100644 index 0000000000..31bd66e6e6 --- /dev/null +++ b/core/integration-vsr/src/lib.rs @@ -0,0 +1,17 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index 9795e41daf..373b9268eb 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -26,6 +26,7 @@ publish = false # inside the docker containers. This is a temporary workaround (hopefully). [features] ci-qemu = [] +vsr = ["iggy/vsr"] [dependencies] assert_cmd = { workspace = true } diff --git a/core/integration/src/harness/config/server.rs b/core/integration/src/harness/config/server.rs index 7925e12e95..e5ffdb9b7e 100644 --- a/core/integration/src/harness/config/server.rs +++ b/core/integration/src/harness/config/server.rs @@ -56,10 +56,12 @@ mod tests { fn test_server_config_builder() { let config = TestServerConfig::builder() .quic_enabled(false) + .executable_path("iggy-server-ng") .extra_envs(HashMap::from([("FOO".to_string(), "BAR".to_string())])) .build(); assert!(!config.quic_enabled); + assert_eq!(config.executable_path.as_deref(), Some("iggy-server-ng")); assert_eq!(config.extra_envs.get("FOO"), Some(&"BAR".to_string())); } diff --git a/core/integration/src/harness/handle/server.rs b/core/integration/src/harness/handle/server.rs index 9752da94dc..466d6808f3 100644 --- a/core/integration/src/harness/handle/server.rs +++ b/core/integration/src/harness/handle/server.rs @@ -93,6 +93,26 @@ impl std::fmt::Debug for ServerHandle { } impl ServerHandle { + fn default_server_binary() -> &'static str { + #[cfg(feature = "vsr")] + { + "iggy-server-ng" + } + + #[cfg(not(feature = "vsr"))] + { + "iggy-server" + } + } + + fn launched_binary(&self) -> String { + if let Some(path) = &self.config.executable_path { + path.clone() + } else { + Self::default_server_binary().to_string() + } + } + pub fn tcp_addr(&self) -> Option { self.addrs.tcp } @@ -402,7 +422,7 @@ impl ServerHandle { { let (stdout, stderr) = self.collect_logs(); return Err(TestBinaryError::ProcessCrashed { - binary: "iggy-server".to_string(), + binary: self.launched_binary(), exit_code: status.code(), stdout, stderr, @@ -427,7 +447,7 @@ impl ServerHandle { } Err(TestBinaryError::StartupTimeout { - binary: "iggy-server".to_string(), + binary: self.launched_binary(), timeout_secs: MAX_PORT_WAIT_DURATION_S, }) } @@ -776,19 +796,32 @@ impl TestBinary for ServerHandle { } #[allow(deprecated)] + let launched_binary = self.launched_binary(); let mut command = if let Some(ref path) = self.config.executable_path { - Command::new(path) + let path_ref = Path::new(path); + if path_ref.components().count() == 1 && !path_ref.exists() { + Command::cargo_bin(path).map_err(|e| TestBinaryError::ProcessSpawn { + binary: launched_binary.clone(), + source: std::io::Error::other(e.to_string()), + })? + } else { + Command::new(path) + } } else { - Command::cargo_bin("iggy-server").map_err(|e| TestBinaryError::ProcessSpawn { - binary: "iggy-server".to_string(), + Command::cargo_bin(Self::default_server_binary()).map_err(|e| { + TestBinaryError::ProcessSpawn { + binary: launched_binary.clone(), source: std::io::Error::other(e.to_string()), + } })? }; command.env("IGGY_SYSTEM_PATH", data_path.display().to_string()); command.envs(&self.envs); - // TODO(hubcio): Remove --follower flag when proper clustering is implemented + // Legacy clustering elects node 0 externally and requires explicit followers. + // VSR/server-ng elects its own primary and should see symmetric node startup. + #[cfg(not(feature = "vsr"))] if self.server_id > 0 { command.arg("--follower"); } @@ -829,7 +862,7 @@ impl TestBinary for ServerHandle { } let child = command.spawn().map_err(|e| TestBinaryError::ProcessSpawn { - binary: "iggy-server".to_string(), + binary: launched_binary, source: e, })?; self.child_handle = Some(child); diff --git a/core/integration/src/harness/orchestrator/builder.rs b/core/integration/src/harness/orchestrator/builder.rs index 058bd6c960..0326f58533 100644 --- a/core/integration/src/harness/orchestrator/builder.rs +++ b/core/integration/src/harness/orchestrator/builder.rs @@ -250,7 +250,7 @@ fn build_servers( return Ok(Vec::new()); }; - let node_count = cluster_node_count.unwrap_or(1); + let node_count = cluster_node_count.unwrap_or_else(default_cluster_node_count); if node_count == 1 { return Ok(vec![ServerHandle::with_config(config, context.clone())]); @@ -296,6 +296,18 @@ fn build_servers( Ok(servers) } +fn default_cluster_node_count() -> usize { + #[cfg(feature = "vsr")] + { + 3 + } + + #[cfg(not(feature = "vsr"))] + { + 1 + } +} + fn build_cluster_envs( node_index: usize, cluster_name: &str, @@ -311,6 +323,11 @@ fn build_cluster_envs( envs.insert("IGGY_CLUSTER_ENABLED".to_string(), "true".to_string()); envs.insert("IGGY_CLUSTER_NAME".to_string(), cluster_name.to_string()); + #[cfg(feature = "vsr")] + envs.insert( + "IGGY_MESSAGE_BUS_RECONNECT_PERIOD".to_string(), + "100ms".to_string(), + ); // Node identity is supplied via `--replica-id` on the command line by // ServerHandle::spawn; every cluster env var emitted here is identical // across all spawned servers. @@ -329,6 +346,12 @@ fn build_cluster_envs( tcp.port().to_string(), ); } + if let Some(tcp_replica) = addrs.tcp_replica { + envs.insert( + format!("IGGY_CLUSTER_NODES_{i}_PORTS_TCP_REPLICA"), + tcp_replica.port().to_string(), + ); + } if let Some(http) = addrs.http { envs.insert( format!("IGGY_CLUSTER_NODES_{i}_PORTS_HTTP"), diff --git a/core/integration/src/harness/orchestrator/harness.rs b/core/integration/src/harness/orchestrator/harness.rs index 3018cb39fd..5fc3ceb71b 100644 --- a/core/integration/src/harness/orchestrator/harness.rs +++ b/core/integration/src/harness/orchestrator/harness.rs @@ -28,9 +28,15 @@ use crate::harness::handle::{ use crate::harness::traits::{Restartable, TestBinary}; use futures::executor::block_on; use iggy::prelude::{ClientWrapper, IggyClient}; +#[cfg(feature = "vsr")] +use iggy_common::Client; use iggy_common::TransportProtocol; use std::path::Path; use std::sync::Arc; +#[cfg(feature = "vsr")] +use std::time::{Duration, Instant}; +#[cfg(feature = "vsr")] +use tokio::time::{sleep, timeout}; use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, ResponseTemplate}; @@ -152,6 +158,8 @@ impl TestHarness { server.start()?; } + self.wait_for_cluster_ready().await?; + if let Some(seed_fn) = seed { let client = self.tcp_root_client().await?; seed_fn(client) @@ -166,6 +174,51 @@ impl TestHarness { Ok(()) } + async fn wait_for_cluster_ready(&self) -> Result<(), TestBinaryError> { + #[cfg(not(feature = "vsr"))] + { + Ok(()) + } + + #[cfg(feature = "vsr")] + { + if self.servers.len() <= 1 { + return Ok(()); + } + + const CLUSTER_READY_TIMEOUT: Duration = Duration::from_secs(15); + const CLUSTER_READY_RETRY_INTERVAL: Duration = Duration::from_millis(200); + const LOGIN_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(750); + + let deadline = Instant::now() + CLUSTER_READY_TIMEOUT; + let mut last_error = None; + + while Instant::now() < deadline { + match timeout(LOGIN_ATTEMPT_TIMEOUT, self.tcp_root_client()).await { + Ok(Ok(client)) => { + let _ = client.disconnect().await; + return Ok(()); + } + Ok(Err(error)) => { + last_error = Some(error.to_string()); + sleep(CLUSTER_READY_RETRY_INTERVAL).await; + } + Err(_) => { + last_error = Some("login attempt timed out".to_string()); + sleep(CLUSTER_READY_RETRY_INTERVAL).await; + } + } + } + + Err(TestBinaryError::InvalidState { + message: format!( + "Timed out waiting for VSR cluster readiness: {}", + last_error.unwrap_or_else(|| "unknown error".to_string()) + ), + }) + } + } + async fn start_dependents(&mut self) -> Result<(), TestBinaryError> { for server in &mut self.servers { server.start_dependents().await?; diff --git a/core/integration/src/harness/port_reserver.rs b/core/integration/src/harness/port_reserver.rs index cc59a6d026..8f658c1794 100644 --- a/core/integration/src/harness/port_reserver.rs +++ b/core/integration/src/harness/port_reserver.rs @@ -155,6 +155,7 @@ impl ReservedPort { /// Pre-allocated ports for all enabled protocols. pub struct PortReserver { tcp: Option, + tcp_replica: Option, quic: Option, http: Option, websocket: Option, @@ -184,6 +185,7 @@ impl SinglePortReserver { #[derive(Debug, Clone)] pub struct ProtocolAddresses { pub tcp: Option, + pub tcp_replica: Option, pub quic: Option, pub http: Option, pub websocket: Option, @@ -226,6 +228,7 @@ impl PortReserver { config: &TestServerConfig, ) -> Result { let tcp = Some(ReservedPort::tcp(ip_kind)?); + let tcp_replica = Some(ReservedPort::tcp(ip_kind)?); let quic = if config.quic_enabled { Some(ReservedPort::udp(ip_kind)?) @@ -247,6 +250,7 @@ impl PortReserver { Ok(Self { tcp, + tcp_replica, quic, http, websocket, @@ -257,6 +261,7 @@ impl PortReserver { pub fn addresses(&self) -> ProtocolAddresses { ProtocolAddresses { tcp: self.tcp.as_ref().map(ReservedPort::addr), + tcp_replica: self.tcp_replica.as_ref().map(ReservedPort::addr), quic: self.quic.as_ref().map(ReservedPort::addr), http: self.http.as_ref().map(ReservedPort::addr), websocket: self.websocket.as_ref().map(ReservedPort::addr), @@ -268,6 +273,9 @@ impl PortReserver { if let Some(tcp) = self.tcp { tcp.release(); } + if let Some(tcp_replica) = self.tcp_replica { + tcp_replica.release(); + } if let Some(quic) = self.quic { quic.release(); } diff --git a/core/integration/tests/mod.rs b/core/integration/tests/mod.rs index 7c543a3338..4163aa175a 100644 --- a/core/integration/tests/mod.rs +++ b/core/integration/tests/mod.rs @@ -31,14 +31,22 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{EnvFilter, fmt}; +#[cfg(not(feature = "vsr"))] mod cli; +#[cfg(not(feature = "vsr"))] mod cluster; +#[cfg(not(feature = "vsr"))] mod config_provider; +#[cfg(not(feature = "vsr"))] mod connectors; +#[cfg(not(feature = "vsr"))] mod data_integrity; +#[cfg(not(feature = "vsr"))] mod mcp; mod sdk; +#[cfg(not(feature = "vsr"))] mod server; +#[cfg(not(feature = "vsr"))] mod state; lazy_static! { diff --git a/core/integration/tests/sdk/hello_world.rs b/core/integration/tests/sdk/hello_world.rs new file mode 100644 index 0000000000..0c3746c215 --- /dev/null +++ b/core/integration/tests/sdk/hello_world.rs @@ -0,0 +1,37 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy::prelude::*; +use integration::iggy_harness; + +#[cfg(not(feature = "vsr"))] +#[iggy_harness] +async fn hello_world(harness: &TestHarness) { + let client = harness.root_client().await.unwrap(); + client.ping().await.unwrap(); +} + +#[cfg(feature = "vsr")] +#[iggy_harness(test_client_transport = [Tcp, WebSocket])] +async fn hello_world(harness: &TestHarness) { + let client = harness.new_client().await.unwrap(); + client + .login_user(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD) + .await + .unwrap(); +} diff --git a/core/integration/tests/sdk/mod.rs b/core/integration/tests/sdk/mod.rs index 6d94bfa61f..09a6a86979 100644 --- a/core/integration/tests/sdk/mod.rs +++ b/core/integration/tests/sdk/mod.rs @@ -16,4 +16,6 @@ * under the License. */ +mod hello_world; +#[cfg(not(feature = "vsr"))] mod producer; diff --git a/core/message_bus/src/client_listener/quic.rs b/core/message_bus/src/client_listener/quic.rs index 262c030757..806ec502ae 100644 --- a/core/message_bus/src/client_listener/quic.rs +++ b/core/message_bus/src/client_listener/quic.rs @@ -41,10 +41,12 @@ use crate::lifecycle::ShutdownToken; use crate::transports::quic::{QuicTransportListener, accept_handshake}; use crate::{AcceptedQuicClientFn, AcceptedQuicConn}; +use compio::net::UdpSocket; use compio::runtime::JoinHandle; -use compio_quic::{Endpoint, ServerConfig}; +use compio_quic::{Endpoint, EndpointConfig, ServerConfig}; use futures::FutureExt; use iggy_common::IggyError; +use socket2::{Domain, Protocol, Socket, Type}; use std::net::SocketAddr; use std::time::Duration; use tracing::{debug, error, info}; @@ -59,13 +61,35 @@ use tracing::{debug, error, info}; /// /// Returns [`IggyError::CannotBindToSocket`] if the bind fails. #[allow(clippy::future_not_send)] -pub async fn bind( +pub fn bind( addr: SocketAddr, server_config: ServerConfig, ) -> Result<(Endpoint, SocketAddr), IggyError> { - let endpoint = Endpoint::server(addr, server_config) - .await + // QUIC remains shard-0 terminal; this only enables coexistence with the + // harness's placeholder UDP socket during process startup. + // + // TODO: remove `SO_REUSEPORT` again once the integration harness stops + // holding placeholder reservation sockets open across child startup. + let socket = Socket::new(Domain::for_address(addr), Type::DGRAM, Some(Protocol::UDP)) + .map_err(|e| IggyError::IoError(e.to_string()))?; + socket + .set_reuse_address(true) + .map_err(|e| IggyError::IoError(e.to_string()))?; + #[cfg(unix)] + socket + .set_reuse_port(true) + .map_err(|e| IggyError::IoError(e.to_string()))?; + socket + .bind(&addr.into()) .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?; + socket + .set_nonblocking(true) + .map_err(|e| IggyError::IoError(e.to_string()))?; + + let std_socket: std::net::UdpSocket = socket.into(); + let socket = UdpSocket::from_std(std_socket).map_err(|e| IggyError::IoError(e.to_string()))?; + let endpoint = Endpoint::new(socket, EndpointConfig::default(), Some(server_config), None) + .map_err(|e| IggyError::IoError(e.to_string()))?; let actual = endpoint .local_addr() .map_err(|e| IggyError::IoError(e.to_string()))?; diff --git a/core/message_bus/src/client_listener/tcp.rs b/core/message_bus/src/client_listener/tcp.rs index e8b3ab8cec..8edbc1c53d 100644 --- a/core/message_bus/src/client_listener/tcp.rs +++ b/core/message_bus/src/client_listener/tcp.rs @@ -28,7 +28,8 @@ use crate::AcceptedClientFn; use crate::lifecycle::ShutdownToken; -use compio::net::{SocketOpts, TcpListener}; +use crate::socket_opts::bind_reusable_tcp_listener; +use compio::net::TcpListener; use futures::FutureExt; use iggy_common::IggyError; use std::net::SocketAddr; @@ -41,13 +42,8 @@ use tracing::{debug, error, info}; /// /// Returns [`IggyError::CannotBindToSocket`] if the bind fails. #[allow(clippy::future_not_send)] -pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> { - // `SO_REUSEPORT` intentionally not set: only shard 0 binds the client - // listener. The shard-0 coordinator round-robins accepts to owning - // shards via `ShardFramePayload::ClientConnectionSetup`. - let opts = SocketOpts::new().nodelay(true); - let listener = TcpListener::bind_with_options(addr, &opts) - .await +pub fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> { + let listener = bind_reusable_tcp_listener(addr) .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?; let actual = listener .local_addr() diff --git a/core/message_bus/src/client_listener/tcp_tls.rs b/core/message_bus/src/client_listener/tcp_tls.rs index 7e70044248..fce2f8ca27 100644 --- a/core/message_bus/src/client_listener/tcp_tls.rs +++ b/core/message_bus/src/client_listener/tcp_tls.rs @@ -39,8 +39,9 @@ use crate::AcceptedTlsClientFn; use crate::lifecycle::ShutdownToken; +use crate::socket_opts::bind_reusable_tcp_listener; use crate::transports::tls::{TlsServerCredentials, install_default_crypto_provider}; -use compio::net::{SocketOpts, TcpListener}; +use compio::net::TcpListener; use futures::FutureExt; use iggy_common::IggyError; use std::net::SocketAddr; @@ -70,7 +71,7 @@ use tracing::{debug, error, info}; /// from `credentials` (cert / key mismatch). /// - [`IggyError::CannotBindToSocket`] if the TCP bind fails. #[allow(clippy::future_not_send)] -pub async fn bind( +pub fn bind( addr: SocketAddr, credentials: TlsServerCredentials, ) -> Result<(TcpListener, Arc, SocketAddr), IggyError> { @@ -85,9 +86,7 @@ pub async fn bind( // cannot enable it accidentally. cfg.max_early_data_size = 0; - let opts = SocketOpts::new().nodelay(true); - let listener = TcpListener::bind_with_options(addr, &opts) - .await + let listener = bind_reusable_tcp_listener(addr) .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?; let actual = listener .local_addr() diff --git a/core/message_bus/src/client_listener/ws.rs b/core/message_bus/src/client_listener/ws.rs index 90a6aa2668..f6863e9e89 100644 --- a/core/message_bus/src/client_listener/ws.rs +++ b/core/message_bus/src/client_listener/ws.rs @@ -36,7 +36,8 @@ use crate::AcceptedWsClientFn; use crate::lifecycle::ShutdownToken; -use compio::net::{SocketOpts, TcpListener}; +use crate::socket_opts::bind_reusable_tcp_listener; +use compio::net::TcpListener; use futures::FutureExt; use iggy_common::IggyError; use std::net::SocketAddr; @@ -55,13 +56,8 @@ use tracing::{debug, error, info}; /// /// Returns [`IggyError::CannotBindToSocket`] if the bind fails. #[allow(clippy::future_not_send)] -pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> { - // `SO_REUSEPORT` intentionally not set: only shard 0 binds the WS - // listener. The shard-0 coordinator round-robins accepts to owning - // shards via `ShardFramePayload::ClientWsConnectionSetup`. - let opts = SocketOpts::new().nodelay(true); - let listener = TcpListener::bind_with_options(addr, &opts) - .await +pub fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> { + let listener = bind_reusable_tcp_listener(addr) .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?; let actual = listener .local_addr() diff --git a/core/message_bus/src/client_listener/wss.rs b/core/message_bus/src/client_listener/wss.rs index 26ad26d1c2..f686c65f8b 100644 --- a/core/message_bus/src/client_listener/wss.rs +++ b/core/message_bus/src/client_listener/wss.rs @@ -44,8 +44,9 @@ //! use crate::AcceptedWssClientFn; use crate::lifecycle::ShutdownToken; +use crate::socket_opts::bind_reusable_tcp_listener; use crate::transports::tls::{TlsServerCredentials, install_default_crypto_provider}; -use compio::net::{SocketOpts, TcpListener}; +use compio::net::TcpListener; use futures::FutureExt; use iggy_common::IggyError; use std::net::SocketAddr; @@ -65,7 +66,7 @@ use tracing::{debug, error, info}; /// from `credentials` (cert / key mismatch). /// - [`IggyError::CannotBindToSocket`] if the TCP bind fails. #[allow(clippy::future_not_send)] -pub async fn bind( +pub fn bind( addr: SocketAddr, credentials: TlsServerCredentials, ) -> Result<(TcpListener, Arc, SocketAddr), IggyError> { @@ -76,9 +77,7 @@ pub async fn bind( .map_err(|e| IggyError::IoError(format!("WSS server config build failed: {e}")))?; cfg.max_early_data_size = 0; - let opts = SocketOpts::new().nodelay(true); - let listener = TcpListener::bind_with_options(addr, &opts) - .await + let listener = bind_reusable_tcp_listener(addr) .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?; let actual = listener .local_addr() diff --git a/core/message_bus/src/replica/io.rs b/core/message_bus/src/replica/io.rs index 3e5da9fbc8..0cde3bb120 100644 --- a/core/message_bus/src/replica/io.rs +++ b/core/message_bus/src/replica/io.rs @@ -207,7 +207,7 @@ pub async fn start_on_shard_zero( ); let (replica_listener, replica_bound) = bind_replica_listener(replica_listen_addr).await?; - let (clients_listener, client_bound) = client_listener::tcp::bind(client_listen_addr).await?; + let (clients_listener, client_bound) = client_listener::tcp::bind(client_listen_addr)?; let token_for_replica = bus.token(); let on_accepted_replica_for_listener = on_accepted_replica.clone(); @@ -236,7 +236,7 @@ pub async fn start_on_shard_zero( let ws_bound = match (ws_listen_addr, on_accepted_ws_client) { (Some(addr), Some(on_accepted_ws)) => { - let (ws_listener, ws_bound) = client_listener::ws::bind(addr).await?; + let (ws_listener, ws_bound) = client_listener::ws::bind(addr)?; let token_for_ws = bus.token(); let ws_handle = compio::runtime::spawn(async move { client_listener::ws::run(ws_listener, token_for_ws, on_accepted_ws).await; @@ -257,7 +257,7 @@ pub async fn start_on_shard_zero( .map_err(|e| { IggyError::IoError(format!("QUIC server config build failed: {e}")) })?; - let (endpoint, quic_bound) = client_listener::quic::bind(addr, server_config).await?; + let (endpoint, quic_bound) = client_listener::quic::bind(addr, server_config)?; let token_for_quic = bus.token(); let handshake_grace = bus.config().handshake_grace; let quic_handle = compio::runtime::spawn(async move { @@ -285,7 +285,7 @@ pub async fn start_on_shard_zero( ) { (Some(addr), Some(creds), Some(on_accepted_tls)) => { let (listener, server_config, tls_bound) = - client_listener::tcp_tls::bind(addr, creds).await?; + client_listener::tcp_tls::bind(addr, creds)?; let token_for_tls = bus.token(); let tls_handle = compio::runtime::spawn(async move { client_listener::tcp_tls::run( @@ -308,7 +308,7 @@ pub async fn start_on_shard_zero( let wss_bound = match (wss_listen_addr, wss_credentials, on_accepted_wss_client) { (Some(addr), Some(creds), Some(on_accepted_wss)) => { let (listener, server_config, wss_bound) = - client_listener::wss::bind(addr, creds).await?; + client_listener::wss::bind(addr, creds)?; let token_for_wss = bus.token(); let wss_handle = compio::runtime::spawn(async move { client_listener::wss::run(listener, server_config, token_for_wss, on_accepted_wss) diff --git a/core/message_bus/src/replica/listener.rs b/core/message_bus/src/replica/listener.rs index 854f0c9982..6a125de74d 100644 --- a/core/message_bus/src/replica/listener.rs +++ b/core/message_bus/src/replica/listener.rs @@ -83,8 +83,9 @@ use crate::framing; use crate::lifecycle::ShutdownToken; +use crate::socket_opts::bind_reusable_tcp_listener; use crate::{AcceptedReplicaFn, GenericHeader, Message}; -use compio::net::{SocketOpts, TcpListener, TcpStream}; +use compio::net::{TcpListener, TcpStream}; use compio::runtime::JoinHandle; use futures::FutureExt; use iggy_binary_protocol::Command2; @@ -108,12 +109,7 @@ pub type MessageHandler = Rc)>; /// Returns [`IggyError::CannotBindToSocket`] if the bind fails. #[allow(clippy::future_not_send)] pub async fn bind(addr: SocketAddr) -> Result<(TcpListener, SocketAddr), IggyError> { - // `SO_REUSEPORT` intentionally not set: only shard 0 binds the replica - // listener. Kernel-level accept distribution would fight the shard-0 - // coordinator's explicit round-robin allocation. - let opts = SocketOpts::new().nodelay(true); - let listener = TcpListener::bind_with_options(addr, &opts) - .await + let listener = bind_reusable_tcp_listener(addr) .map_err(|_| IggyError::CannotBindToSocket(addr.to_string()))?; let actual = listener .local_addr() diff --git a/core/message_bus/src/socket_opts.rs b/core/message_bus/src/socket_opts.rs index d5736dbe30..8c86952126 100644 --- a/core/message_bus/src/socket_opts.rs +++ b/core/message_bus/src/socket_opts.rs @@ -23,9 +23,10 @@ //! than by `SO_KEEPALIVE`. Only `TCP_NODELAY` toggling lives in this //! module today. -use compio::net::TcpStream; -use socket2::SockRef; +use compio::net::{TcpListener, TcpStream}; +use socket2::{Domain, Protocol, SockRef, Socket, Type}; use std::io; +use std::net::SocketAddr; /// Disable Nagle on a per-connection socket. /// @@ -43,3 +44,67 @@ use std::io; pub fn apply_nodelay_for_connection(stream: &TcpStream) -> io::Result<()> { SockRef::from(stream).set_tcp_nodelay(true) } + +/// Build a TCP listener compatible with the harness's pre-bound reservation +/// sockets. +/// +/// Only shard 0 binds the real listener; `SO_REUSEPORT` is enabled here solely +/// so the server process can claim a port already held open by the integration +/// harness during startup. +/// +/// TODO: remove `SO_REUSEPORT` again once the integration harness stops +/// holding placeholder reservation sockets open across child startup. +pub fn bind_reusable_tcp_listener(addr: SocketAddr) -> io::Result { + let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?; + socket.set_reuse_address(true)?; + #[cfg(all( + unix, + not(any(target_os = "illumos", target_os = "solaris", target_os = "cygwin")) + ))] + socket.set_reuse_port(true)?; + socket.bind(&addr.into())?; + socket.listen(128)?; + socket.set_nonblocking(true)?; + + let std_listener: std::net::TcpListener = socket.into(); + TcpListener::from_std(std_listener) +} + +#[cfg(test)] +mod tests { + use super::bind_reusable_tcp_listener; + use compio::runtime::Runtime; + use socket2::{Domain, Protocol, Socket, Type}; + use std::net::{Ipv4Addr, SocketAddr}; + + #[test] + fn reusable_tcp_listener_can_bind_over_reserved_port() { + let reserve_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0); + let socket = Socket::new( + Domain::for_address(reserve_addr), + Type::STREAM, + Some(Protocol::TCP), + ) + .expect("reserve socket"); + socket.set_reuse_address(true).expect("reserve reuseaddr"); + #[cfg(all( + unix, + not(any(target_os = "illumos", target_os = "solaris", target_os = "cygwin")) + ))] + socket.set_reuse_port(true).expect("reserve reuseport"); + socket.bind(&reserve_addr.into()).expect("reserve bind"); + socket.listen(1).expect("reserve listen"); + let addr = socket + .local_addr() + .expect("reserve local addr") + .as_socket() + .expect("socket addr"); + + let runtime = Runtime::new().expect("runtime"); + runtime.enter(|| { + let listener = bind_reusable_tcp_listener(addr) + .expect("second listener should bind on reserved port"); + assert_eq!(listener.local_addr().expect("listener addr"), addr); + }); + } +} diff --git a/core/message_bus/src/transports/quic.rs b/core/message_bus/src/transports/quic.rs index b4b13ffda3..a464b102cb 100644 --- a/core/message_bus/src/transports/quic.rs +++ b/core/message_bus/src/transports/quic.rs @@ -429,6 +429,8 @@ async fn reader_task( } Err(e) => { debug!(%label, %peer, "quic reader: read error: {e:?}"); + let _keep_in_tx_alive = &in_tx; + shutdown_fut.await; return; } } diff --git a/core/message_bus/tests/graceful_shutdown.rs b/core/message_bus/tests/graceful_shutdown.rs index 18038039c7..6d9cf3b006 100644 --- a/core/message_bus/tests/graceful_shutdown.rs +++ b/core/message_bus/tests/graceful_shutdown.rs @@ -32,7 +32,7 @@ use std::time::Duration; async fn drains_all_clients_within_timeout() { let bus = Rc::new(IggyMessageBus::new(0)); let on_request: RequestHandler = Rc::new(|_, _| {}); - let (listener, addr) = bind(loopback()).await.unwrap(); + let (listener, addr) = bind(loopback()).unwrap(); let token = bus.token(); let accept_delegate = install_clients_locally(bus.clone(), on_request); @@ -87,7 +87,7 @@ async fn drains_all_clients_within_timeout() { async fn connection_drain_precedes_slow_background() { let bus = Rc::new(IggyMessageBus::new(0)); let on_request: RequestHandler = Rc::new(|_, _| {}); - let (listener, addr) = bind(loopback()).await.unwrap(); + let (listener, addr) = bind(loopback()).unwrap(); let token = bus.token(); let accept_delegate = install_clients_locally(bus.clone(), on_request); diff --git a/core/message_bus/tests/quic_client_roundtrip.rs b/core/message_bus/tests/quic_client_roundtrip.rs index 2a42ba0404..1feee8d9fa 100644 --- a/core/message_bus/tests/quic_client_roundtrip.rs +++ b/core/message_bus/tests/quic_client_roundtrip.rs @@ -83,7 +83,7 @@ async fn request_reply_round_trip() { let (cert, key) = self_signed(); let server_cfg = server_config_with_cert(vec![cert.clone()], key, &QuicTuning::default()) .expect("server config"); - let (endpoint, server_addr) = bind(loopback(), server_cfg).await.expect("bind"); + let (endpoint, server_addr) = bind(loopback(), server_cfg).expect("bind"); let token = bus.token(); let on_accepted = install_quic_clients_locally(bus.clone(), on_request); @@ -160,7 +160,7 @@ async fn slow_handshake_does_not_block_subsequent_accept() { let (cert, key) = self_signed(); let server_cfg = server_config_with_cert(vec![cert.clone()], key, &QuicTuning::default()) .expect("server config"); - let (endpoint, server_addr) = bind(loopback(), server_cfg).await.expect("bind"); + let (endpoint, server_addr) = bind(loopback(), server_cfg).expect("bind"); let token = bus.token(); let on_accepted = install_quic_clients_locally(bus.clone(), on_request); diff --git a/core/message_bus/tests/tcp_client_roundtrip.rs b/core/message_bus/tests/tcp_client_roundtrip.rs index 6c8cd59859..b6a22cdf09 100644 --- a/core/message_bus/tests/tcp_client_roundtrip.rs +++ b/core/message_bus/tests/tcp_client_roundtrip.rs @@ -49,7 +49,7 @@ async fn request_reply_round_trip() { .detach(); }); - let (listener, addr) = bind(loopback()).await.expect("bind"); + let (listener, addr) = bind(loopback()).expect("bind"); let token = bus.token(); let accept_delegate = install_clients_locally(bus.clone(), on_request); let accept_handle = compio::runtime::spawn(async move { @@ -88,7 +88,7 @@ async fn unexpected_command_is_ignored() { let _ = tx.try_send(()); }); - let (listener, addr) = bind(loopback()).await.unwrap(); + let (listener, addr) = bind(loopback()).unwrap(); let token = bus.token(); let accept_delegate = install_clients_locally(bus.clone(), on_request); let accept_handle = compio::runtime::spawn(async move { diff --git a/core/message_bus/tests/tcp_tls_client_listener.rs b/core/message_bus/tests/tcp_tls_client_listener.rs index 1621e99238..b217229b24 100644 --- a/core/message_bus/tests/tcp_tls_client_listener.rs +++ b/core/message_bus/tests/tcp_tls_client_listener.rs @@ -68,7 +68,7 @@ async fn tcp_tls_client_listener_accepts_and_round_trips() { let creds = self_signed_for_loopback(); let cert_chain = creds.cert_chain.clone(); let (listener, server_cfg, server_addr) = - bind(loopback(), creds).await.expect("tls listener bind"); + bind(loopback(), creds).expect("tls listener bind"); let token = bus.token(); let on_accepted = install_tls_clients_locally(Rc::clone(&bus), on_request); let accept_handle = compio::runtime::spawn(async move { @@ -149,7 +149,7 @@ async fn slow_tls_handshake_evicts_registry() { let creds = self_signed_for_loopback(); let (listener, server_cfg, server_addr) = - bind(loopback(), creds).await.expect("tls listener bind"); + bind(loopback(), creds).expect("tls listener bind"); let token = bus.token(); let on_accepted = install_tls_clients_locally(Rc::clone(&bus), on_request); let accept_handle = compio::runtime::spawn(async move { diff --git a/core/message_bus/tests/ws_client_roundtrip.rs b/core/message_bus/tests/ws_client_roundtrip.rs index e936e33948..951c109e96 100644 --- a/core/message_bus/tests/ws_client_roundtrip.rs +++ b/core/message_bus/tests/ws_client_roundtrip.rs @@ -67,7 +67,7 @@ async fn handshake_succeeds_and_round_trip_completes() { .detach(); }); - let (listener, server_addr) = bind(loopback()).await.expect("bind"); + let (listener, server_addr) = bind(loopback()).expect("bind"); let token = bus.token(); let on_accepted = install_ws_clients_locally(bus.clone(), on_request); let accept_handle = compio::runtime::spawn(async move { @@ -124,7 +124,7 @@ async fn handshake_succeeds_without_subprotocol_header() { let bus = Rc::new(IggyMessageBus::new(0)); let on_request: RequestHandler = Rc::new(|_, _| {}); - let (listener, server_addr) = bind(loopback()).await.expect("bind"); + let (listener, server_addr) = bind(loopback()).expect("bind"); let token = bus.token(); let on_accepted = install_ws_clients_locally(bus.clone(), on_request); let accept_handle = compio::runtime::spawn(async move { diff --git a/core/message_bus/tests/wss_client_listener.rs b/core/message_bus/tests/wss_client_listener.rs index cfed3e03a0..01a25ef181 100644 --- a/core/message_bus/tests/wss_client_listener.rs +++ b/core/message_bus/tests/wss_client_listener.rs @@ -69,7 +69,7 @@ async fn wss_client_listener_accepts_and_round_trips() { let creds = self_signed_for_loopback(); let cert_chain = creds.cert_chain.clone(); let (listener, server_cfg, server_addr) = - bind(loopback(), creds).await.expect("wss listener bind"); + bind(loopback(), creds).expect("wss listener bind"); let token = bus.token(); let on_accepted = install_wss_clients_locally(Rc::clone(&bus), on_request); let accept_handle = compio::runtime::spawn(async move { @@ -148,7 +148,7 @@ async fn slow_handshake_evicts_registry() { let creds = self_signed_for_loopback(); let (listener, server_cfg, server_addr) = - bind(loopback(), creds).await.expect("wss listener bind"); + bind(loopback(), creds).expect("wss listener bind"); let token = bus.token(); let on_accepted = install_wss_clients_locally(Rc::clone(&bus), on_request); let accept_handle = compio::runtime::spawn(async move { diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index 3e5487b488..5ce922bd9c 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -696,6 +696,10 @@ where let _ = sender.send(reply.clone()); } + if prepare_header.operation == Operation::Register { + continue; + } + let generic_reply = reply.into_generic(); let reply_buffers = freeze_client_reply(generic_reply); emit_sim_event(SimEventKind::ClientReplyEmitted, &event); @@ -842,6 +846,14 @@ where "submit_register_in_process: gate flipped between check and dispatch" ); self.on_replicate(prepare).await; + let mut loopback = Vec::new(); + consensus.drain_loopback_into(&mut loopback); + for message in loopback { + let prepare_ok: Message = message + .try_into_typed() + .expect("metadata loopback queue must only contain PrepareOk messages"); + self.on_ack(prepare_ok).await; + } match receiver.await { Ok(reply) => Ok(reply.header().commit), diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs index adde8a848b..705f48bdb3 100644 --- a/core/metadata/src/stm/user.rs +++ b/core/metadata/src/stm/user.rs @@ -21,7 +21,7 @@ use crate::stm::snapshot::Snapshotable; use crate::{collect_handlers, define_state, impl_fill_restore}; use ahash::AHashMap; use bytes::Bytes; -use iggy_binary_protocol::WireIdentifier; +use iggy_binary_protocol::primitives::permissions::{WireGlobalPermissions, WirePermissions}; use iggy_binary_protocol::requests::personal_access_tokens::{ CreatePersonalAccessTokenRequest, DeletePersonalAccessTokenRequest, }; @@ -29,6 +29,7 @@ use iggy_binary_protocol::requests::users::{ ChangePasswordRequest, CreateUserRequest, DeleteUserRequest, UpdatePermissionsRequest, UpdateUserRequest, }; +use iggy_binary_protocol::{WireIdentifier, WireName}; use iggy_common::{ GlobalPermissions, IggyExpiry, IggyTimestamp, Permissions, PersonalAccessToken, StreamPermissions, UserId, UserStatus, @@ -121,6 +122,50 @@ impl UsersInner { } } +impl Users { + #[must_use] + pub fn read(&self, f: F) -> R + where + F: FnOnce(&UsersInner) -> R, + { + self.inner.read(f) + } + + /// Ensures a root user exists in an empty user set. + /// + /// # Panics + /// + /// Panics if `username` is not a valid wire-format username. + pub fn ensure_root_user(&self, username: &str, password_hash: &str) { + if self.read(|users| !users.items.is_empty()) { + return; + } + + let username = WireName::new(username).expect("root username must be valid"); + self.inner + .do_apply(UsersCommand::CreateUser(CreateUserRequest { + username, + password: password_hash.to_string(), + status: UserStatus::Active.as_code(), + permissions: Some(WirePermissions { + global: WireGlobalPermissions { + manage_servers: true, + read_servers: true, + manage_users: true, + read_users: true, + manage_streams: true, + read_streams: true, + manage_topics: true, + read_topics: true, + poll_messages: true, + send_messages: true, + }, + streams: Vec::new(), + }), + })); + } +} + // TODO(hubcio): Serialize proper reply (e.g. assigned user ID) instead of empty Bytes. impl StateHandler for CreateUserRequest { type State = UsersInner; diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index 2056ae56d8..091779e226 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -28,16 +28,21 @@ documentation = "https://iggy.apache.org/docs" repository = "https://github.com/apache/iggy" readme = "README.md" +[features] +vsr = ["iggy_common/vsr"] + [dependencies] async-broadcast = { workspace = true } async-dropper = { workspace = true } async-trait = { workspace = true } bon = { workspace = true } +bytemuck = { workspace = true } bytes = { workspace = true } dashmap = { workspace = true } flume = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } +iggy_binary_protocol = { workspace = true } iggy_common = { workspace = true } quinn = { workspace = true } reqwest = { workspace = true } diff --git a/core/sdk/src/lib.rs b/core/sdk/src/lib.rs index 35f0e1b732..9e738728ef 100644 --- a/core/sdk/src/lib.rs +++ b/core/sdk/src/lib.rs @@ -28,4 +28,6 @@ pub mod quic; pub mod session; pub mod stream_builder; pub mod tcp; +#[cfg(feature = "vsr")] +mod vsr; pub mod websocket; diff --git a/core/sdk/src/quic/quic_client.rs b/core/sdk/src/quic/quic_client.rs index 0824e962a4..a4d4a590b8 100644 --- a/core/sdk/src/quic/quic_client.rs +++ b/core/sdk/src/quic/quic_client.rs @@ -18,6 +18,8 @@ use crate::leader_aware::{LeaderRedirectionState, check_and_redirect_to_leader}; use crate::prelude::AutoLogin; +#[cfg(feature = "vsr")] +use crate::session::ConsensusSession; use iggy_common::{BinaryClient, BinaryTransport, Client, PersonalAccessTokenClient, UserClient}; use crate::prelude::{IggyDuration, IggyError, IggyTimestamp, QuicClientConfig}; @@ -41,7 +43,9 @@ use tokio::sync::Mutex; use tokio::time::sleep; use tracing::{error, info, trace, warn}; +#[cfg(not(feature = "vsr"))] const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; +#[cfg(not(feature = "vsr"))] const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8; const NAME: &str = "Iggy"; @@ -56,6 +60,8 @@ pub struct QuicClient { pub(crate) connected_at: Mutex>, leader_redirection_state: Mutex, pub(crate) current_server_address: Mutex, + #[cfg(feature = "vsr")] + consensus_session: Arc>, } unsafe impl Send for QuicClient {} @@ -139,6 +145,32 @@ impl BinaryTransport for QuicClient { fn get_heartbeat_interval(&self) -> IggyDuration { self.config.heartbeat_interval } + + #[cfg(feature = "vsr")] + async fn get_vsr_client_id(&self) -> Result { + Ok(self.consensus_session.lock().await.client_id()) + } + + #[cfg(feature = "vsr")] + async fn bind_vsr_session(&self, session: u64) -> Result<(), IggyError> { + if session == 0 { + return Err(IggyError::InvalidConfiguration); + } + + let mut consensus_session = self.consensus_session.lock().await; + if consensus_session.is_bound() { + return Err(IggyError::InvalidConfiguration); + } + + consensus_session.bind(session); + Ok(()) + } + + #[cfg(feature = "vsr")] + async fn reset_vsr_session(&self) -> Result<(), IggyError> { + *self.consensus_session.lock().await = ConsensusSession::new(); + Ok(()) + } } impl BinaryClient for QuicClient {} @@ -205,6 +237,8 @@ impl QuicClient { connected_at: Mutex::new(None), leader_redirection_state: Mutex::new(LeaderRedirectionState::new()), current_server_address: Mutex::new(server_address), + #[cfg(feature = "vsr")] + consensus_session: Arc::new(Mutex::new(ConsensusSession::new())), }) } @@ -234,34 +268,43 @@ impl QuicClient { return Err(IggyError::EmptyResponse); } - let status = u32::from_le_bytes( - buffer[..4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - if status != 0 { - error!( - "Received an invalid response with status: {} ({}).", - status, - IggyError::from_code_as_string(status) + #[cfg(feature = "vsr")] + { + crate::vsr::decode_response(&buffer) + } + + #[cfg(not(feature = "vsr"))] + { + let status = u32::from_le_bytes( + buffer[..4] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, ); + if status != 0 { + error!( + "Received an invalid response with status: {} ({}).", + status, + IggyError::from_code_as_string(status) + ); - return Err(IggyError::from_code(status)); - } + return Err(IggyError::from_code(status)); + } - let length = u32::from_le_bytes( - buffer[4..RESPONSE_INITIAL_BYTES_LENGTH] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - trace!("Status: OK. Response length: {}", length); - if length <= 1 { - return Ok(Bytes::new()); - } + let length = u32::from_le_bytes( + buffer[4..RESPONSE_INITIAL_BYTES_LENGTH] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + trace!("Status: OK. Response length: {}", length); + if length <= 1 { + return Ok(Bytes::new()); + } - Ok(Bytes::copy_from_slice( - &buffer[RESPONSE_INITIAL_BYTES_LENGTH..RESPONSE_INITIAL_BYTES_LENGTH + length as usize], - )) + Ok(Bytes::copy_from_slice( + &buffer[RESPONSE_INITIAL_BYTES_LENGTH + ..RESPONSE_INITIAL_BYTES_LENGTH + length as usize], + )) + } } async fn connect(&self) -> Result<(), IggyError> { @@ -469,6 +512,8 @@ impl QuicClient { } self.endpoint.wait_idle().await; + #[cfg(feature = "vsr")] + self.reset_vsr_session().await?; self.set_state(ClientState::Shutdown).await; self.publish_event(DiagnosticEvent::Shutdown).await; info!("{NAME} QUIC client has been shutdown."); @@ -487,6 +532,8 @@ impl QuicClient { self.set_state(ClientState::Disconnected).await; self.connection.lock().await.take(); self.endpoint.wait_idle().await; + #[cfg(feature = "vsr")] + self.reset_vsr_session().await?; self.publish_event(DiagnosticEvent::Disconnected).await; let now = IggyTimestamp::now(); info!( @@ -521,26 +568,42 @@ impl QuicClient { let connection = self.connection.clone(); let response_buffer_size = self.config.response_buffer_size; + #[cfg(feature = "vsr")] + let consensus_session = self.consensus_session.clone(); // SAFETY: we run code holding the `connection` lock in a task so we can't be cancelled while holding the lock. tokio::spawn(async move { let connection = connection.lock().await; if let Some(connection) = connection.as_ref() { + #[cfg(feature = "vsr")] + let request = { + let mut consensus_session = consensus_session.lock().await; + crate::vsr::encode_request(&mut consensus_session, code, &payload)? + }; + #[cfg(not(feature = "vsr"))] let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; let (mut send, mut recv) = connection.open_bi().await.map_err(|error| { error!("Failed to open a bidirectional stream: {error}"); IggyError::QuicError })?; trace!("Sending a QUIC request with code: {code}"); + #[cfg(feature = "vsr")] + send.write_all(&request).await.map_err(|error| { + error!("Failed to write VSR request: {error}"); + IggyError::QuicError + })?; + #[cfg(not(feature = "vsr"))] send.write_all(&(payload_length as u32).to_le_bytes()) .await .map_err(|error| { error!("Failed to write payload length: {error}"); IggyError::QuicError })?; + #[cfg(not(feature = "vsr"))] send.write_all(&code.to_le_bytes()).await.map_err(|error| { error!("Failed to write payload code: {error}"); IggyError::QuicError })?; + #[cfg(not(feature = "vsr"))] send.write_all(&payload).await.map_err(|error| { error!("Failed to write payload: {error}"); IggyError::QuicError diff --git a/core/sdk/src/tcp/tcp_client.rs b/core/sdk/src/tcp/tcp_client.rs index 059ca4d02f..3d5d3c0b40 100644 --- a/core/sdk/src/tcp/tcp_client.rs +++ b/core/sdk/src/tcp/tcp_client.rs @@ -19,16 +19,19 @@ use crate::leader_aware::{LeaderRedirectionState, check_and_redirect_to_leader}; use crate::prelude::Client; use crate::prelude::TcpClientConfig; +#[cfg(feature = "vsr")] +use crate::session::ConsensusSession; use crate::tcp::tcp_connection_stream::TcpConnectionStream; use crate::tcp::tcp_connection_stream_kind::ConnectionStreamKind; use crate::tcp::tcp_tls_connection_stream::TcpTlsConnectionStream; use async_broadcast::{Receiver, Sender, broadcast}; use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; +#[cfg(not(feature = "vsr"))] +use iggy_common::IggyErrorDiscriminants; use iggy_common::{ AutoLogin, ClientState, ConnectionString, ConnectionStringUtils, Credentials, DiagnosticEvent, - IggyDuration, IggyError, IggyErrorDiscriminants, IggyTimestamp, TcpConnectionStringOptions, - TransportProtocol, + IggyDuration, IggyError, IggyTimestamp, TcpConnectionStringOptions, TransportProtocol, }; use iggy_common::{BinaryClient, BinaryTransport, PersonalAccessTokenClient, UserClient}; use rustls::pki_types::{CertificateDer, ServerName, pem::PemObject}; @@ -42,7 +45,9 @@ use tokio::time::sleep; use tokio_rustls::{TlsConnector, TlsStream}; use tracing::{error, info, trace, warn}; +#[cfg(not(feature = "vsr"))] const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; +#[cfg(not(feature = "vsr"))] const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8; const NAME: &str = "Iggy"; @@ -58,6 +63,8 @@ pub struct TcpClient { pub(crate) connected_at: Mutex>, leader_redirection_state: Mutex, pub(crate) current_server_address: Mutex, + #[cfg(feature = "vsr")] + consensus_session: Arc>, } impl Default for TcpClient { @@ -144,6 +151,32 @@ impl BinaryTransport for TcpClient { fn get_heartbeat_interval(&self) -> IggyDuration { self.config.heartbeat_interval } + + #[cfg(feature = "vsr")] + async fn get_vsr_client_id(&self) -> Result { + Ok(self.consensus_session.lock().await.client_id()) + } + + #[cfg(feature = "vsr")] + async fn bind_vsr_session(&self, session: u64) -> Result<(), IggyError> { + if session == 0 { + return Err(IggyError::InvalidConfiguration); + } + + let mut consensus_session = self.consensus_session.lock().await; + if consensus_session.is_bound() { + return Err(IggyError::InvalidConfiguration); + } + + consensus_session.bind(session); + Ok(()) + } + + #[cfg(feature = "vsr")] + async fn reset_vsr_session(&self) -> Result<(), IggyError> { + *self.consensus_session.lock().await = ConsensusSession::new(); + Ok(()) + } } impl BinaryClient for TcpClient {} @@ -203,9 +236,12 @@ impl TcpClient { connected_at: Mutex::new(None), leader_redirection_state: Mutex::new(LeaderRedirectionState::new()), current_server_address: Mutex::new(server_address), + #[cfg(feature = "vsr")] + consensus_session: Arc::new(Mutex::new(ConsensusSession::new())), }) } + #[cfg(not(feature = "vsr"))] async fn handle_response( status: u32, length: u32, @@ -510,6 +546,8 @@ impl TcpClient { info!("{NAME} client: {client_address} is disconnecting from server..."); self.set_state(ClientState::Disconnected).await; self.stream.lock().await.take(); + #[cfg(feature = "vsr")] + self.reset_vsr_session().await?; self.publish_event(DiagnosticEvent::Disconnected).await; let now = IggyTimestamp::now(); info!("{NAME} client: {client_address} has disconnected from server at: {now}."); @@ -527,6 +565,8 @@ impl TcpClient { if let Some(mut stream) = stream { stream.shutdown().await?; } + #[cfg(feature = "vsr")] + self.reset_vsr_session().await?; self.set_state(ClientState::Shutdown).await; self.publish_event(DiagnosticEvent::Shutdown).await; info!("{NAME} TCP client: {client_address} has been shutdown."); @@ -551,43 +591,103 @@ impl TcpClient { } let stream = self.stream.clone(); + #[cfg(feature = "vsr")] + let consensus_session = self.consensus_session.clone(); // SAFETY: we run code holding the `stream` lock in a task so we can't be cancelled while holding the lock. tokio::spawn(async move { let mut stream = stream.lock().await; if let Some(stream) = stream.as_mut() { + #[cfg(feature = "vsr")] + let request = { + let mut consensus_session = consensus_session.lock().await; + crate::vsr::encode_request(&mut consensus_session, code, &payload)? + }; + #[cfg(not(feature = "vsr"))] let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; + #[cfg(feature = "vsr")] + trace!( + "Sending a TCP VSR request of size {} with code: {code}", + request.len() + ); + #[cfg(not(feature = "vsr"))] trace!("Sending a TCP request of size {payload_length} with code: {code}"); + #[cfg(feature = "vsr")] + stream.write(&request).await?; + #[cfg(not(feature = "vsr"))] stream.write(&(payload_length as u32).to_le_bytes()).await?; + #[cfg(not(feature = "vsr"))] stream.write(&code.to_le_bytes()).await?; + #[cfg(not(feature = "vsr"))] stream.write(&payload).await?; stream.flush().await?; trace!("Sent a TCP request with code: {code}, waiting for a response..."); - let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH]; - let read_bytes = stream.read(&mut response_buffer).await.map_err(|error| { - error!( - "Failed to read response for TCP request with code: {code}: {error}", - code = code, - error = error - ); - IggyError::Disconnected - })?; + #[cfg(feature = "vsr")] + { + let mut response_header = [0u8; iggy_binary_protocol::HEADER_SIZE]; + let read_bytes = stream.read(&mut response_header).await.map_err(|error| { + error!( + "Failed to read VSR response header for TCP request with code: {code}: {error}", + code = code, + error = error + ); + IggyError::Disconnected + })?; + + if read_bytes != iggy_binary_protocol::HEADER_SIZE { + error!("Received an invalid or empty VSR response header."); + return Err(IggyError::EmptyResponse); + } + + let response_size = crate::vsr::response_size(&response_header)?; + let mut response = BytesMut::with_capacity(response_size); + response.put_slice(&response_header); + + if response_size > iggy_binary_protocol::HEADER_SIZE { + let body_size = response_size - iggy_binary_protocol::HEADER_SIZE; + let mut body = vec![0u8; body_size]; + stream.read(&mut body).await.map_err(|error| { + error!( + "Failed to read VSR response body for TCP request with code: {code}: {error}", + code = code, + error = error + ); + IggyError::Disconnected + })?; + response.put_slice(&body); + } - if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH { - error!("Received an invalid or empty response."); - return Err(IggyError::EmptyResponse); + return crate::vsr::decode_response(&response.freeze()); } - let status = u32::from_le_bytes( - response_buffer[..4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - let length = u32::from_le_bytes( - response_buffer[4..] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - return TcpClient::handle_response(status, length, stream).await; + #[cfg(not(feature = "vsr"))] + { + let mut response_buffer = [0u8; RESPONSE_INITIAL_BYTES_LENGTH]; + let read_bytes = stream.read(&mut response_buffer).await.map_err(|error| { + error!( + "Failed to read response for TCP request with code: {code}: {error}", + code = code, + error = error + ); + IggyError::Disconnected + })?; + + if read_bytes != RESPONSE_INITIAL_BYTES_LENGTH { + error!("Received an invalid or empty response."); + return Err(IggyError::EmptyResponse); + } + + let status = u32::from_le_bytes( + response_buffer[..4] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + let length = u32::from_le_bytes( + response_buffer[4..] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ); + return TcpClient::handle_response(status, length, stream).await; + } } error!("Cannot send data. Client is not connected."); diff --git a/core/sdk/src/vsr.rs b/core/sdk/src/vsr.rs new file mode 100644 index 0000000000..1e4b19b333 --- /dev/null +++ b/core/sdk/src/vsr.rs @@ -0,0 +1,254 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::session::ConsensusSession; +use bytes::{BufMut, Bytes, BytesMut}; +use iggy_binary_protocol::codec::WireDecode; +use iggy_binary_protocol::codes::{ + DELETE_CONSUMER_OFFSET_2_CODE, DELETE_CONSUMER_OFFSET_CODE, DELETE_SEGMENTS_CODE, + LOGIN_REGISTER_CODE, LOGIN_REGISTER_WITH_PAT_CODE, SEND_MESSAGES_CODE, + STORE_CONSUMER_OFFSET_2_CODE, STORE_CONSUMER_OFFSET_CODE, +}; +use iggy_binary_protocol::consensus::{ + Command2, HEADER_SIZE, Operation, ReplyHeader, RequestHeader, read_size_field, +}; +use iggy_binary_protocol::requests::consumer_offsets::{ + DeleteConsumerOffset2Request, DeleteConsumerOffsetRequest, StoreConsumerOffset2Request, + StoreConsumerOffsetRequest, +}; +use iggy_binary_protocol::requests::messages::SendMessagesHeader; +use iggy_binary_protocol::requests::segments::DeleteSegmentsRequest; +use iggy_binary_protocol::{WireIdentifier, WirePartitioning}; +use iggy_common::sharding::IggyNamespace; +use iggy_common::{IggyError, IggyTimestamp}; + +pub(crate) fn encode_request( + session: &mut ConsensusSession, + code: u32, + payload: &Bytes, +) -> Result { + let (operation, request_id, session_id) = match code { + LOGIN_REGISTER_CODE | LOGIN_REGISTER_WITH_PAT_CODE => { + (Operation::Register, session.register_request_id(), 0) + } + _ => { + let operation = + Operation::from_command_code(code).ok_or(IggyError::FeatureUnavailable)?; + let session_id = session.session().ok_or(IggyError::Unauthenticated)?; + (operation, session.next_request_id(), session_id) + } + }; + let namespace = namespace_for_request(code, payload, operation)?; + let total_size = HEADER_SIZE + .checked_add(payload.len()) + .ok_or(IggyError::InvalidConfiguration)?; + let header = RequestHeader { + command: Command2::Request, + operation, + size: total_size as u32, + client: session.client_id(), + request: request_id, + session: session_id, + namespace, + timestamp: IggyTimestamp::now().as_micros(), + ..Default::default() + }; + + let mut request = BytesMut::with_capacity(total_size); + request.put_slice(bytemuck::bytes_of(&header)); + request.put_slice(payload); + Ok(request.freeze()) +} + +pub(crate) fn response_size(header: &[u8]) -> Result { + let size = read_size_field(header).ok_or(IggyError::InvalidCommand)? as usize; + if size < HEADER_SIZE { + return Err(IggyError::InvalidCommand); + } + Ok(size) +} + +pub(crate) fn decode_response(response: &[u8]) -> Result { + if response.len() < HEADER_SIZE { + return Err(IggyError::EmptyResponse); + } + + let header = bytemuck::checked::try_from_bytes::(&response[..HEADER_SIZE]) + .map_err(|_| IggyError::InvalidCommand)?; + if header.command != Command2::Reply { + return Err(IggyError::InvalidCommand); + } + + let total_size = header.size as usize; + if total_size < HEADER_SIZE || response.len() < total_size { + return Err(IggyError::InvalidCommand); + } + + Ok(Bytes::copy_from_slice(&response[HEADER_SIZE..total_size])) +} + +fn namespace_for_request( + code: u32, + payload: &Bytes, + operation: Operation, +) -> Result { + // Control-plane requests do not target a concrete stream/topic/partition shard. + // They are encoded with namespace 0 and routed through metadata/shard 0. + if operation == Operation::Register || operation.is_metadata() { + return Ok(0); + } + + let namespace = match code { + SEND_MESSAGES_CODE => { + if payload.len() < 4 { + return Err(IggyError::InvalidCommand); + } + let metadata_length = u32::from_le_bytes( + payload[..4] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ) as usize; + if payload.len() < 4 + metadata_length { + return Err(IggyError::InvalidCommand); + } + let header = SendMessagesHeader::decode_from(&payload[4..4 + metadata_length]) + .map_err(|_| IggyError::InvalidCommand)?; + namespace_from_partitioning(&header.stream_id, &header.topic_id, &header.partitioning)? + } + STORE_CONSUMER_OFFSET_CODE => { + let request = StoreConsumerOffsetRequest::decode_from(payload) + .map_err(|_| IggyError::InvalidCommand)?; + namespace_from_partition(&request.stream_id, &request.topic_id, request.partition_id)? + } + DELETE_CONSUMER_OFFSET_CODE => { + let request = DeleteConsumerOffsetRequest::decode_from(payload) + .map_err(|_| IggyError::InvalidCommand)?; + namespace_from_partition(&request.stream_id, &request.topic_id, request.partition_id)? + } + STORE_CONSUMER_OFFSET_2_CODE => { + let request = StoreConsumerOffset2Request::decode_from(payload) + .map_err(|_| IggyError::InvalidCommand)?; + namespace_from_partition(&request.stream_id, &request.topic_id, request.partition_id)? + } + DELETE_CONSUMER_OFFSET_2_CODE => { + let request = DeleteConsumerOffset2Request::decode_from(payload) + .map_err(|_| IggyError::InvalidCommand)?; + namespace_from_partition(&request.stream_id, &request.topic_id, request.partition_id)? + } + DELETE_SEGMENTS_CODE => { + let request = DeleteSegmentsRequest::decode_from(payload) + .map_err(|_| IggyError::InvalidCommand)?; + namespace_from_partition( + &request.stream_id, + &request.topic_id, + Some(request.partition_id), + )? + } + _ => return Err(IggyError::FeatureUnavailable), + }; + + Ok(namespace) +} + +fn namespace_from_partitioning( + stream_id: &WireIdentifier, + topic_id: &WireIdentifier, + partitioning: &WirePartitioning, +) -> Result { + let WirePartitioning::PartitionId(partition_id) = partitioning else { + return Err(IggyError::FeatureUnavailable); + }; + namespace_from_partition(stream_id, topic_id, Some(*partition_id)) +} + +fn namespace_from_partition( + stream_id: &WireIdentifier, + topic_id: &WireIdentifier, + partition_id: Option, +) -> Result { + let stream_id = stream_id.as_u32().ok_or(IggyError::FeatureUnavailable)?; + let topic_id = topic_id.as_u32().ok_or(IggyError::FeatureUnavailable)?; + let partition_id = partition_id.ok_or(IggyError::FeatureUnavailable)?; + Ok(IggyNamespace::new(stream_id as usize, topic_id as usize, partition_id as usize).inner()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::session::ConsensusSession; + use iggy_binary_protocol::codes::{CREATE_STREAM_CODE, GET_STREAM_CODE}; + use iggy_binary_protocol::consensus::{Message, RequestHeader, iobuf::Owned}; + use iggy_binary_protocol::requests::streams::CreateStreamRequest; + use iggy_binary_protocol::requests::users::LoginRegisterRequest; + use iggy_binary_protocol::{WireEncode, WireName}; + use secrecy::SecretString; + + fn decode_request(bytes: &Bytes) -> Message { + Message::try_from(Owned::<4096>::copy_from_slice(bytes.as_ref())).unwrap() + } + + #[test] + fn register_request_uses_zero_request_and_session() { + let mut session = ConsensusSession::with_client_id(7); + let request = LoginRegisterRequest { + client_id: 7, + username: WireName::new("admin").unwrap(), + password: SecretString::from("secret"), + version: None, + client_context: None, + }; + + let bytes = encode_request(&mut session, LOGIN_REGISTER_CODE, &request.to_bytes()).unwrap(); + let request = decode_request(&bytes); + let header = request.header(); + + assert_eq!(header.operation, Operation::Register); + assert_eq!(header.request, 0); + assert_eq!(header.session, 0); + assert_eq!(header.client, 7); + assert_eq!(header.namespace, 0); + } + + #[test] + fn replicated_request_increments_request_counter() { + let mut session = ConsensusSession::with_client_id(42); + let _ = session.register_request_id(); + session.bind(99); + let payload = CreateStreamRequest { + name: WireName::new("stream").unwrap(), + } + .to_bytes(); + + let first = encode_request(&mut session, CREATE_STREAM_CODE, &payload).unwrap(); + let second = encode_request(&mut session, CREATE_STREAM_CODE, &payload).unwrap(); + + assert_eq!(decode_request(&first).header().request, 1); + assert_eq!(decode_request(&second).header().request, 2); + assert_eq!(decode_request(&second).header().session, 99); + assert_eq!(decode_request(&second).header().namespace, 0); + } + + #[test] + fn unsupported_non_replicated_request_is_rejected() { + let mut session = ConsensusSession::new(); + assert!(matches!( + encode_request(&mut session, GET_STREAM_CODE, &Bytes::new()), + Err(IggyError::FeatureUnavailable) + )); + } +} diff --git a/core/sdk/src/websocket/websocket_client.rs b/core/sdk/src/websocket/websocket_client.rs index 5dd187a885..395c76884d 100644 --- a/core/sdk/src/websocket/websocket_client.rs +++ b/core/sdk/src/websocket/websocket_client.rs @@ -17,6 +17,8 @@ */ use crate::leader_aware::{LeaderRedirectionState, check_and_redirect_to_leader}; +#[cfg(feature = "vsr")] +use crate::session::ConsensusSession; use crate::websocket::websocket_connection_stream::WebSocketConnectionStream; use crate::websocket::websocket_stream_kind::WebSocketStreamKind; use crate::websocket::websocket_tls_connection_stream::WebSocketTlsConnectionStream; @@ -26,10 +28,11 @@ use crate::prelude::Client; use async_broadcast::{Receiver, Sender, broadcast}; use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; +#[cfg(not(feature = "vsr"))] +use iggy_common::IggyErrorDiscriminants; use iggy_common::{ AutoLogin, ClientState, ConnectionString, Credentials, DiagnosticEvent, IggyDuration, - IggyError, IggyErrorDiscriminants, IggyTimestamp, WebSocketClientConfig, - WebSocketConnectionStringOptions, + IggyError, IggyTimestamp, WebSocketClientConfig, WebSocketConnectionStringOptions, }; use iggy_common::{BinaryClient, BinaryTransport, PersonalAccessTokenClient, UserClient}; use secrecy::ExposeSecret; @@ -44,7 +47,9 @@ use tokio_tungstenite::{ }; use tracing::{debug, error, info, trace, warn}; +#[cfg(not(feature = "vsr"))] const REQUEST_INITIAL_BYTES_LENGTH: usize = 4; +#[cfg(not(feature = "vsr"))] const RESPONSE_INITIAL_BYTES_LENGTH: usize = 8; const NAME: &str = "WebSocket"; @@ -58,6 +63,8 @@ pub struct WebSocketClient { pub(crate) connected_at: Mutex>, leader_redirection_state: Mutex, pub(crate) current_server_address: Mutex, + #[cfg(feature = "vsr")] + consensus_session: Arc>, } impl Default for WebSocketClient { @@ -145,6 +152,32 @@ impl BinaryTransport for WebSocketClient { fn get_heartbeat_interval(&self) -> IggyDuration { self.config.heartbeat_interval } + + #[cfg(feature = "vsr")] + async fn get_vsr_client_id(&self) -> Result { + Ok(self.consensus_session.lock().await.client_id()) + } + + #[cfg(feature = "vsr")] + async fn bind_vsr_session(&self, session: u64) -> Result<(), IggyError> { + if session == 0 { + return Err(IggyError::InvalidConfiguration); + } + + let mut consensus_session = self.consensus_session.lock().await; + if consensus_session.is_bound() { + return Err(IggyError::InvalidConfiguration); + } + + consensus_session.bind(session); + Ok(()) + } + + #[cfg(feature = "vsr")] + async fn reset_vsr_session(&self) -> Result<(), IggyError> { + *self.consensus_session.lock().await = ConsensusSession::new(); + Ok(()) + } } impl BinaryClient for WebSocketClient {} @@ -163,6 +196,8 @@ impl WebSocketClient { connected_at: Mutex::new(None), leader_redirection_state: Mutex::new(LeaderRedirectionState::new()), current_server_address: Mutex::new(server_address), + #[cfg(feature = "vsr")] + consensus_session: Arc::new(Mutex::new(ConsensusSession::new())), }) } @@ -520,6 +555,8 @@ impl WebSocketClient { self.set_state(ClientState::Disconnected).await; self.stream.lock().await.take(); + #[cfg(feature = "vsr")] + self.reset_vsr_session().await?; self.publish_event(DiagnosticEvent::Disconnected).await; let now = IggyTimestamp::now(); @@ -542,6 +579,8 @@ impl WebSocketClient { let _ = stream.shutdown().await; } + #[cfg(feature = "vsr")] + self.reset_vsr_session().await?; self.set_state(ClientState::Shutdown).await; self.publish_event(DiagnosticEvent::Shutdown).await; info!("{NAME} client: {client_address} has been shutdown."); @@ -571,10 +610,20 @@ impl WebSocketClient { IggyError::NotConnected })?; + #[cfg(feature = "vsr")] + let request = { + let mut consensus_session = self.consensus_session.lock().await; + crate::vsr::encode_request(&mut consensus_session, code, &payload)? + }; + #[cfg(not(feature = "vsr"))] let payload_length = payload.len() + REQUEST_INITIAL_BYTES_LENGTH; + #[cfg(not(feature = "vsr"))] let mut request = BytesMut::with_capacity(4 + REQUEST_INITIAL_BYTES_LENGTH + payload.len()); + #[cfg(not(feature = "vsr"))] request.put_u32_le(payload_length as u32); + #[cfg(not(feature = "vsr"))] request.put_u32_le(code); + #[cfg(not(feature = "vsr"))] request.put_slice(&payload); trace!( @@ -586,61 +635,83 @@ impl WebSocketClient { stream.write(&request).await?; stream.flush().await?; - let mut response_initial_buffer = vec![0u8; RESPONSE_INITIAL_BYTES_LENGTH]; - stream.read(&mut response_initial_buffer).await?; + #[cfg(feature = "vsr")] + { + let mut response_header = vec![0u8; iggy_binary_protocol::HEADER_SIZE]; + stream.read(&mut response_header).await?; + + let response_size = crate::vsr::response_size(&response_header)?; + let mut response = BytesMut::with_capacity(response_size); + response.put_slice(&response_header); + + if response_size > iggy_binary_protocol::HEADER_SIZE { + let mut response_body = + vec![0u8; response_size - iggy_binary_protocol::HEADER_SIZE]; + stream.read(&mut response_body).await?; + response.put_slice(&response_body); + } - let status = u32::from_le_bytes([ - response_initial_buffer[0], - response_initial_buffer[1], - response_initial_buffer[2], - response_initial_buffer[3], - ]); + crate::vsr::decode_response(&response.freeze()) + } - let length = u32::from_le_bytes([ - response_initial_buffer[4], - response_initial_buffer[5], - response_initial_buffer[6], - response_initial_buffer[7], - ]) as usize; + #[cfg(not(feature = "vsr"))] + { + let mut response_initial_buffer = vec![0u8; RESPONSE_INITIAL_BYTES_LENGTH]; + stream.read(&mut response_initial_buffer).await?; + + let status = u32::from_le_bytes([ + response_initial_buffer[0], + response_initial_buffer[1], + response_initial_buffer[2], + response_initial_buffer[3], + ]); + + let length = u32::from_le_bytes([ + response_initial_buffer[4], + response_initial_buffer[5], + response_initial_buffer[6], + response_initial_buffer[7], + ]) as usize; + + trace!( + "Received {NAME} response status: {}, length: {} bytes", + status, length + ); - trace!( - "Received {NAME} response status: {}, length: {} bytes", - status, length - ); + if status != 0 { + // TEMP: See https://github.com/apache/iggy/pull/604 for context. + if status == IggyErrorDiscriminants::TopicNameAlreadyExists as u32 + || status == IggyErrorDiscriminants::StreamNameAlreadyExists as u32 + || status == IggyErrorDiscriminants::UserAlreadyExists as u32 + || status == IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32 + || status == IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32 + { + debug!( + "Received a server resource already exists response: {} ({})", + status, + IggyError::from_code_as_string(status) + ) + } else { + error!( + "Received an invalid response with status: {} ({}).", + status, + IggyError::from_code_as_string(status), + ); + } - if status != 0 { - // TEMP: See https://github.com/apache/iggy/pull/604 for context. - if status == IggyErrorDiscriminants::TopicNameAlreadyExists as u32 - || status == IggyErrorDiscriminants::StreamNameAlreadyExists as u32 - || status == IggyErrorDiscriminants::UserAlreadyExists as u32 - || status == IggyErrorDiscriminants::PersonalAccessTokenAlreadyExists as u32 - || status == IggyErrorDiscriminants::ConsumerGroupNameAlreadyExists as u32 - { - debug!( - "Received a server resource already exists response: {} ({})", - status, - IggyError::from_code_as_string(status) - ) - } else { - error!( - "Received an invalid response with status: {} ({}).", - status, - IggyError::from_code_as_string(status), - ); + return Err(IggyError::from_code(status)); } - return Err(IggyError::from_code(status)); - } - - if length == 0 { - return Ok(Bytes::new()); - } + if length == 0 { + return Ok(Bytes::new()); + } - let mut response_buffer = vec![0u8; length]; - stream.read(&mut response_buffer).await?; + let mut response_buffer = vec![0u8; length]; + stream.read(&mut response_buffer).await?; - trace!("Received {NAME} response payload, size: {} bytes", length); - Ok(Bytes::from(response_buffer)) + trace!("Received {NAME} response payload, size: {} bytes", length); + Ok(Bytes::from(response_buffer)) + } } } diff --git a/core/server-ng/Cargo.toml b/core/server-ng/Cargo.toml index d11700a198..e11d58bffb 100644 --- a/core/server-ng/Cargo.toml +++ b/core/server-ng/Cargo.toml @@ -100,6 +100,7 @@ async_zip = { workspace = true } axum = { workspace = true } axum-server = { workspace = true } bytes = { workspace = true } +bytemuck = { workspace = true } chrono = { workspace = true } clap = { workspace = true } compio = { workspace = true } diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs index 816a464f5e..bedd73393f 100644 --- a/core/server-ng/src/bootstrap.rs +++ b/core/server-ng/src/bootstrap.rs @@ -18,14 +18,21 @@ */ use crate::config_writer::write_current_config; +use crate::login_register::LoginRegisterError; use crate::server_error::ServerNgError; +use crate::session_manager::SessionManager; +use bytes::Bytes; use configs::server_ng::ServerNgConfig; use consensus::{LocalPipeline, PartitionsHandle, Sequencer, VsrConsensus}; -use iggy_binary_protocol::RequestHeader; +use iggy_binary_protocol::requests::users::{LoginRegisterRequest, LoginRegisterWithPatRequest}; +use iggy_binary_protocol::responses::users::LoginRegisterResponse; +use iggy_binary_protocol::{ + Command2, Message, Operation, ReplyHeader, RequestHeader, WireDecode, WireEncode, +}; use iggy_common::sharding::{IggyNamespace, PartitionLocation, ShardId}; use iggy_common::{ - ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, PartitionStats, TopicStats, - sharding::LocalIdx, variadic, + ConsumerGroupOffsets, ConsumerOffsets, IggyByteSize, IggyError, IggyTimestamp, PartitionStats, + PersonalAccessToken, TopicStats, UserStatus, sharding::LocalIdx, variadic, }; use journal::Journal; use journal::prepare_journal::PrepareJournal; @@ -42,7 +49,7 @@ use message_bus::transports::tls::{ }; use message_bus::{ AcceptedClientFn, AcceptedQuicClientFn, AcceptedReplicaFn, AcceptedTlsClientFn, - AcceptedWsClientFn, IggyMessageBus, connector, + AcceptedWsClientFn, IggyMessageBus, MessageBus, connector, }; use metadata::IggyMetadata; use metadata::MuxStateMachine; @@ -56,10 +63,13 @@ use partitions::{ IggyIndexWriter, IggyPartition, IggyPartitions, MessagesWriter, PartitionsConfig, Segment, }; // TODO: decouple bootstrap/storage helpers and logging from the `server` crate. +use secrecy::ExposeSecret; use server::bootstrap::create_directories; use server::log::logger::Logging; use server::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets}; use server::streaming::segments::storage::create_segment_storage; +use server::streaming::users::user::User as LegacyUser; +use server::streaming::utils::crypto; use shard::builder::IggyShardBuilder; use shard::shards_table::PapayaShardsTable; use shard::{ @@ -220,6 +230,7 @@ pub async fn bootstrap( let recovered = recover::(Path::new(&config.system.path)) .await .map_err(ServerNgError::MetadataRecovery)?; + ensure_default_root_user(&recovered.mux_stm); let restored_op = recovered.last_applied_op.unwrap_or_else(|| { recovered .snapshot @@ -511,6 +522,17 @@ async fn hydrate_partition_log( .zip(loaded_log.storages().iter().cloned()) .enumerate() { + validate_recovered_segment( + stream_id, + topic_id, + partition_id, + segment, + &storage, + loaded_log + .indexes() + .get(segment_index) + .and_then(|indexes| indexes.as_ref()), + )?; let max_timestamp = match loaded_log .indexes() .get(segment_index) @@ -581,6 +603,34 @@ async fn hydrate_partition_log( Ok(()) } +fn validate_recovered_segment( + stream_id: usize, + topic_id: usize, + partition_id: usize, + segment: &iggy_common::Segment, + storage: &iggy_common::SegmentStorage, + indexes: Option<&server::streaming::segments::IggyIndexesMut>, +) -> Result<(), ServerNgError> { + let messages_size_bytes = storage + .messages_reader + .as_ref() + .map_or(0, |reader| u64::from(reader.file_size())); + let indexed_size_bytes = indexes.map_or(0, |indexes| u64::from(indexes.messages_size())); + if messages_size_bytes == indexed_size_bytes { + return Ok(()); + } + + Err(ServerNgError::RecoveredSegmentSizeDivergence { + stream_id, + topic_id, + partition_id, + start_offset: segment.start_offset, + end_offset: segment.end_offset, + messages_size_bytes, + indexed_size_bytes, + }) +} + fn convert_segment(segment: &iggy_common::Segment, max_timestamp: u64) -> Segment { Segment { sealed: segment.sealed, @@ -1173,7 +1223,7 @@ async fn start_manual_runtime( None }; - let bound_clients = start_client_listeners(shard, config, topology, &accepted_clients).await?; + let bound_clients = start_client_listeners(shard, config, topology, &accepted_clients)?; write_current_config( config, Some(topology.self_replica_id), @@ -1218,19 +1268,14 @@ fn make_replica_message_handler(shard: &Rc) -> MessageHandler { fn make_client_request_handler(shard: &Rc) -> RequestHandler { let shard = Rc::clone(shard); + let sessions = Rc::new(RefCell::new(SessionManager::new())); Rc::new(move |client_id, message| { - let request = match message.try_into_typed::() { - Ok(request) => request, - Err(error) => { - warn!(client_id, error = %error, "dropping client request with invalid header"); - return; - } - }; - let request = request.transmute_header(|header, new_header: &mut RequestHeader| { - *new_header = header; - new_header.client = client_id; - }); - shard.dispatch(request.into_generic()); + let shard = Rc::clone(&shard); + let sessions = Rc::clone(&sessions); + compio::runtime::spawn(async move { + handle_client_request(&shard, &sessions, client_id, message).await; + }) + .detach(); }) } @@ -1245,25 +1290,331 @@ fn make_deferred_replica_message_handler(shard_handle: &ServerNgShardHandle) -> fn make_deferred_client_request_handler(shard_handle: &ServerNgShardHandle) -> RequestHandler { let shard_handle = Rc::clone(shard_handle); + let sessions = Rc::new(RefCell::new(SessionManager::new())); Rc::new(move |client_id, message| { - let Some(shard) = upgrade_shard_handle(&shard_handle) else { + let shard_handle = Rc::clone(&shard_handle); + let sessions = Rc::clone(&sessions); + compio::runtime::spawn(async move { + let Some(shard) = upgrade_shard_handle(&shard_handle) else { + return; + }; + handle_client_request(&shard, &sessions, client_id, message).await; + }) + .detach(); + }) +} + +#[allow(clippy::future_not_send)] +async fn handle_client_request( + shard: &Rc, + sessions: &Rc>, + transport_client_id: u128, + message: Message, +) { + let request = match message.try_into_typed::() { + Ok(request) => request, + Err(error) => { + warn!( + transport_client_id, + error = %error, + "dropping client request with invalid header" + ); return; - }; - let request = match message.try_into_typed::() { - Ok(request) => request, + } + }; + + ensure_transport_connection(shard, sessions, transport_client_id); + + let header = *request.header(); + if header.operation == Operation::Register && header.session == 0 && header.request == 0 { + handle_login_register_request(shard, sessions, transport_client_id, request).await; + return; + } + + let bound = sessions.borrow().get_session(transport_client_id); + let request = request.transmute_header(|header, new_header: &mut RequestHeader| { + *new_header = header; + new_header.client = transport_client_id; + if let Some((bound_client_id, bound_session)) = bound { + new_header.client = bound_client_id; + new_header.session = bound_session; + } + }); + shard.dispatch(request.into_generic()); +} + +fn ensure_transport_connection( + shard: &Rc, + sessions: &Rc>, + transport_client_id: u128, +) { + let Some(meta) = shard.bus.client_meta(transport_client_id) else { + return; + }; + sessions + .borrow_mut() + .ensure_connection(transport_client_id, meta.peer_addr); +} + +#[allow(clippy::future_not_send)] +async fn handle_login_register_request( + shard: &Rc, + sessions: &Rc>, + transport_client_id: u128, + request: Message, +) { + let body = request_body(&request); + let vsr_client_id = request.header().client; + + if let Ok(wire_request) = LoginRegisterRequest::decode_from(body) { + match verify_login_credentials( + shard, + wire_request.username.as_str(), + wire_request.password.expose_secret(), + ) { + Ok(user_id) => { + if let Err(error) = complete_login_register( + shard, + sessions, + transport_client_id, + vsr_client_id, + request.header(), + user_id, + ) + .await + { + warn!(transport_client_id, error = %error, "login/register failed"); + } + return; + } + Err(LoginRegisterError::InvalidCredentials) => {} Err(error) => { - warn!(client_id, error = %error, "dropping client request with invalid header"); + warn!(transport_client_id, error = %error, "login/register failed"); + return; + } + } + } + + if let Ok(wire_request) = LoginRegisterWithPatRequest::decode_from(body) { + match verify_pat_credentials(shard, wire_request.token.expose_secret()) { + Ok(user_id) => { + if let Err(error) = complete_login_register( + shard, + sessions, + transport_client_id, + vsr_client_id, + request.header(), + user_id, + ) + .await + { + warn!( + transport_client_id, + error = %error, + "login/register with PAT failed" + ); + } return; } + Err(error) => { + warn!( + transport_client_id, + error = %error, + "login/register with PAT failed" + ); + return; + } + } + } + + warn!( + transport_client_id, + "dropping register request with unsupported payload shape" + ); +} + +fn request_body(request: &Message) -> &[u8] { + &request.as_slice()[std::mem::size_of::()..request.header().size as usize] +} + +fn verify_login_credentials( + shard: &Rc, + username: &str, + password: &str, +) -> Result { + shard.plane.inner().0.mux_stm.inner().0.read(|users| { + let Some((_, user_id)) = users + .index + .iter() + .find(|(name, _)| name.as_ref() == username) + else { + return Err(LoginRegisterError::InvalidCredentials); }; - let request = request.transmute_header(|header, new_header: &mut RequestHeader| { - *new_header = header; - new_header.client = client_id; - }); - shard.dispatch(request.into_generic()); + let Some(user) = users.items.get(*user_id as usize) else { + return Err(LoginRegisterError::InvalidCredentials); + }; + if user.status != UserStatus::Active { + return Err(LoginRegisterError::UserInactive); + } + if !crypto::verify_password(password, user.password_hash.as_ref()) { + return Err(LoginRegisterError::InvalidCredentials); + } + Ok(user.id) + }) +} + +fn verify_pat_credentials( + shard: &Rc, + token: &str, +) -> Result { + let token_hash = PersonalAccessToken::hash_token(token); + let now = IggyTimestamp::now(); + shard.plane.inner().0.mux_stm.inner().0.read(|users| { + let Some((user_id, pat)) = + users + .personal_access_tokens + .iter() + .find_map(|(user_id, tokens)| { + tokens + .values() + .find(|pat| pat.token.as_ref() == token_hash) + .map(|pat| (*user_id, pat)) + }) + else { + return Err(LoginRegisterError::InvalidToken); + }; + if pat.is_expired(now) { + return Err(LoginRegisterError::InvalidToken); + } + let Some(user) = users.items.get(user_id as usize) else { + return Err(LoginRegisterError::InvalidToken); + }; + if user.status != UserStatus::Active { + return Err(LoginRegisterError::UserInactive); + } + Ok(user.id) }) } +#[allow(clippy::future_not_send)] +async fn complete_login_register( + shard: &Rc, + sessions: &Rc>, + transport_client_id: u128, + vsr_client_id: u128, + request_header: &RequestHeader, + user_id: u32, +) -> Result<(), LoginRegisterError> { + let existing_session = { + let sessions = sessions.borrow(); + sessions + .get_session(transport_client_id) + .map(|(_, session)| session) + }; + if let Some(session) = existing_session { + let response = LoginRegisterResponse { user_id, session }.to_bytes(); + let reply = build_login_register_reply(request_header, vsr_client_id, session, &response); + let _ = shard + .bus + .send_to_client(transport_client_id, reply.into_generic().into_frozen()) + .await; + return Ok(()); + } + + { + let mut sessions = sessions.borrow_mut(); + sessions + .login(transport_client_id, user_id) + .map_err(LoginRegisterError::Session)?; + } + + let session = match shard + .plane + .inner() + .0 + .submit_register_in_process(vsr_client_id) + .await + { + Ok(session) => session, + Err(error) => { + let _ = sessions + .borrow_mut() + .reset_to_connected(transport_client_id); + return Err(LoginRegisterError::Transient(error)); + } + }; + + { + let mut sessions = sessions.borrow_mut(); + sessions + .bind_session(transport_client_id, vsr_client_id, session) + .map_err(LoginRegisterError::Session)?; + } + + let response = LoginRegisterResponse { user_id, session }.to_bytes(); + let reply = build_login_register_reply(request_header, vsr_client_id, session, &response); + if let Err(error) = shard + .bus + .send_to_client(transport_client_id, reply.into_generic().into_frozen()) + .await + { + warn!( + transport_client_id, + error = %error, + "failed to send login/register reply" + ); + } + + Ok(()) +} + +fn ensure_default_root_user(mux_stm: &ServerNgMuxStateMachine) { + if !mux_stm.inner().0.read(|users| users.items.is_empty()) { + return; + } + + let LegacyUser { + username, password, .. + } = server::bootstrap::create_root_user(); + mux_stm.inner().0.ensure_root_user(&username, &password); +} + +fn build_login_register_reply( + request_header: &RequestHeader, + client_id: u128, + session: u64, + body: &Bytes, +) -> Message { + let total_size = std::mem::size_of::() + body.len(); + let mut reply = Message::::new(total_size); + let header_size = u32::try_from(total_size).expect("reply size must fit into u32"); + let header = bytemuck::checked::try_from_bytes_mut::( + &mut reply.as_mut_slice()[..std::mem::size_of::()], + ) + .expect("zeroed bytes are valid"); + *header = ReplyHeader { + cluster: request_header.cluster, + size: header_size, + view: request_header.view, + release: request_header.release, + command: Command2::Reply, + replica: request_header.replica, + request_checksum: request_header.request_checksum, + client: client_id, + op: session, + commit: session, + timestamp: request_header.timestamp, + request: request_header.request, + operation: request_header.operation, + namespace: request_header.namespace, + ..Default::default() + }; + if !body.is_empty() { + reply.as_mut_slice()[std::mem::size_of::()..total_size].copy_from_slice(body); + } + reply +} + fn upgrade_shard_handle(shard_handle: &ServerNgShardHandle) -> Option> { shard_handle .borrow() @@ -1403,7 +1754,7 @@ fn mint_client_meta( ClientConnMeta::new((shard_id << 112) | seq, peer_addr, transport) } -async fn start_client_listeners( +fn start_client_listeners( shard: &Rc, config: &ServerNgConfig, topology: &TcpTopology, @@ -1413,7 +1764,6 @@ async fn start_client_listeners( if config.tcp.enabled && !config.tcp.tls.enabled { let (listener, bound_addr) = client_listener::tcp::bind(topology.client_listen_addr) - .await .map_err(|source| { error!( addr = %topology.client_listen_addr, @@ -1432,8 +1782,7 @@ async fn start_client_listeners( } if let Some(ws_addr) = topology.ws_listen_addr { - let (listener, bound_addr) = - client_listener::ws::bind(ws_addr).await.map_err(|source| { + let (listener, bound_addr) = client_listener::ws::bind(ws_addr).map_err(|source| { error!(addr = %ws_addr, error = %source, "failed to bind websocket listener"); source })?; @@ -1461,7 +1810,6 @@ async fn start_client_listeners( source })?; let (endpoint, bound_addr) = client_listener::quic::bind(quic_addr, server_config) - .await .map_err(|source| { error!(addr = %quic_addr, error = %source, "failed to bind QUIC listener"); source @@ -1480,7 +1828,6 @@ async fn start_client_listeners( let credentials = load_tcp_tls_server_credentials(config)?; let (listener, tls_config, bound_addr) = client_listener::tcp_tls::bind(topology.client_listen_addr, credentials) - .await .map_err(|source| { error!( addr = %topology.client_listen_addr, diff --git a/core/server-ng/src/login_register.rs b/core/server-ng/src/login_register.rs index f152421ffa..e2a7c34ef3 100644 --- a/core/server-ng/src/login_register.rs +++ b/core/server-ng/src/login_register.rs @@ -75,7 +75,7 @@ pub async fn handle_login_register( verifier: &V, metadata: &LoginMetadata<'_, B, J, S, M>, session_manager: &mut SessionManager, - connection_id: u64, + connection_id: u128, ) -> Result where V: CredentialVerifier, @@ -117,7 +117,7 @@ pub async fn handle_login_register_with_pat( token_verifier: &T, metadata: &LoginMetadata<'_, B, J, S, M>, session_manager: &mut SessionManager, - connection_id: u64, + connection_id: u128, ) -> Result where T: TokenVerifier, @@ -156,7 +156,7 @@ async fn complete_register( user_id: u32, metadata: &LoginMetadata<'_, B, J, S, M>, session_manager: &mut SessionManager, - connection_id: u64, + connection_id: u128, ) -> Result where B: MessageBus, diff --git a/core/server-ng/src/server_error.rs b/core/server-ng/src/server_error.rs index 77e17ee1cb..664036bf22 100644 --- a/core/server-ng/src/server_error.rs +++ b/core/server-ng/src/server_error.rs @@ -55,6 +55,18 @@ pub enum ServerNgError { MissingReplicaId, #[error("cluster node for replica {replica_id} is missing tcp_replica port")] ClusterReplicaPortMissing { replica_id: u8 }, + #[error( + "recovered segment for stream {stream_id}, topic {topic_id}, partition {partition_id} at start_offset {start_offset} has message/index divergence (messages_size={messages_size_bytes}, indexed_size={indexed_size_bytes}, end_offset={end_offset})" + )] + RecoveredSegmentSizeDivergence { + stream_id: usize, + topic_id: usize, + partition_id: usize, + start_offset: u64, + end_offset: u64, + messages_size_bytes: u64, + indexed_size_bytes: u64, + }, #[error( "failed to load persisted {consumer_kind} offsets for stream {stream_id}, topic {topic_id}, partition {partition_id} from {path}" )] diff --git a/core/server-ng/src/session_manager.rs b/core/server-ng/src/session_manager.rs index 0527c45e38..894c5adc02 100644 --- a/core/server-ng/src/session_manager.rs +++ b/core/server-ng/src/session_manager.rs @@ -77,11 +77,11 @@ pub struct Connection { /// per consensus session). If a client reconnects with the same `client_id`, /// the old connection must be evicted first. pub struct SessionManager { - connections: HashMap, + connections: HashMap, /// Reverse index: `client_id` → `connection_id` for fast lookup when /// a consensus reply arrives and needs routing to the right connection. - client_to_connection: HashMap, - next_connection_id: u64, + client_to_connection: HashMap, + next_connection_id: u128, } impl SessionManager { @@ -98,12 +98,12 @@ impl SessionManager { /// /// # Panics /// Panics if the connection ID counter overflows `u64::MAX`. - pub fn add_connection(&mut self, address: SocketAddr) -> u64 { + pub fn add_connection(&mut self, address: SocketAddr) -> u128 { let id = self.next_connection_id; self.next_connection_id = self .next_connection_id .checked_add(1) - .expect("connection ID overflow (u64::MAX connections without restart)"); + .expect("connection ID overflow (u128::MAX connections without restart)"); self.connections.insert( id, Connection { @@ -114,8 +114,15 @@ impl SessionManager { id } + pub fn ensure_connection(&mut self, connection_id: u128, address: SocketAddr) { + self.connections.entry(connection_id).or_insert(Connection { + address, + state: ConnectionState::Connected, + }); + } + /// Remove a connection (disconnect). Cleans up the reverse index if bound. - pub fn remove_connection(&mut self, connection_id: u64) { + pub fn remove_connection(&mut self, connection_id: u128) { if let Some(conn) = self.connections.remove(&connection_id) && let ConnectionState::Bound { client_id, .. } = conn.state { @@ -127,7 +134,7 @@ impl SessionManager { /// /// # Errors /// Returns `Err` if the connection doesn't exist or isn't in `Connected` state. - pub fn login(&mut self, connection_id: u64, user_id: u32) -> Result<(), SessionError> { + pub fn login(&mut self, connection_id: u128, user_id: u32) -> Result<(), SessionError> { let conn = self .connections .get_mut(&connection_id) @@ -153,7 +160,7 @@ impl SessionManager { /// /// # Errors /// Returns `Err` if the connection doesn't exist or isn't `Authenticated`. - pub fn reset_to_connected(&mut self, connection_id: u64) -> Result<(), SessionError> { + pub fn reset_to_connected(&mut self, connection_id: u128) -> Result<(), SessionError> { let conn = self .connections .get_mut(&connection_id) @@ -188,7 +195,7 @@ impl SessionManager { /// (impossible in single-threaded use). pub fn bind_session( &mut self, - connection_id: u64, + connection_id: u128, client_id: u128, session: u64, ) -> Result<(), SessionError> { @@ -227,7 +234,7 @@ impl SessionManager { /// /// Returns `(client_id, session)` if the connection is `Bound`, `None` otherwise. #[must_use] - pub fn get_session(&self, connection_id: u64) -> Option<(u128, u64)> { + pub fn get_session(&self, connection_id: u128) -> Option<(u128, u64)> { let conn = self.connections.get(&connection_id)?; match conn.state { ConnectionState::Bound { @@ -239,13 +246,13 @@ impl SessionManager { /// Look up the connection ID for a client (for routing consensus replies). #[must_use] - pub fn connection_for_client(&self, client_id: u128) -> Option { + pub fn connection_for_client(&self, client_id: u128) -> Option { self.client_to_connection.get(&client_id).copied() } /// Get connection metadata. #[must_use] - pub fn get_connection(&self, connection_id: u64) -> Option<&Connection> { + pub fn get_connection(&self, connection_id: u128) -> Option<&Connection> { self.connections.get(&connection_id) } @@ -270,9 +277,9 @@ impl Default for SessionManager { #[derive(Debug)] pub enum SessionError { - ConnectionNotFound(u64), + ConnectionNotFound(u128), InvalidTransition { - connection_id: u64, + connection_id: u128, from: &'static str, to: &'static str, },