Skip to content

Commit b8d356e

Browse files
committed
create separate headless services and fix discovery cm
1 parent 3431358 commit b8d356e

File tree

7 files changed

+271
-158
lines changed

7 files changed

+271
-158
lines changed

rust/operator-binary/src/crd/mod.rs

Lines changed: 1 addition & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,7 @@ use stackable_operator::{
3232
schemars::{self, JsonSchema},
3333
status::condition::{ClusterCondition, HasStatusCondition},
3434
time::Duration,
35-
utils::{
36-
COMMON_BASH_TRAP_FUNCTIONS, cluster_info::KubernetesClusterInfo,
37-
crds::raw_object_list_schema,
38-
},
35+
utils::{COMMON_BASH_TRAP_FUNCTIONS, crds::raw_object_list_schema},
3936
versioned::versioned,
4037
};
4138
use strum::{Display, EnumDiscriminants, EnumIter, EnumString, IntoStaticStr};
@@ -470,25 +467,6 @@ impl v1alpha1::DruidCluster {
470467
.collect()
471468
}
472469

473-
/// The name of the role-level load-balanced Kubernetes `Service`
474-
pub fn role_service_name(&self, role: &DruidRole) -> Option<String> {
475-
Some(format!("{}-{}-headless", self.metadata.name.clone()?, role))
476-
}
477-
478-
/// The fully-qualified domain name of the role-level load-balanced Kubernetes `Service`
479-
pub fn role_service_fqdn(
480-
&self,
481-
role: &DruidRole,
482-
cluster_info: &KubernetesClusterInfo,
483-
) -> Option<String> {
484-
Some(format!(
485-
"{service_name}.{namespace}.svc.{cluster_domain}",
486-
service_name = self.role_service_name(role)?,
487-
namespace = self.metadata.namespace.as_ref()?,
488-
cluster_domain = cluster_info.cluster_domain,
489-
))
490-
}
491-
492470
/// If an s3 connection for ingestion is given, as well as an s3 connection for deep storage, they need to be the same.
493471
/// This function returns the resolved connection, or raises an Error if the connections are not identical.
494472
pub async fn get_s3_connection(
@@ -1665,32 +1643,6 @@ where
16651643

16661644
#[cfg(test)]
16671645
mod tests {
1668-
use stackable_operator::commons::networking::DomainName;
1669-
1670-
use super::*;
1671-
1672-
#[test]
1673-
fn test_service_name_generation() {
1674-
let cluster = deserialize_yaml_file::<v1alpha1::DruidCluster>(
1675-
"test/resources/crd/role_service/druid_cluster.yaml",
1676-
);
1677-
let dummy_cluster_info = KubernetesClusterInfo {
1678-
cluster_domain: DomainName::try_from("cluster.local").unwrap(),
1679-
};
1680-
1681-
assert_eq!(cluster.metadata.name, Some("testcluster".to_string()));
1682-
1683-
assert_eq!(
1684-
cluster.role_service_name(&DruidRole::Router),
1685-
Some("testcluster-router-headless".to_string())
1686-
);
1687-
1688-
assert_eq!(
1689-
cluster.role_service_fqdn(&DruidRole::Router, &dummy_cluster_info),
1690-
Some("testcluster-router-headless.default.svc.cluster.local".to_string())
1691-
)
1692-
}
1693-
16941646
pub fn deserialize_yaml_str<'a, T: serde::de::Deserialize<'a>>(value: &'a str) -> T {
16951647
let deserializer = serde_yaml::Deserializer::from_str(value);
16961648
serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap()

rust/operator-binary/src/crd/security.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use stackable_operator::{
1515
},
1616
crd::listener,
1717
k8s_openapi::{
18-
api::core::v1::{ContainerPort, Probe, TCPSocketAction},
18+
api::core::v1::{ContainerPort, Probe, ServicePort, TCPSocketAction},
1919
apimachinery::pkg::util::intstr::IntOrString,
2020
},
2121
time::Duration,
@@ -58,8 +58,8 @@ const PLAINTEXT_PORT: &str = "druid.plaintextPort";
5858
const ENABLE_TLS_PORT: &str = "druid.enableTlsPort";
5959
const TLS_PORT: &str = "druid.tlsPort";
6060
// Port names
61-
const PLAINTEXT_PORT_NAME: &str = "http";
62-
const TLS_PORT_NAME: &str = "https";
61+
pub const PLAINTEXT_PORT_NAME: &str = "http";
62+
pub const TLS_PORT_NAME: &str = "https";
6363
// Client side (Druid) TLS
6464
const CLIENT_HTTPS_KEY_STORE_PATH: &str = "druid.client.https.keyStorePath";
6565
const CLIENT_HTTPS_KEY_STORE_TYPE: &str = "druid.client.https.keyStoreType";
@@ -162,6 +162,18 @@ impl DruidTlsSecurity {
162162
.collect()
163163
}
164164

165+
pub fn service_ports(&self, role: &DruidRole) -> Vec<ServicePort> {
166+
self.exposed_ports(role)
167+
.into_iter()
168+
.map(|(name, val)| ServicePort {
169+
name: Some(name),
170+
port: val.into(),
171+
protocol: Some("TCP".to_string()),
172+
..ServicePort::default()
173+
})
174+
.collect()
175+
}
176+
165177
pub fn listener_ports(
166178
&self,
167179
role: &DruidRole,

rust/operator-binary/src/discovery.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
//! Discovery for Druid. We make Druid discoverable by putting a connection string to the router service
22
//! inside a config map. We only provide a connection string to the router service, since it serves as
33
//! a gateway to the cluster for client queries.
4-
use snafu::{OptionExt, ResultExt, Snafu};
4+
use snafu::{ResultExt, Snafu};
55
use stackable_operator::{
66
builder::{configmap::ConfigMapBuilder, meta::ObjectMetaBuilder},
77
commons::product_image_selection::ResolvedProductImage,
8+
crd::listener::v1alpha1::Listener,
89
k8s_openapi::api::core::v1::ConfigMap,
910
kube::{Resource, ResourceExt, runtime::reflector::ObjectRef},
10-
utils::cluster_info::KubernetesClusterInfo,
1111
};
1212

1313
use crate::{
1414
DRUID_CONTROLLER_NAME,
1515
crd::{DruidRole, build_recommended_labels, security::DruidTlsSecurity, v1alpha1},
16+
listener::build_listener_connection_string,
1617
};
1718

1819
#[derive(Snafu, Debug)]
@@ -35,47 +36,45 @@ pub enum Error {
3536
AddRecommendedLabels {
3637
source: stackable_operator::builder::meta::Error,
3738
},
39+
40+
#[snafu(display("failed to configure listener discovery configmap"))]
41+
ListenerConfiguration { source: crate::listener::Error },
3842
}
3943

4044
/// Builds discovery [`ConfigMap`]s for connecting to a [`v1alpha1::DruidCluster`].
4145
pub async fn build_discovery_configmaps(
4246
druid: &v1alpha1::DruidCluster,
4347
owner: &impl Resource<DynamicType = ()>,
44-
cluster_info: &KubernetesClusterInfo,
4548
resolved_product_image: &ResolvedProductImage,
4649
druid_tls_security: &DruidTlsSecurity,
50+
listener: Listener,
4751
) -> Result<Vec<ConfigMap>, Error> {
4852
let name = owner.name_unchecked();
4953
Ok(vec![build_discovery_configmap(
5054
druid,
5155
owner,
52-
cluster_info,
5356
resolved_product_image,
5457
druid_tls_security,
5558
&name,
59+
listener,
5660
)?])
5761
}
5862

5963
/// Build a discovery [`ConfigMap`] containing information about how to connect to a certain [`v1alpha1::DruidCluster`].
6064
fn build_discovery_configmap(
6165
druid: &v1alpha1::DruidCluster,
6266
owner: &impl Resource<DynamicType = ()>,
63-
cluster_info: &KubernetesClusterInfo,
6467
resolved_product_image: &ResolvedProductImage,
6568
druid_tls_security: &DruidTlsSecurity,
6669
name: &str,
70+
listener: Listener,
6771
) -> Result<ConfigMap, Error> {
68-
let router_host = format!(
69-
"{}:{}",
70-
druid
71-
.role_service_fqdn(&DruidRole::Router, cluster_info)
72-
.with_context(|| NoServiceFqdnSnafu)?,
73-
if druid_tls_security.tls_enabled() {
74-
DruidRole::Router.get_https_port()
75-
} else {
76-
DruidRole::Router.get_http_port()
77-
}
78-
);
72+
let router_host = build_listener_connection_string(
73+
listener,
74+
druid_tls_security,
75+
&DruidRole::Router.to_string(),
76+
)
77+
.context(ListenerConfigurationSnafu)?;
7978
let sqlalchemy_conn_str = format!("druid://{}/druid/v2/sql", router_host);
8079
let avatica_conn_str = format!(
8180
"jdbc:avatica:remote:url=http://{}/druid/v2/sql/avatica/",

0 commit comments

Comments
 (0)