
Commit c97e5d8

it builds

1 parent 32b79a1

7 files changed: +731 −94 lines

deploy/helm/spark-k8s-operator/crds/crds.yaml

Lines changed: 591 additions & 0 deletions
Large diffs are not rendered by default.

rust/operator-binary/src/connect/affinity.rs

Lines changed: 1 addition & 2 deletions
@@ -3,8 +3,7 @@ use stackable_operator::{
     k8s_openapi::api::core::v1::PodAntiAffinity,
 };

-use crate::connect::crd::CONNECT_ROLE_NAME;
-use crate::crd::constants::APP_NAME;
+use crate::{connect::crd::CONNECT_ROLE_NAME, crd::constants::APP_NAME};

 pub fn affinity(cluster_name: &str) -> StackableAffinityFragment {
     let affinity_between_role_pods =

rust/operator-binary/src/connect/controller.rs

Lines changed: 42 additions & 75 deletions
@@ -16,14 +16,13 @@ use stackable_operator::{
         },
     },
     cluster_resources::{ClusterResourceApplyStrategy, ClusterResources},
-    commons::product_image_selection::ResolvedProductImage,
+    commons::{product_image_selection::ResolvedProductImage, rbac::build_rbac_resources},
     k8s_openapi::{
         api::{
             apps::v1::{StatefulSet, StatefulSetSpec},
             core::v1::{
                 ConfigMap, PodSecurityContext, Service, ServiceAccount, ServicePort, ServiceSpec,
             },
-            rbac::v1::{ClusterRole, RoleBinding, RoleRef, Subject},
         },
         apimachinery::pkg::apis::meta::v1::LabelSelector,
         DeepMerge,
@@ -47,29 +46,28 @@ use stackable_operator::{
 };
 use strum::{EnumDiscriminants, IntoStaticStr};

-use crate::{
-    connect::crd::{
+use super::{
+    crd::{
         v1alpha1, ConnectConfig, SparkConnectServerContainer, CONNECT_CONTAINER_NAME,
-        CONNECT_CONTROLLER_NAME, CONNECT_ROLE_NAME,
+        CONNECT_CONTROLLER_NAME, CONNECT_GRPC_PORT, CONNECT_ROLE_NAME, CONNECT_UI_PORT,
     },
-    connect::pdb::add_pdbs,
+    pdb::add_pdbs,
+};
+use crate::{
     crd::{
         constants::{
-            ACCESS_KEY_ID, APP_NAME, JVM_SECURITY_PROPERTIES_FILE, MAX_SPARK_LOG_FILES_SIZE,
-            METRICS_PORT, OPERATOR_NAME, SECRET_ACCESS_KEY, SPARK_CLUSTER_ROLE,
-            SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME, SPARK_IMAGE_BASE_NAME, SPARK_UID,
-            STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG,
+            APP_NAME, JVM_SECURITY_PROPERTIES_FILE, MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT,
+            OPERATOR_NAME, SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME, SPARK_IMAGE_BASE_NAME,
+            SPARK_UID, VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG,
             VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG,
             VOLUME_MOUNT_PATH_LOG_CONFIG,
         },
-        tlscerts, to_spark_env_sh_string,
+        to_spark_env_sh_string,
     },
     product_logging::{self, resolve_vector_aggregator_address},
     Ctx,
 };

-use super::crd::{CONNECT_GRPC_PORT, CONNECT_UI_PORT};
-
 #[derive(Snafu, Debug, EnumDiscriminants)]
 #[strum_discriminants(derive(IntoStaticStr))]
 #[allow(clippy::enum_variant_names)]
@@ -127,12 +125,6 @@ pub enum Error {
     #[snafu(display("failed to resolve and merge config for role and role group"))]
     FailedToResolveConfig { source: crate::connect::crd::Error },

-    #[snafu(display("number of cleaner rolegroups exceeds 1"))]
-    TooManyCleanerRoleGroups,
-
-    #[snafu(display("number of cleaner replicas exceeds 1"))]
-    TooManyCleanerReplicas,
-
     #[snafu(display("failed to create cluster resources"))]
     CreateClusterResources {
         source: stackable_operator::cluster_resources::Error,
@@ -156,7 +148,7 @@
     ConfigureLogging { source: LoggingError },

     #[snafu(display("cannot retrieve role group"))]
-    CannotRetrieveRoleGroup { source: crate::connect::crd::Error },
+    CannotRetrieveRoleGroup { source: super::crd::Error },

     #[snafu(display(
         "Connect server : failed to serialize [{JVM_SECURITY_PROPERTIES_FILE}] for group {}",
@@ -200,7 +192,12 @@
     },

     #[snafu(display("failed to merge environment config and/or overrides"))]
-    MergeEnv { source: crate::connect::crd::Error },
+    MergeEnv { source: super::crd::Error },
+
+    #[snafu(display("failed to build RBAC resources"))]
+    BuildRbacResources {
+        source: stackable_operator::commons::rbac::Error,
+    },
 }

 type Result<T, E = Error> = std::result::Result<T, E>;
@@ -250,8 +247,15 @@ pub async fn reconcile(
         .context(ResolveVectorAggregatorAddressSnafu)?;

     // Use a dedicated service account for connect server pods.
-    let (serviceaccount, rolebinding) =
-        build_connect_role_serviceaccount(scs, &resolved_product_image.app_version_label)?;
+    let (serviceaccount, rolebinding) = build_rbac_resources(
+        scs,
+        APP_NAME,
+        cluster_resources
+            .get_required_labels()
+            .context(GetRequiredLabelsSnafu)?,
+    )
+    .context(BuildRbacResourcesSnafu)?;
+
     let serviceaccount = cluster_resources
         .add(client, serviceaccount)
         .await
@@ -587,7 +591,7 @@ fn build_stateful_set(

 #[allow(clippy::result_large_err)]
 fn build_service(
-    shs: &v1alpha1::SparkConnectServer,
+    scs: &v1alpha1::SparkConnectServer,
     app_version_label: &str,
     role: &str,
     group: Option<&RoleGroupRef<v1alpha1::SparkConnectServer>>,
@@ -604,38 +608,43 @@
             Some("None".to_string()),
         ),
         None => (
-            format!("{}-{}", shs.metadata.name.as_ref().unwrap(), role),
-            shs.spec.cluster_config.listener_class.k8s_service_type(),
+            format!("{}-{}", scs.metadata.name.as_ref().unwrap(), role),
+            scs.spec.cluster_config.listener_class.k8s_service_type(),
             None,
         ),
     };

     let selector = match group {
-        Some(rgr) => Labels::role_group_selector(shs, APP_NAME, &rgr.role, &rgr.role_group)
+        Some(rgr) => Labels::role_group_selector(scs, APP_NAME, &rgr.role, &rgr.role_group)
             .context(LabelBuildSnafu)?
             .into(),
-        None => Labels::role_selector(shs, APP_NAME, role)
+        None => Labels::role_selector(scs, APP_NAME, role)
             .context(LabelBuildSnafu)?
             .into(),
     };

     Ok(Service {
         metadata: ObjectMetaBuilder::new()
-            .name_and_namespace(shs)
+            .name_and_namespace(scs)
             .name(service_name)
-            .ownerreference_from_resource(shs, None, Some(true))
+            .ownerreference_from_resource(scs, None, Some(true))
             .context(ObjectMissingMetadataForOwnerRefSnafu)?
-            .with_recommended_labels(labels(shs, app_version_label, &group_name))
+            .with_recommended_labels(labels(scs, app_version_label, &group_name))
             .context(MetadataBuildSnafu)?
             .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?)
             .build(),
         spec: Some(ServiceSpec {
             type_: Some(service_type),
             cluster_ip: service_cluster_ip,
             ports: Some(vec![
+                ServicePort {
+                    name: Some(String::from("grpc")),
+                    port: CONNECT_GRPC_PORT,
+                    ..ServicePort::default()
+                },
                 ServicePort {
                     name: Some(String::from("http")),
-                    port: 18080,
+                    port: CONNECT_UI_PORT,
                     ..ServicePort::default()
                 },
                 ServicePort {
@@ -651,50 +660,8 @@
     })
 }

-// TODO: This function should be replaced with operator-rs build_rbac_resources.
-// See: https://github.com/stackabletech/spark-k8s-operator/issues/499
-#[allow(clippy::result_large_err)]
-fn build_connect_role_serviceaccount(
-    shs: &v1alpha1::SparkConnectServer,
-    app_version_label: &str,
-) -> Result<(ServiceAccount, RoleBinding)> {
-    let sa = ServiceAccount {
-        metadata: ObjectMetaBuilder::new()
-            .name_and_namespace(shs)
-            .ownerreference_from_resource(shs, None, Some(true))
-            .context(ObjectMissingMetadataForOwnerRefSnafu)?
-            .with_recommended_labels(labels(shs, app_version_label, CONNECT_CONTROLLER_NAME))
-            .context(MetadataBuildSnafu)?
-            .build(),
-        ..ServiceAccount::default()
-    };
-    let binding = RoleBinding {
-        metadata: ObjectMetaBuilder::new()
-            .name_and_namespace(shs)
-            .ownerreference_from_resource(shs, None, Some(true))
-            .context(ObjectMissingMetadataForOwnerRefSnafu)?
-            .with_recommended_labels(labels(shs, app_version_label, CONNECT_CONTROLLER_NAME))
-            .context(MetadataBuildSnafu)?
-            .build(),
-        role_ref: RoleRef {
-            api_group: <ClusterRole as stackable_operator::k8s_openapi::Resource>::GROUP // need to fully qualify because of "Resource" name clash
-                .to_string(),
-            kind: <ClusterRole as stackable_operator::k8s_openapi::Resource>::KIND.to_string(),
-            name: SPARK_CLUSTER_ROLE.to_string(),
-        },
-        subjects: Some(vec![Subject {
-            api_group: Some(
-                <ServiceAccount as stackable_operator::k8s_openapi::Resource>::GROUP.to_string(),
-            ),
-            kind: <ServiceAccount as stackable_operator::k8s_openapi::Resource>::KIND.to_string(),
-            name: sa.name_any(),
-            namespace: sa.namespace(),
-        }]),
-    };
-    Ok((sa, binding))
-}
-
 // TODO: revisit this
+#[allow(clippy::result_large_err)]
 fn spark_defaults(
     _cs: &v1alpha1::SparkConnectServer,
     _rolegroupref: &RoleGroupRef<v1alpha1::SparkConnectServer>,
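
The deleted build_connect_role_serviceaccount is replaced by the shared operator-rs helper commons::rbac::build_rbac_resources (note the new import and the new BuildRbacResources error variant above). As a rough, self-contained sketch in plain k8s-openapi, the helper yields a ServiceAccount/RoleBinding pair shaped like the following; the object names and the ClusterRole name here are illustrative guesses, not the helper's actual conventions:

use k8s_openapi::api::{
    core::v1::ServiceAccount,
    rbac::v1::{RoleBinding, RoleRef, Subject},
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;

// Hedged sketch: approximately the pair of objects derived from a
// SparkConnectServer named "my-connect-server". In the real helper, names,
// labels and owner references come from the custom resource and from
// get_required_labels(); here they are hard-coded for illustration.
fn sketch_rbac(namespace: &str) -> (ServiceAccount, RoleBinding) {
    let sa = ServiceAccount {
        metadata: ObjectMeta {
            name: Some("my-connect-server-serviceaccount".into()), // illustrative
            namespace: Some(namespace.into()),
            ..ObjectMeta::default()
        },
        ..ServiceAccount::default()
    };
    let binding = RoleBinding {
        metadata: ObjectMeta {
            name: Some("my-connect-server-rolebinding".into()), // illustrative
            namespace: Some(namespace.into()),
            ..ObjectMeta::default()
        },
        // Binds the ServiceAccount to a cluster-scoped role installed with
        // the operator (role name illustrative).
        role_ref: RoleRef {
            api_group: "rbac.authorization.k8s.io".into(),
            kind: "ClusterRole".into(),
            name: "spark-k8s-clusterrole".into(),
        },
        subjects: Some(vec![Subject {
            kind: "ServiceAccount".into(),
            name: sa.metadata.name.clone().unwrap(),
            namespace: Some(namespace.into()),
            ..Subject::default()
        }]),
    };
    (sa, binding)
}

The gain over the deleted hand-rolled version is that owner references and recommended labels are applied by one shared helper instead of per-operator code, which is what the RBAC wiring in reconcile() above now relies on.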

rust/operator-binary/src/connect/crd.rs

Lines changed: 6 additions & 4 deletions
@@ -31,11 +31,13 @@ use stackable_operator::{
 use stackable_versioned::versioned;
 use strum::{Display, EnumIter};

-use crate::crd::constants::{
-    JVM_SECURITY_PROPERTIES_FILE, OPERATOR_NAME, SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME,
-    VOLUME_MOUNT_PATH_LOG,
+use crate::{
+    connect::{affinity::affinity, jvm::construct_jvm_args},
+    crd::constants::{
+        JVM_SECURITY_PROPERTIES_FILE, OPERATOR_NAME, SPARK_DEFAULTS_FILE_NAME,
+        SPARK_ENV_SH_FILE_NAME, VOLUME_MOUNT_PATH_LOG,
+    },
 };
-use crate::{connect::affinity::affinity, connect::jvm::construct_jvm_args};

 pub const CONNECT_CONTROLLER_NAME: &str = "connect";
 pub const CONNECT_FULL_CONTROLLER_NAME: &str =

rust/operator-binary/src/connect/jvm.rs

Lines changed: 6 additions & 4 deletions
@@ -3,10 +3,12 @@ use stackable_operator::role_utils::{
     self, GenericRoleConfig, JavaCommonConfig, JvmArgumentOverrides, Role,
 };

-use crate::connect::crd::ConnectConfigFragment;
-use crate::crd::constants::{
-    JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, METRICS_PORT, VOLUME_MOUNT_PATH_CONFIG,
-    VOLUME_MOUNT_PATH_LOG_CONFIG,
+use crate::{
+    connect::crd::ConnectConfigFragment,
+    crd::constants::{
+        JVM_SECURITY_PROPERTIES_FILE, LOG4J2_CONFIG_FILE, METRICS_PORT, VOLUME_MOUNT_PATH_CONFIG,
+        VOLUME_MOUNT_PATH_LOG_CONFIG,
+    },
 };

 #[derive(Snafu, Debug)]

rust/operator-binary/src/connect/pdb.rs

Lines changed: 4 additions & 2 deletions
@@ -4,8 +4,10 @@ use stackable_operator::{
     commons::pdb::PdbConfig, kube::ResourceExt,
 };

-use crate::connect::crd::{v1alpha1, CONNECT_CONTROLLER_NAME, CONNECT_ROLE_NAME};
-use crate::crd::constants::{APP_NAME, OPERATOR_NAME};
+use crate::{
+    connect::crd::{v1alpha1, CONNECT_CONTROLLER_NAME, CONNECT_ROLE_NAME},
+    crd::constants::{APP_NAME, OPERATOR_NAME},
+};

 #[derive(Snafu, Debug)]
 pub enum Error {

rust/operator-binary/src/main.rs

Lines changed: 81 additions & 7 deletions
@@ -1,7 +1,8 @@
 use std::sync::Arc;

 use clap::{crate_description, crate_version, Parser};
-use futures::{pin_mut, StreamExt};
+use connect::crd::{SparkConnectServer, CONNECT_FULL_CONTROLLER_NAME};
+use futures::{pin_mut, select, StreamExt};
 use history::history_controller;
 use product_config::ProductConfigManager;
 use stackable_operator::{
@@ -70,6 +71,8 @@ async fn main() -> anyhow::Result<()> {
                 .print_yaml_schema(built_info::PKG_VERSION, SerializeOptions::default())?;
             SparkHistoryServer::merged_crd(SparkHistoryServer::V1Alpha1)?
                 .print_yaml_schema(built_info::PKG_VERSION, SerializeOptions::default())?;
+            SparkConnectServer::merged_crd(SparkConnectServer::V1Alpha1)?
+                .print_yaml_schema(built_info::PKG_VERSION, SerializeOptions::default())?;
         }
         Command::Run(ProductOperatorRun {
             product_config,
@@ -246,13 +249,84 @@
                 },
             );

-            pin_mut!(app_controller, pod_driver_controller, history_controller);
-            // kube-runtime's Controller will tokio::spawn each reconciliation, so this only concerns the internal watch machinery
-            futures::future::select(
-                futures::future::select(app_controller, pod_driver_controller),
-                history_controller,
+            // ==============================
+            // Create new object because Ctx cannot be cloned
+            let ctx = Ctx {
+                client: client.clone(),
+                product_config: product_config.load(&PRODUCT_CONFIG_PATHS)?,
+            };
+            let connect_event_recorder = Arc::new(Recorder::new(
+                client.as_kube_client(),
+                Reporter {
+                    controller: CONNECT_FULL_CONTROLLER_NAME.to_string(),
+                    instance: None,
+                },
+            ));
+            let connect_controller = Controller::new(
+                watch_namespace
+                    .get_api::<DeserializeGuard<connect::crd::v1alpha1::SparkConnectServer>>(
+                        &client,
+                    ),
+                watcher::Config::default(),
+            )
+            .owns(
+                watch_namespace
+                    .get_api::<DeserializeGuard<connect::crd::v1alpha1::SparkConnectServer>>(
+                        &client,
+                    ),
+                watcher::Config::default(),
+            )
+            .owns(
+                watch_namespace.get_api::<DeserializeGuard<StatefulSet>>(&client),
+                watcher::Config::default(),
+            )
+            .owns(
+                watch_namespace.get_api::<DeserializeGuard<Service>>(&client),
+                watcher::Config::default(),
+            )
+            .owns(
+                watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
+                watcher::Config::default(),
             )
-            .await;
+            .shutdown_on_signal()
+            .run(
+                connect::controller::reconcile,
+                connect::controller::error_policy,
+                Arc::new(ctx),
+            )
+            .instrument(info_span!("connect_controller"))
+            // We can let the reporting happen in the background
+            .for_each_concurrent(
+                16, // concurrency limit
+                |result| {
+                    // The event_recorder needs to be shared across all invocations, so that
+                    // events are correctly aggregated
+                    let connect_event_recorder = connect_event_recorder.clone();
+                    async move {
+                        report_controller_reconciled(
+                            &connect_event_recorder,
+                            CONNECT_FULL_CONTROLLER_NAME,
+                            &result,
+                        )
+                        .await;
+                    }
+                },
+            );
+
+            //
+            pin_mut!(
+                app_controller,
+                pod_driver_controller,
+                history_controller,
+                connect_controller
+            );
+            // kube-runtime's Controller will tokio::spawn each reconciliation, so this only concerns the internal watch machinery
+            select! {
+                r1 = app_controller => r1,
+                r2 = pod_driver_controller => r2,
+                r3 = history_controller => r3,
+                r4 = connect_controller => r4,
+            };
         }
     }
     Ok(())
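
The select! over pinned controller futures that replaces the nested futures::future::select calls scales to any number of controllers. A minimal, self-contained sketch of the pattern, with toy streams standing in for the kube-runtime controllers (only the futures crate is needed):

use futures::{executor::block_on, pin_mut, select, FutureExt, StreamExt};

fn main() {
    block_on(async {
        // Each Controller is consumed as a stream; for_each_concurrent turns
        // it into a future that completes when the stream ends (e.g. after
        // shutdown_on_signal fires). fuse() makes the futures usable in
        // select!.
        let app = futures::stream::iter(0..3)
            .for_each_concurrent(16, |_| async {})
            .fuse();
        let connect = futures::stream::iter(0..3)
            .for_each_concurrent(16, |_| async {})
            .fuse();

        // select! polls the pinned futures and returns as soon as the first
        // one completes, so one controller shutting down stops the whole
        // process, mirroring the four-way select! in main.rs above.
        pin_mut!(app, connect);
        select! {
            _ = app => (),
            _ = connect => (),
        };
    });
}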
