Commit 6113819

cleanup, liveliness probe, do not use the iceberg test for now
1 parent 9d54f20

8 files changed: +103 −20 lines


deploy/helm/spark-k8s-operator/crds/crds.yaml

Lines changed: 2 additions & 2 deletions
@@ -1880,11 +1880,11 @@ spec:
                 type: array
             clusterConfig:
               default:
-                listenerClass: cluster-internal
+                listenerClass: external-unstable
               description: Global Spark connect server configuration that applies to all roles and role groups.
               properties:
                 listenerClass:
-                  default: cluster-internal
+                  default: external-unstable
                   description: |-
                     This field controls which type of Service the Operator creates for this ConnectServer:

rust/operator-binary/src/connect/controller.rs

Lines changed: 1 addition & 1 deletion
@@ -312,7 +312,7 @@ pub async fn reconcile(
         name: scs.name_unchecked(),
     })?;
 
-    let args = server::command_args(&scs.spec.args, &resolved_product_image.product_version);
+    let args = server::command_args(&scs.spec.args);
     let deployment = server::build_deployment(
         scs,
         &server_config,

rust/operator-binary/src/connect/crd.rs

Lines changed: 1 addition & 1 deletion
@@ -198,9 +198,9 @@ pub mod versioned {
     #[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
     #[serde(rename_all = "PascalCase")]
     pub enum CurrentlySupportedListenerClasses {
-        #[default]
         #[serde(rename = "cluster-internal")]
         ClusterInternal,
+        #[default]
         #[serde(rename = "external-unstable")]
         ExternalUnstable,
         #[serde(rename = "external-stable")]

rust/operator-binary/src/connect/executor.rs

Lines changed: 0 additions & 4 deletions
@@ -223,10 +223,6 @@ pub fn executor_properties(
             "spark.executor.defaultJavaOptions".to_string(),
             Some(executor_jvm_args(scs, config)?),
         ),
-        (
-            "spark.executor.extraClassPath".to_string(),
-            Some("/stackable/spark/extra-jars/*".to_string()),
-        ),
         (
             "spark.kubernetes.executor.podTemplateFile".to_string(),
             Some(format!("{VOLUME_MOUNT_PATH_CONFIG}/{POD_TEMPLATE_FILE}")),

rust/operator-binary/src/connect/server.rs

Lines changed: 34 additions & 9 deletions
@@ -2,8 +2,8 @@ use std::collections::{BTreeMap, HashMap};
 
 use snafu::{OptionExt, ResultExt, Snafu};
 use stackable_operator::{
-    builder,
     builder::{
+        self,
         configmap::ConfigMapBuilder,
         meta::ObjectMetaBuilder,
         pod::{
@@ -17,11 +17,11 @@ use stackable_operator::{
         api::{
             apps::v1::{Deployment, DeploymentSpec},
             core::v1::{
-                ConfigMap, EnvVar, PodSecurityContext, Service, ServiceAccount, ServicePort,
-                ServiceSpec,
+                ConfigMap, EnvVar, HTTPGetAction, PodSecurityContext, Probe, Service,
+                ServiceAccount, ServicePort, ServiceSpec,
             },
         },
-        apimachinery::pkg::apis::meta::v1::LabelSelector,
+        apimachinery::pkg::{apis::meta::v1::LabelSelector, util::intstr::IntOrString},
     },
     kube::{ResourceExt, runtime::reflector::ObjectRef},
     kvp::{Label, Labels},
@@ -253,7 +253,9 @@ pub fn build_deployment(
         .add_volume_mount(VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_PATH_CONFIG)
         .context(AddVolumeMountSnafu)?
         .add_volume_mount(VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_PATH_LOG)
-        .context(AddVolumeMountSnafu)?;
+        .context(AddVolumeMountSnafu)?
+        .readiness_probe(probe())
+        .liveness_probe(probe());
 
     // Add custom log4j config map volumes if configured
     if let Some(cm_name) = config.log_config_map() {
@@ -339,10 +341,18 @@ pub fn build_service(
     app_version_label: &str,
     service_cluster_ip: Option<String>,
 ) -> Result<Service, Error> {
-    let (service_name, service_type) = match service_cluster_ip.clone() {
+    let (service_name, service_type, publish_not_ready_addresses) = match service_cluster_ip.clone()
+    {
         Some(_) => (
+            // These are the properties of the headless driver service used for the internal
+            // communication with the executors as recommended by the Spark docs.
+            //
+            // The flag `publish_not_ready_addresses` *must* be `true` to allow for readiness
+            // probes. Without it, the driver runs into a deadlock because the Pod cannot become
+            // "ready" until the Service is "ready" and vice versa.
             object_name(&scs.name_any(), SparkConnectRole::Server),
             "ClusterIP".to_string(),
+            Some(true),
         ),
         None => (
             format!(
@@ -351,6 +361,7 @@ pub fn build_service(
                 SparkConnectRole::Server
             ),
             scs.spec.cluster_config.listener_class.k8s_service_type(),
+            Some(false),
         ),
     };
 
@@ -393,14 +404,15 @@ pub fn build_service(
                 },
             ]),
             selector: Some(selector),
+            publish_not_ready_addresses,
             ..ServiceSpec::default()
         }),
        status: None,
    })
 }
 
 #[allow(clippy::result_large_err)]
-pub fn command_args(user_args: &[String], spark_version: &str) -> Vec<String> {
+pub fn command_args(user_args: &[String]) -> Vec<String> {
     let mut command = vec![
         // ---------- start containerdebug
         format!(
@@ -411,7 +423,6 @@ pub fn command_args(user_args: &[String], spark_version: &str) -> Vec<String> {
         "--deploy-mode client".to_string(), // 'cluster' mode not supported
         "--master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS}"
             .to_string(),
-        format!("--jars /stackable/spark/connect/spark-connect_2.12-{spark_version}.jar"),
         format!("--properties-file {VOLUME_MOUNT_PATH_CONFIG}/{SPARK_DEFAULTS_FILE_NAME}"),
     ];
 
@@ -461,6 +472,7 @@ pub fn server_properties(
     pi: &ResolvedProductImage,
 ) -> Result<BTreeMap<String, Option<String>>, Error> {
     let spark_image = pi.image.clone();
+    let spark_version = pi.product_version.clone();
     let service_account_name = service_account.name_unchecked();
     let namespace = driver_service
         .namespace()
@@ -498,7 +510,7 @@ pub fn server_properties(
         ),
         (
             "spark.driver.extraClassPath".to_string(),
-            Some("/stackable/spark/extra-jars/*".to_string()),
+            Some(format!("/stackable/spark/extra-jars/*:/stackable/spark/connect/spark-connect_2.12-{spark_version}.jar")),
         ),
     ]
     .into();
@@ -538,3 +550,16 @@ fn server_jvm_args(
         name: scs.name_any(),
     })
 }
+
+fn probe() -> Probe {
+    Probe {
+        http_get: Some(HTTPGetAction {
+            port: IntOrString::Int(CONNECT_UI_PORT),
+            scheme: Some("HTTP".to_string()),
+            path: Some("/metrics".to_string()),
+            ..Default::default()
+        }),
+        failure_threshold: Some(10),
+        ..Probe::default()
+    }
+}
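
A minimal sketch of how the new `probe()` helper could be exercised in a unit test; the test module and its placement are assumptions (not part of this commit), while the asserted values come from the diff above and presume `CONNECT_UI_PORT` is in scope of the module:

#[cfg(test)]
mod probe_tests {
    use super::*;

    #[test]
    fn probe_targets_the_metrics_endpoint() {
        let p = probe();
        let http_get = p.http_get.expect("probe should use an HTTP GET action");

        // The probe polls the Spark Connect UI port over plain HTTP at /metrics,
        // the same endpoint serving as both readiness and liveness check.
        assert_eq!(http_get.port, IntOrString::Int(CONNECT_UI_PORT));
        assert_eq!(http_get.scheme.as_deref(), Some("HTTP"));
        assert_eq!(http_get.path.as_deref(), Some("/metrics"));

        // Ten consecutive failures are tolerated before the kubelet acts on the probe.
        assert_eq!(p.failure_threshold, Some(10));
    }
}

Reusing one endpoint for both probes keeps the configuration simple; combined with `publish_not_ready_addresses: true` on the headless driver Service, it avoids the driver/executor startup deadlock described in the code comment above.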

tests/templates/kuttl/spark-connect/10-assert.yaml

Lines changed: 2 additions & 2 deletions
@@ -14,10 +14,10 @@ apiVersion: kuttl.dev/v1beta1
 kind: TestAssert
 timeout: 90
 commands:
-  # Test that two spark-connect executors are running.
+  # Test that spark connect executors are running.
   # Sleep to prevent the following spark connect app from failing
   # while the spark-connect server is busy setting up the executors.
   - script: |
       sleep 10
       EXECUTOR_COUNT=$(kubectl get pods -n $NAMESPACE --selector 'spark-app-name=spark-connect-server' --field-selector='status.phase=Running' -o NAME|wc -l)
-      test 3 -eq "$EXECUTOR_COUNT"
+      test 1 -eq "$EXECUTOR_COUNT"

tests/templates/kuttl/spark-connect/10-deploy-spark-connect.yaml.j2

Lines changed: 9 additions & 1 deletion
@@ -35,6 +35,8 @@ spec:
 {% if lookup('env', 'VECTOR_AGGREGATOR') %}
   vectorAggregatorConfigMapName: vector-aggregator-discovery
 {% endif %}
+  args:
+    - --packages org.apache.iceberg:iceberg-spark-runtime-{{ ".".join(test_scenario['values']['spark-connect'].split('.')[:2]) }}_2.12:1.8.1
   server:
     podOverrides:
       spec:
@@ -55,10 +57,16 @@ spec:
           configMap: spark-connect-log-config
     configOverrides:
       spark-defaults.conf:
-        spark.executor.instances: "3"
+        spark.jars.ivy: /tmp/ivy2
+        spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+        spark.sql.catalog.local: org.apache.iceberg.spark.SparkCatalog
+        spark.sql.catalog.local.type: hadoop
+        spark.sql.catalog.local.warehouse: /tmp/warehouse
+        spark.sql.defaultCatalog: local
   executor:
     configOverrides:
       spark-defaults.conf:
+        spark.executor.instances: "1"
         spark.executor.memoryOverhead: "1m"
     config:
       logging:

tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2

Lines changed: 54 additions & 0 deletions
@@ -1,3 +1,44 @@
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: simple-connect-app
+  labels:
+    stackable.tech/vendor: Stackable
+data:
+  simple-connect-app.py: |
+    import sys
+
+    from pyspark.sql import SparkSession
+    from pyspark.sql.types import *
+
+    remote = sys.argv[1]
+    spark = (SparkSession.builder
+        .remote(remote)
+        .appName("simple-connect-app")
+        .getOrCreate())
+
+    schema = StructType([
+        StructField("id", LongType(), True),
+        StructField("data", StringType(), True)
+    ])
+
+    # create table
+    df = spark.createDataFrame([], schema)
+    df.writeTo("local.db.table").create()
+
+    # append to table
+    data = [
+        (1, "one"),
+        (2, "two"),
+        (3, "three"),
+        (4, "four")
+    ]
+
+    df = spark.createDataFrame(data, schema)
+    df.writeTo("local.db.table").append()
+
 ---
 apiVersion: batch/v1
 kind: Job
@@ -10,6 +51,10 @@ spec:
     spec:
       restartPolicy: OnFailure
      activeDeadlineSeconds: 100
+      volumes:
+        - name: script
+          configMap:
+            name: simple-connect-app
       containers:
         - name: simple-connect-app
 {% if test_scenario['values']['spark-connect-client'].find(",") > 0 %}
@@ -18,6 +63,12 @@ spec:
           image: oci.stackable.tech/sdp/spark-connect-client:{{ test_scenario['values']['spark-connect-client'] }}-stackable0.0.0-dev
 {% endif %}
           imagePullPolicy: IfNotPresent
+          #
+          # TODO: cannot use the PySpark job from the ConfigMap because it breaks
+          # with an "iceberg.SparkWrite$WriterFactory" ClassNotFoundException.
+          # Use the app bundled within spark-connect-client instead:
+          # "/app/simple-connect-app.py",
+          #
           command:
             [
               "/usr/bin/python",
@@ -31,3 +82,6 @@ spec:
           requests:
             cpu: 200m
             memory: 128Mi
+          volumeMounts:
+            - name: script
+              mountPath: /app
