Skip to content

Commit 28dc0e8

Browse files
committed
implement user-provided command-line args for the connect server
1 parent bb4c852 commit 28dc0e8

File tree

7 files changed

+29
-9
lines changed

7 files changed

+29
-9
lines changed

deploy/helm/spark-k8s-operator/crds/crds.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1872,6 +1872,12 @@ spec:
18721872
spec:
18731873
description: A Spark cluster connect server component. This resource is managed by the Stackable operator for Apache Spark. Find more information on how to use it in the [operator documentation](https://docs.stackable.tech/home/nightly/spark-k8s/usage-guide/connect-server).
18741874
properties:
1875+
args:
1876+
default: []
1877+
description: User provided command line arguments appended to the server entry point.
1878+
items:
1879+
type: string
1880+
type: array
18751881
clusterConfig:
18761882
default:
18771883
listenerClass: cluster-internal

rust/operator-binary/src/connect/controller.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ pub async fn reconcile(
312312
name: scs.name_unchecked(),
313313
})?;
314314

315-
let args = server::command_args(&resolved_product_image.product_version);
315+
let args = server::command_args(&scs.spec.args, &resolved_product_image.product_version);
316316
let deployment = server::build_deployment(
317317
scs,
318318
&server_config,

rust/operator-binary/src/connect/crd.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,10 @@ pub mod versioned {
102102
#[serde(default)]
103103
pub cluster_operation: ClusterOperation,
104104

105+
/// User provided command line arguments appended to the server entry point.
106+
#[serde(default)]
107+
pub args: Vec<String>,
108+
105109
/// Name of the Vector aggregator discovery ConfigMap.
106110
/// It must contain the key `ADDRESS` with the address of the Vector aggregator.
107111
#[serde(skip_serializing_if = "Option::is_none")]
@@ -151,6 +155,7 @@ pub mod versioned {
151155
pub struct ServerConfig {
152156
#[fragment_attrs(serde(default))]
153157
pub resources: Resources<crate::connect::crd::ConnectStorageConfig, NoRuntimeLimits>,
158+
154159
#[fragment_attrs(serde(default))]
155160
pub logging: Logging<SparkConnectContainer>,
156161

rust/operator-binary/src/connect/executor.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,10 @@ pub fn executor_properties(
252252
"spark.kubernetes.executor.limit.cores".to_string(),
253253
max.clone().map(|v| v.0),
254254
);
255-
result.insert("spark.executor.cores".to_string(), min.clone().map(|v| v.0));
255+
result.insert(
256+
"spark.kubernetes.executor.request.cores".to_string(),
257+
min.clone().map(|v| v.0),
258+
);
256259
result.insert(
257260
"spark.executor.memory".to_string(),
258261
limit.clone().map(|v| v.0),

rust/operator-binary/src/connect/server.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,8 +400,8 @@ pub fn build_service(
400400
}
401401

402402
#[allow(clippy::result_large_err)]
403-
pub fn command_args(spark_version: &str) -> Vec<String> {
404-
let command = [
403+
pub fn command_args(user_args: &[String], spark_version: &str) -> Vec<String> {
404+
let mut command = vec![
405405
// ---------- start containerdebug
406406
format!(
407407
"containerdebug --output={VOLUME_MOUNT_PATH_LOG}/containerdebug-state.json --loop &"
@@ -415,6 +415,9 @@ pub fn command_args(spark_version: &str) -> Vec<String> {
415415
format!("--properties-file {VOLUME_MOUNT_PATH_CONFIG}/{SPARK_DEFAULTS_FILE_NAME}"),
416416
];
417417

418+
// User provided command line arguments
419+
command.extend_from_slice(user_args);
420+
418421
vec![command.join(" ")]
419422
}
420423

tests/templates/kuttl/spark-connect/10-assert.yaml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ status:
1212
---
1313
apiVersion: kuttl.dev/v1beta1
1414
kind: TestAssert
15-
timeout: 30
15+
timeout: 90
1616
commands:
1717
# Test that two spark-connect executors are running.
18-
# Sleep 5 to prevent the following spark connect app from failing
18+
# Sleep to prevent the following spark connect app from failing
1919
# while the spark-connect server is busy setting up the executors.
2020
- script: |
21-
sleep 5
21+
sleep 10
2222
EXECUTOR_COUNT=$(kubectl get pods -n $NAMESPACE --selector 'spark-app-name=spark-connect-server' --field-selector='status.phase=Running' -o NAME|wc -l)
23-
test 2 -eq "$EXECUTOR_COUNT"
23+
test 3 -eq "$EXECUTOR_COUNT"

tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,13 @@ spec:
5353
spark:
5454
custom:
5555
configMap: spark-connect-log-config
56+
configOverrides:
57+
spark-defaults.conf:
58+
spark.executor.instances: "3"
5659
executor:
5760
configOverrides:
5861
spark-defaults.conf:
59-
spark.executor.memoryOverhead: 1m
62+
spark.executor.memoryOverhead: "1m"
6063
config:
6164
logging:
6265
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}

0 commit comments

Comments (0)