Skip to content

Commit 3f79af6

Browse files
committed
connect: use persistent volumes for listener and move from deplument to stateful set
Decided to move to stateful set because deployments don't support volume claim templates.
1 parent 6914675 commit 3f79af6

File tree

4 files changed

+60
-21
lines changed

4 files changed

+60
-21
lines changed

rust/operator-binary/src/connect/controller.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ use stackable_operator::{
1111
},
1212
logging::controller::ReconcilerError,
1313
status::condition::{
14-
compute_conditions, deployment::DeploymentConditionBuilder,
15-
operations::ClusterOperationsConditionBuilder,
14+
compute_conditions, operations::ClusterOperationsConditionBuilder,
15+
statefulset::StatefulSetConditionBuilder,
1616
},
1717
time::Duration,
1818
};
@@ -264,7 +264,7 @@ pub async fn reconcile(
264264
})?;
265265

266266
let args = server::command_args(&scs.spec.args);
267-
let deployment = server::build_deployment(
267+
let deployment = server::build_stateful_set(
268268
scs,
269269
&server_config,
270270
&resolved_product_image,
@@ -274,7 +274,7 @@ pub async fn reconcile(
274274
)
275275
.context(BuildServerDeploymentSnafu)?;
276276

277-
let mut ss_cond_builder = DeploymentConditionBuilder::default();
277+
let mut ss_cond_builder = StatefulSetConditionBuilder::default();
278278

279279
ss_cond_builder.add(
280280
cluster_resources

rust/operator-binary/src/connect/server.rs

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,20 @@ use stackable_operator::{
77
configmap::ConfigMapBuilder,
88
meta::ObjectMetaBuilder,
99
pod::{
10-
PodBuilder, container::ContainerBuilder, resources::ResourceRequirementsBuilder,
11-
volume::VolumeBuilder,
10+
PodBuilder,
11+
container::ContainerBuilder,
12+
resources::ResourceRequirementsBuilder,
13+
volume::{
14+
ListenerOperatorVolumeSourceBuilder, ListenerOperatorVolumeSourceBuilderError,
15+
ListenerReference, VolumeBuilder,
16+
},
1217
},
1318
},
1419
commons::product_image_selection::ResolvedProductImage,
1520
k8s_openapi::{
1621
DeepMerge,
1722
api::{
18-
apps::v1::{Deployment, DeploymentSpec},
23+
apps::v1::{StatefulSet, StatefulSetSpec},
1924
core::v1::{
2025
ConfigMap, EnvVar, HTTPGetAction, PodSecurityContext, Probe, Service,
2126
ServiceAccount, ServicePort, ServiceSpec,
@@ -53,6 +58,11 @@ const HTTP: &str = "http";
5358
#[derive(Snafu, Debug)]
5459
#[allow(clippy::enum_variant_names)]
5560
pub enum Error {
61+
#[snafu(display("failed to build listener volume"))]
62+
BuildListenerVolume {
63+
source: ListenerOperatorVolumeSourceBuilderError,
64+
},
65+
5666
#[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))]
5767
VectorAggregatorConfigMapMissing,
5868

@@ -188,15 +198,14 @@ pub(crate) fn server_config_map(
188198
.context(InvalidConfigMapSnafu { name: cm_name })
189199
}
190200

191-
#[allow(clippy::result_large_err)]
192-
pub(crate) fn build_deployment(
201+
pub(crate) fn build_stateful_set(
193202
scs: &v1alpha1::SparkConnectServer,
194203
config: &v1alpha1::ServerConfig,
195204
resolved_product_image: &ResolvedProductImage,
196205
service_account: &ServiceAccount,
197206
config_map: &ConfigMap,
198207
args: Vec<String>,
199-
) -> Result<Deployment, Error> {
208+
) -> Result<StatefulSet, Error> {
200209
let server_role = SparkConnectRole::Server.to_string();
201210
let recommended_object_labels =
202211
common::labels(scs, &resolved_product_image.app_version_label, &server_role);
@@ -230,12 +239,6 @@ pub(crate) fn build_deployment(
230239
.build(),
231240
)
232241
.context(AddVolumeSnafu)?
233-
.add_listener_volume_by_listener_class(
234-
LISTENER_VOLUME_NAME,
235-
&scs.spec.cluster_config.listener_class.to_string(),
236-
&recommended_labels.clone(),
237-
)
238-
.context(AddVolumeSnafu)?
239242
.security_context(PodSecurityContext {
240243
run_as_user: Some(SPARK_UID),
241244
run_as_group: Some(0),
@@ -320,13 +323,37 @@ pub(crate) fn build_deployment(
320323
}
321324
}
322325

326+
// Add listener volume
327+
let listener_class = &scs.spec.cluster_config.listener_class;
328+
let pvcs = if listener_class.discoverable() {
329+
// externally reachable listener endpoints will use persistent volumes
330+
// so that load balancers can hard-code the target addresses
331+
let pvc = ListenerOperatorVolumeSourceBuilder::new(
332+
&ListenerReference::ListenerClass(listener_class.to_string()),
333+
&recommended_labels,
334+
)
335+
.context(BuildListenerVolumeSnafu)?
336+
.build_pvc(LISTENER_VOLUME_NAME.to_string())
337+
.context(BuildListenerVolumeSnafu)?;
338+
Some(vec![pvc])
339+
} else {
340+
// non-reachable endpoints use ephemeral volumes
341+
pb.add_listener_volume_by_listener_class(
342+
LISTENER_VOLUME_NAME,
343+
&listener_class.to_string(),
344+
&recommended_labels,
345+
)
346+
.context(AddVolumeSnafu)?;
347+
None
348+
};
349+
323350
// Merge user defined pod template if available
324351
let mut pod_template = pb.build_template();
325352
if let Some(pod_overrides_spec) = scs.spec.server.as_ref().map(|s| s.pod_overrides.clone()) {
326353
pod_template.merge_from(pod_overrides_spec);
327354
}
328355

329-
Ok(Deployment {
356+
Ok(StatefulSet {
330357
metadata: ObjectMetaBuilder::new()
331358
.name_and_namespace(scs)
332359
.name(object_name(&scs.name_any(), SparkConnectRole::Server))
@@ -339,9 +366,10 @@ pub(crate) fn build_deployment(
339366
))
340367
.context(MetadataBuildSnafu)?
341368
.build(),
342-
spec: Some(DeploymentSpec {
369+
spec: Some(StatefulSetSpec {
343370
template: pod_template,
344371
replicas: Some(1),
372+
volume_claim_templates: pvcs,
345373
selector: LabelSelector {
346374
match_labels: Some(
347375
Labels::role_group_selector(
@@ -355,9 +383,9 @@ pub(crate) fn build_deployment(
355383
),
356384
..LabelSelector::default()
357385
},
358-
..DeploymentSpec::default()
386+
..StatefulSetSpec::default()
359387
}),
360-
..Deployment::default()
388+
..StatefulSet::default()
361389
})
362390
}
363391

rust/operator-binary/src/crd/listener.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,13 @@ pub enum SupportedListenerClasses {
2323
#[strum(serialize = "external-stable")]
2424
ExternalStable,
2525
}
26+
27+
impl SupportedListenerClasses {
28+
pub fn discoverable(&self) -> bool {
29+
match self {
30+
SupportedListenerClasses::ClusterInternal => false,
31+
SupportedListenerClasses::ExternalUnstable => true,
32+
SupportedListenerClasses::ExternalStable => true,
33+
}
34+
}
35+
}

tests/templates/kuttl/spark-connect/10-assert.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ kind: TestAssert
44
timeout: 300
55
---
66
apiVersion: apps/v1
7-
kind: Deployment
7+
kind: StatefulSet
88
metadata:
99
name: spark-connect-server
1010
status:
@@ -18,6 +18,7 @@ commands:
1818
# Sleep to prevent the following spark connect app from failing
1919
# while the spark-connect server is busy setting up the executors.
2020
- script: |
21+
set -xou pipefail
2122
sleep 10
2223
EXECUTOR_COUNT=$(kubectl get pods -n $NAMESPACE --selector 'spark-app-name=spark-connect-server' --field-selector='status.phase=Running' -o NAME|wc -l)
2324
test 1 -eq "$EXECUTOR_COUNT"

0 commit comments

Comments
 (0)