Skip to content

Commit cd44a77

Browse files
committed
implement executor affinity and resource properties
1 parent 7d0483a commit cd44a77

File tree

7 files changed

+135
-16
lines changed

7 files changed

+135
-16
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.nix

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

deploy/helm/spark-k8s-operator/crds/crds.yaml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1922,6 +1922,36 @@ spec:
19221922
config:
19231923
default: {}
19241924
properties:
1925+
affinity:
1926+
default:
1927+
nodeAffinity: null
1928+
nodeSelector: null
1929+
podAffinity: null
1930+
podAntiAffinity: null
1931+
description: These configuration settings control [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement).
1932+
properties:
1933+
nodeAffinity:
1934+
description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node)
1935+
nullable: true
1936+
type: object
1937+
x-kubernetes-preserve-unknown-fields: true
1938+
nodeSelector:
1939+
additionalProperties:
1940+
type: string
1941+
description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node)
1942+
nullable: true
1943+
type: object
1944+
podAffinity:
1945+
description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node)
1946+
nullable: true
1947+
type: object
1948+
x-kubernetes-preserve-unknown-fields: true
1949+
podAntiAffinity:
1950+
description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node)
1951+
nullable: true
1952+
type: object
1953+
x-kubernetes-preserve-unknown-fields: true
1954+
type: object
19251955
logging:
19261956
default:
19271957
containers: {}

rust/operator-binary/src/connect/controller.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,13 @@ pub async fn reconcile(
282282
})?;
283283

284284
let executor_pod_template = serde_yaml::to_string(
285-
&executor::executor_pod_template(scs, &executor_config, &executor_config_map)
286-
.context(ExecutorPodTemplateSnafu)?,
285+
&executor::executor_pod_template(
286+
scs,
287+
&executor_config,
288+
&resolved_product_image,
289+
&executor_config_map,
290+
)
291+
.context(ExecutorPodTemplateSnafu)?,
287292
)
288293
.context(ExecutorPodTemplateSerdeSnafu)?;
289294

