Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/stackable-operator/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

### Changed

- Don't parse `/etc/resolv.conf` to auto-detect the Kubernetes cluster domain in case it is not explicitly configured.
Instead the operator will default to `cluster.local`. We revert this now after some concerns where raised, we will
create a follow-up decision instead addressing how we will continue with this ([#896]).

### Fixed

- Fix Kubernetes cluster domain parsing from resolv.conf, e.g. on AWS EKS.
We now only consider Kubernetes services domains instead of all domains (which could include non-Kubernetes domains) ([#895]).

[#895]: https://github.com/stackabletech/operator-rs/pull/895
[#896]: https://github.com/stackabletech/operator-rs/pull/896

## [0.79.0] - 2024-10-18

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

16 changes: 14 additions & 2 deletions crates/stackable-operator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ use clap::Args;
use product_config::ProductConfigManager;
use snafu::{ResultExt, Snafu};

use crate::{logging::TracingTarget, namespace::WatchNamespace};
use crate::{commons::networking::DomainName, logging::TracingTarget, namespace::WatchNamespace};

pub const AUTHOR: &str = "Stackable GmbH - [email protected]";

Expand Down Expand Up @@ -179,7 +179,8 @@ pub enum Command<Run: Args = ProductOperatorRun> {
/// common: ProductOperatorRun {
/// product_config: ProductConfigPath::from("bar".as_ref()),
/// watch_namespace: WatchNamespace::One("foobar".to_string()),
/// tracing_target: TracingTarget::None
/// tracing_target: TracingTarget::None,
/// kubernetes_cluster_domain: None,
/// },
/// }));
/// ```
Expand All @@ -205,12 +206,20 @@ pub struct ProductOperatorRun {
/// Provides the path to a product-config file
#[arg(long, short = 'p', value_name = "FILE", default_value = "", env)]
pub product_config: ProductConfigPath,

/// Provides a specific namespace to watch (instead of watching all namespaces)
#[arg(long, env, default_value = "")]
pub watch_namespace: WatchNamespace,

/// Tracing log collector system
#[arg(long, env, default_value_t, value_enum)]
pub tracing_target: TracingTarget,

/// Kubernetes cluster domain, usually this is `cluster.local`.
// We are not using a default value here, as operators will probably do an more advanced
// auto-detection of the cluster domain in case it is not specified in the future.
#[arg(long, env)]
pub kubernetes_cluster_domain: Option<DomainName>,
}

/// A path to a [`ProductConfigManager`] spec file
Expand Down Expand Up @@ -384,6 +393,7 @@ mod tests {
product_config: ProductConfigPath::from("bar".as_ref()),
watch_namespace: WatchNamespace::One("foo".to_string()),
tracing_target: TracingTarget::None,
kubernetes_cluster_domain: None,
}
);

Expand All @@ -395,6 +405,7 @@ mod tests {
product_config: ProductConfigPath::from("bar".as_ref()),
watch_namespace: WatchNamespace::All,
tracing_target: TracingTarget::None,
kubernetes_cluster_domain: None,
}
);

Expand All @@ -407,6 +418,7 @@ mod tests {
product_config: ProductConfigPath::from("bar".as_ref()),
watch_namespace: WatchNamespace::One("foo".to_string()),
tracing_target: TracingTarget::None,
kubernetes_cluster_domain: None,
}
);
}
Expand Down
86 changes: 50 additions & 36 deletions crates/stackable-operator/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
use crate::kvp::LabelSelectorExt;
use crate::utils::cluster_domain::{self, retrieve_cluster_domain, KUBERNETES_CLUSTER_DOMAIN};
use std::{
convert::TryFrom,
fmt::{Debug, Display},
};

use either::Either;
use futures::StreamExt;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope};
use kube::api::{DeleteParams, ListParams, Patch, PatchParams, PostParams, Resource, ResourceExt};
use kube::client::Client as KubeClient;
use kube::core::Status;
use kube::runtime::wait::delete::delete_and_finalize;
use kube::runtime::{watcher, WatchStreamExt};
use kube::{Api, Config};
use serde::de::DeserializeOwned;
use serde::Serialize;
use k8s_openapi::{
apimachinery::pkg::apis::meta::v1::LabelSelector, ClusterResourceScope, NamespaceResourceScope,
};
use kube::{
api::{DeleteParams, ListParams, Patch, PatchParams, PostParams, Resource, ResourceExt},
client::Client as KubeClient,
core::Status,
runtime::{wait::delete::delete_and_finalize, watcher, WatchStreamExt},
Api, Config,
};
use serde::{de::DeserializeOwned, Serialize};
use snafu::{OptionExt, ResultExt, Snafu};
use std::convert::TryFrom;
use std::fmt::{Debug, Display};
use tracing::trace;

use crate::{
commons::networking::DomainName, kvp::LabelSelectorExt,
utils::cluster_info::KubernetesClusterInfo,
};

pub type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -78,9 +84,6 @@ pub enum Error {

#[snafu(display("unable to create kubernetes client"))]
CreateKubeClient { source: kube::Error },

#[snafu(display("unable to to resolve kubernetes cluster domain"))]
ResolveKubernetesClusterDomain { source: cluster_domain::Error },
}

/// This `Client` can be used to access Kubernetes.
Expand All @@ -93,13 +96,16 @@ pub struct Client {
delete_params: DeleteParams,
/// Default namespace as defined in the kubeconfig this client has been created from.
pub default_namespace: String,

pub kubernetes_cluster_info: KubernetesClusterInfo,
}

impl Client {
pub fn new(
client: KubeClient,
field_manager: Option<String>,
default_namespace: String,
kubernetes_cluster_info: KubernetesClusterInfo,
) -> Self {
Client {
client,
Expand All @@ -113,6 +119,7 @@ impl Client {
},
delete_params: DeleteParams::default(),
default_namespace,
kubernetes_cluster_info,
}
}

Expand Down Expand Up @@ -519,7 +526,7 @@ impl Client {
/// #[tokio::main]
/// async fn main(){
///
/// let client: Client = initialize_operator(None).await.expect("Unable to construct client.");
/// let client: Client = initialize_operator(&None, None).await.expect("Unable to construct client.");
/// let watcher_config: watcher::Config =
/// watcher::Config::default().fields(&format!("metadata.name=nonexistent-pod"));
///
Expand Down Expand Up @@ -626,38 +633,45 @@ where
}
}

pub async fn initialize_operator(field_manager: Option<String>) -> Result<Client> {
let _ = KUBERNETES_CLUSTER_DOMAIN
.set(retrieve_cluster_domain().context(ResolveKubernetesClusterDomainSnafu)?);
create_client(field_manager).await
}

async fn create_client(field_manager: Option<String>) -> Result<Client> {
pub async fn initialize_operator(
cli_kubernetes_cluster_domain: &Option<DomainName>,
field_manager: Option<String>,
) -> Result<Client> {
let kubeconfig: Config = kube::Config::infer()
.await
.map_err(kube::Error::InferConfig)
.context(InferKubeConfigSnafu)?;
let default_namespace = kubeconfig.default_namespace.clone();
let client = kube::Client::try_from(kubeconfig).context(CreateKubeClientSnafu)?;
Ok(Client::new(client, field_manager, default_namespace))
let cluster_info = KubernetesClusterInfo::new(cli_kubernetes_cluster_domain);

Ok(Client::new(
client,
field_manager,
default_namespace,
cluster_info,
))
}

#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, time::Duration};

use futures::StreamExt;
use k8s_openapi::api::core::v1::{Container, Pod, PodSpec};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::api::{ObjectMeta, PostParams, ResourceExt};
use kube::runtime::watcher;
use kube::runtime::watcher::Event;
use std::collections::BTreeMap;
use std::time::Duration;
use k8s_openapi::{
api::core::v1::{Container, Pod, PodSpec},
apimachinery::pkg::apis::meta::v1::LabelSelector,
};
use kube::{
api::{ObjectMeta, PostParams, ResourceExt},
runtime::watcher::{self, Event},
};
use tokio::time::error::Elapsed;

#[tokio::test]
#[ignore = "Tests depending on Kubernetes are not ran by default"]
async fn k8s_test_wait_created() {
let client = super::create_client(None)
let client = super::initialize_operator(&None, None)
.await
.expect("KUBECONFIG variable must be configured.");

Expand Down Expand Up @@ -735,7 +749,7 @@ mod tests {
#[tokio::test]
#[ignore = "Tests depending on Kubernetes are not ran by default"]
async fn k8s_test_wait_created_timeout() {
let client = super::create_client(None)
let client = super::initialize_operator(&None, None)
.await
.expect("KUBECONFIG variable must be configured.");

Expand All @@ -755,7 +769,7 @@ mod tests {
#[tokio::test]
#[ignore = "Tests depending on Kubernetes are not ran by default"]
async fn k8s_test_list_with_label_selector() {
let client = super::create_client(None)
let client = super::initialize_operator(&None, None)
.await
.expect("KUBECONFIG variable must be configured.");

Expand Down
Loading
Loading