rust/operator-binary/src/connect/crd.rs

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
33
use snafu::{ResultExt, Snafu};
44
use stackable_operator::{
55
commons::{
6+
affinity::{affinity_between_role_pods, StackableAffinity, StackableAffinityFragment},
67
cluster_operation::ClusterOperation,
78
product_image_selection::ProductImage,
89
resources::{
@@ -14,8 +15,8 @@ use stackable_operator::{
1415
fragment::{self, Fragment, ValidationError},
1516
merge::Merge,
1617
},
17-
k8s_openapi::apimachinery::pkg::api::resource::Quantity,
18-
kube::CustomResource,
18+
k8s_openapi::{api::core::v1::PodAntiAffinity, apimachinery::pkg::api::resource::Quantity},
19+
kube::{CustomResource, ResourceExt},
1920
product_logging::{
2021
self,
2122
spec::{
@@ -31,6 +32,9 @@ use stackable_operator::{
3132
use stackable_versioned::versioned;
3233
use strum::{Display, EnumIter};
3334

35+
use super::common::SparkConnectRole;
36+
use crate::crd::constants::APP_NAME;
37+
3438
pub const CONNECT_CONTROLLER_NAME: &str = "connect";
3539
pub const CONNECT_FULL_CONTROLLER_NAME: &str = concatcp!(
3640
CONNECT_CONTROLLER_NAME,
@@ -68,6 +72,7 @@ pub enum Error {
6872

6973
#[versioned(version(name = "v1alpha1"))]
7074
pub mod versioned {
75+
7176
/// A Spark Connect server component. This resource is managed by the Stackable operator
7277
/// for Apache Spark. Find more information on how to use it in the
7378
/// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/connect-server).
@@ -174,6 +179,8 @@ pub mod versioned {
174179
pub resources: Resources<crate::connect::crd::ConnectStorageConfig, NoRuntimeLimits>,
175180
#[fragment_attrs(serde(default))]
176181
pub logging: Logging<crate::connect::crd::SparkConnectContainer>,
182+
#[fragment_attrs(serde(default))]
183+
pub affinity: StackableAffinity,
177184

178185
/// Request secret (currently only autoTls certificates) lifetime from the secret operator, e.g. `7d`, or `30d`.
179186
/// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate.
@@ -304,7 +311,7 @@ impl v1alpha1::SparkConnectServer {
304311
}
305312

306313
pub fn executor_config(&self) -> Result<v1alpha1::ExecutorConfig, Error> {
307-
let defaults = v1alpha1::ExecutorConfig::default_config();
314+
let defaults = v1alpha1::ExecutorConfig::default_config(&self.name_unchecked());
308315
fragment::validate(
309316
match self.spec.executor.as_ref().map(|cc| cc.config.clone()) {
310317
Some(fragment) => {
@@ -339,20 +346,22 @@ impl v1alpha1::ExecutorConfig {
339346
// Auto TLS certificate lifetime
340347
const DEFAULT_CONNECT_SECRET_LIFETIME: Duration = Duration::from_days_unchecked(1);
341348

342-
fn default_config() -> v1alpha1::ExecutorConfigFragment {
349+
fn default_config(cluster_name: &str) -> v1alpha1::ExecutorConfigFragment {
343350
v1alpha1::ExecutorConfigFragment {
344351
resources: ResourcesFragment {
345352
cpu: CpuLimitsFragment {
346-
min: Some(Quantity("250m".to_owned())),
353+
min: Some(Quantity("1".to_owned())),
347354
max: Some(Quantity("1".to_owned())),
348355
},
349356
memory: MemoryLimitsFragment {
350-
limit: Some(Quantity("1024Mi".to_owned())),
357+
limit: Some(Quantity("1024M".to_owned())),
351358
runtime_limits: NoRuntimeLimitsFragment {},
352359
},
353360
storage: ConnectStorageConfigFragment {},
354361
},
355362
logging: product_logging::spec::default_logging(),
363+
affinity: v1alpha1::ExecutorConfig::affinity(cluster_name),
364+
356365
requested_secret_lifetime: Some(Self::DEFAULT_CONNECT_SECRET_LIFETIME),
357366
}
358367
}
@@ -374,4 +383,25 @@ impl v1alpha1::ExecutorConfig {
374383
_ => None,
375384
}
376385
}
386+
387+
fn affinity(cluster_name: &str) -> StackableAffinityFragment {
388+
let affinity_between_role_pods = affinity_between_role_pods(
389+
APP_NAME,
390+
cluster_name,
391+
&SparkConnectRole::Executor.to_string(),
392+
70,
393+
);
394+
395+
StackableAffinityFragment {
396+
pod_affinity: None,
397+
pod_anti_affinity: Some(PodAntiAffinity {
398+
preferred_during_scheduling_ignored_during_execution: Some(vec![
399+
affinity_between_role_pods,
400+
]),
401+
required_during_scheduling_ignored_during_execution: None,
402+
}),
403+
node_affinity: None,
404+
node_selector: None,
405+
}
406+
}
377407
}

rust/operator-binary/src/connect/executor.rs

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use stackable_operator::{
88
meta::ObjectMetaBuilder,
99
pod::{container::ContainerBuilder, volume::VolumeBuilder, PodBuilder},
1010
},
11-
commons::product_image_selection::ResolvedProductImage,
11+
commons::{
12+
product_image_selection::ResolvedProductImage,
13+
resources::{CpuLimits, MemoryLimits, Resources},
14+
},
1215
k8s_openapi::{
1316
api::core::v1::{ConfigMap, EnvVar, PodTemplateSpec},
1417
DeepMerge,
@@ -53,11 +56,8 @@ pub enum Error {
5356
source: stackable_operator::builder::meta::Error,
5457
},
5558

56-
#[snafu(display("failed to build metadata for spark connect pod template config map {name}"))]
57-
MetadataBuild {
58-
source: builder::meta::Error,
59-
name: String,
60-
},
59+
#[snafu(display("failed to build metadata for spark connect executor pod template"))]
60+
PodTemplateMetadataBuild { source: builder::meta::Error },
6161

6262
#[snafu(display("invalid connect container name"))]
6363
InvalidContainerName {
@@ -109,6 +109,7 @@ pub enum Error {
109109
pub fn executor_pod_template(
110110
scs: &v1alpha1::SparkConnectServer,
111111
config: &v1alpha1::ExecutorConfig,
112+
pi: &ResolvedProductImage,
112113
config_map: &ConfigMap,
113114
) -> Result<PodTemplateSpec, Error> {
114115
let container_env = executor_env(
@@ -128,8 +129,19 @@ pub fn executor_pod_template(
128129
.add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG)
129130
.context(AddVolumeMountSnafu)?;
130131

132+
let metadata = ObjectMetaBuilder::new()
133+
.with_recommended_labels(common::labels(
134+
scs,
135+
&pi.app_version_label,
136+
&SparkConnectRole::Executor.to_string(),
137+
))
138+
.context(PodTemplateMetadataBuildSnafu)?
139+
.build();
140+
131141
let mut template = PodBuilder::new();
132142
template
143+
.metadata(metadata)
144+
.affinity(&config.affinity)
133145
.add_volume(
134146
VolumeBuilder::new(VOLUME_MOUNT_NAME_LOG)
135147
.with_empty_dir(
@@ -226,6 +238,37 @@ pub fn executor_properties(
226238
]
227239
.into();
228240

241+
// ========================================
242+
// Add executor resource properties
243+
let Resources {
244+
cpu: CpuLimits { min, max },
245+
memory: MemoryLimits {
246+
limit,
247+
runtime_limits: _,
248+
},
249+
storage: _,
250+
} = &config.resources;
251+
result.insert(
252+
"spark.kubernetes.executor.limit.cores".to_string(),
253+
max.clone().map(|v| v.0),
254+
);
255+
result.insert("spark.executor.cores".to_string(), min.clone().map(|v| v.0));
256+
result.insert(
257+
"spark.executor.memory".to_string(),
258+
limit.clone().map(|v| v.0),
259+
);
260+
// This ensures that the pod's memory limit is exactly the value
261+
// in `config.resources.memory.limit`.
262+
// By default, Spark adds an `executor.memoryOverhead` of 10% of
263+
// `executor.memory` (with a minimum of 384MiB).
264+
result.insert(
265+
"spark.executor.memoryOverhead".to_string(),
266+
Some("0".to_string()),
267+
);
268+
269+
// ========================================
270+
// Add the user provided executor properties
271+
229272
let config_overrides = scs
230273
.spec
231274
.executor

tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,14 @@ spec:
5353
spark:
5454
custom:
5555
configMap: spark-connect-log-config
56+
executor:
57+
configOverrides:
58+
spark-defaults.conf:
59+
spark.executor.memoryOverhead: 1m
60+
config:
61+
logging:
62+
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
63+
containers:
64+
spark:
65+
custom:
66+
configMap: spark-connect-log-config

0 commit comments

Comments
 (0)