
Commit ecf0c81

chore: Version SparkApplication
1 parent: 460073d

9 files changed: +323 -69 lines changed

Cargo.lock

Lines changed: 272 additions & 26 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 4 additions & 2 deletions
@@ -10,19 +10,21 @@ edition = "2021"
 repository = "https://github.com/stackabletech/spark-k8s-operator"
 
 [workspace.dependencies]
+stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.5.0" }
+stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" }
+product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
+
 anyhow = "1.0"
 built = { version = "0.7", features = ["chrono", "git2"] }
 clap = "4.5"
 const_format = "0.2"
 futures = { version = "0.3", features = ["compat"] }
-product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
 rstest = "0.24"
 semver = "1.0"
 serde = { version = "1.0", features = ["derive"] }
 serde_json = "1.0"
 serde_yaml = "0.9"
 snafu = "0.8"
-stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" }
 strum = { version = "0.26", features = ["derive"] }
 tokio = { version = "1.39", features = ["full"] }
 tracing = "0.1"

deploy/helm/spark-k8s-operator/crds/crds.yaml

Lines changed: 1 addition & 1 deletion
@@ -12,7 +12,7 @@ spec:
     kind: SparkApplication
     plural: sparkapplications
     shortNames:
-      - sc
+      - sparkapp
     singular: sparkapplication
   scope: Namespaced
   versions:

rust/operator-binary/Cargo.toml

Lines changed: 4 additions & 2 deletions
@@ -9,15 +9,17 @@ repository.workspace = true
 publish = false
 
 [dependencies]
+stackable-versioned.workspace = true
+stackable-operator.workspace = true
+product-config.workspace = true
+
 anyhow.workspace = true
 const_format.workspace = true
-product-config.workspace = true
 semver.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 serde_yaml.workspace = true
 snafu.workspace = true
-stackable-operator.workspace = true
 strum.workspace = true
 tracing.workspace = true
 tracing-futures.workspace = true

rust/operator-binary/src/crd/mod.rs

Lines changed: 20 additions & 18 deletions
@@ -44,6 +44,7 @@ use stackable_operator::{
     time::Duration,
     utils::crds::raw_object_list_schema,
 };
+use stackable_versioned::versioned;
 
 use crate::crd::roles::{
     RoleConfig, RoleConfigFragment, SparkApplicationRole, SparkContainer, SparkMode, SubmitConfig,
@@ -126,20 +127,21 @@ pub struct SparkApplicationStatus {
 ///
 /// The SparkApplication CRD looks a little different than the CRDs of the other products on the
 /// Stackable Data Platform.
-#[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)]
-#[kube(
-    group = "spark.stackable.tech",
-    version = "v1alpha1",
-    kind = "SparkApplication",
-    shortname = "sc",
-    status = "SparkApplicationStatus",
-    namespaced,
-    crates(
-        kube_core = "stackable_operator::kube::core",
-        k8s_openapi = "stackable_operator::k8s_openapi",
-        schemars = "stackable_operator::schemars"
+#[versioned(
+    version(name = "v1alpha1"),
+    k8s(
+        group = "spark.stackable.tech",
+        shortname = "sparkapp",
+        status = "SparkApplicationStatus",
+        namespaced,
+        crates(
+            kube_core = "stackable_operator::kube::core",
+            k8s_openapi = "stackable_operator::k8s_openapi",
+            schemars = "stackable_operator::schemars"
+        )
     )
 )]
+#[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct SparkApplicationSpec {
     /// Mode: cluster or client. Currently only cluster is supported.
@@ -241,7 +243,7 @@ pub struct JobDependencies {
     pub exclude_packages: Vec<String>,
 }
 
-impl SparkApplication {
+impl v1alpha1::SparkApplication {
     /// Returns if this [`SparkApplication`] has already created a Kubernetes Job doing the actual `spark-submit`.
     ///
     /// This is needed because Kubernetes will remove the succeeded Job after some time. When the spark-k8s-operator is
@@ -509,7 +511,7 @@ impl SparkApplication {
         &'a self,
         app_version: &'a str,
         role: &'a str,
-    ) -> ObjectLabels<SparkApplication> {
+    ) -> ObjectLabels<v1alpha1::SparkApplication> {
         ObjectLabels {
             owner: self,
             app_name: APP_NAME,
@@ -1104,7 +1106,7 @@ mod tests {
 
     #[test]
     fn test_default_resource_limits() {
-        let spark_application = serde_yaml::from_str::<SparkApplication>(indoc! {"
+        let spark_application = serde_yaml::from_str::<v1alpha1::SparkApplication>(indoc! {"
             ---
             apiVersion: spark.stackable.tech/v1alpha1
             kind: SparkApplication
@@ -1133,7 +1135,7 @@ mod tests {
 
     #[test]
     fn test_merged_resource_limits() {
-        let spark_application = serde_yaml::from_str::<SparkApplication>(indoc! {r#"
+        let spark_application = serde_yaml::from_str::<v1alpha1::SparkApplication>(indoc! {r#"
             ---
             apiVersion: spark.stackable.tech/v1alpha1
             kind: SparkApplication
@@ -1310,7 +1312,7 @@ mod tests {
 
     #[test]
     fn test_validated_config() {
-        let spark_application = serde_yaml::from_str::<SparkApplication>(indoc! {r#"
+        let spark_application = serde_yaml::from_str::<v1alpha1::SparkApplication>(indoc! {r#"
             ---
             apiVersion: spark.stackable.tech/v1alpha1
             kind: SparkApplication
@@ -1375,7 +1377,7 @@ mod tests {
 
     #[test]
     fn test_job_volume_mounts() {
-        let spark_application = serde_yaml::from_str::<SparkApplication>(indoc! {r#"
+        let spark_application = serde_yaml::from_str::<v1alpha1::SparkApplication>(indoc! {r#"
             ---
             apiVersion: spark.stackable.tech/v1alpha1
             kind: SparkApplication
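
The hunks above swap the `#[kube(...)]` attribute for the `#[versioned(...)]` macro from stackable-versioned, which is why every call site now refers to `v1alpha1::SparkApplication`. The sketch below is a rough illustration in plain Rust with simplified stand-in types, not the macro's actual expansion: the spec type ends up inside a per-version module, and functions name that version explicitly.

// Minimal sketch of the versioned-module pattern (plain Rust, not the real
// stackable-versioned macro; its exact expansion is an assumption here).
pub mod v1alpha1 {
    /// Stand-in for the generated SparkApplication custom resource type.
    #[derive(Clone, Debug, Default)]
    pub struct SparkApplication {
        pub spec: SparkApplicationSpec,
    }

    #[derive(Clone, Debug, Default)]
    pub struct SparkApplicationSpec {
        /// Mode: cluster or client (simplified to a plain string for the sketch).
        pub mode: String,
    }
}

/// Mirrors the signature changes in the diff: helpers now take
/// `&v1alpha1::SparkApplication` instead of `&SparkApplication`.
fn describe(app: &v1alpha1::SparkApplication) -> String {
    format!("SparkApplication running in {} mode", app.spec.mode)
}

fn main() {
    let app = v1alpha1::SparkApplication {
        spec: v1alpha1::SparkApplicationSpec {
            mode: "cluster".to_string(),
        },
    };
    println!("{}", describe(&app));
}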

rust/operator-binary/src/crd/roles.rs

Lines changed: 4 additions & 4 deletions
@@ -38,7 +38,7 @@ use stackable_operator::{
 };
 use strum::{Display, EnumIter};
 
-use crate::crd::{ResolvedLogDir, SparkApplication};
+use crate::crd::{v1alpha1, ResolvedLogDir};
 
 #[derive(Clone, Debug, Deserialize, Display, Eq, PartialEq, Serialize, JsonSchema)]
 #[strum(serialize_all = "kebab-case")]
@@ -155,7 +155,7 @@ impl RoleConfig {
     }
     pub fn volume_mounts(
         &self,
-        spark_application: &SparkApplication,
+        spark_application: &v1alpha1::SparkApplication,
         s3conn: &Option<S3ConnectionSpec>,
         logdir: &Option<ResolvedLogDir>,
     ) -> Vec<VolumeMount> {
@@ -165,7 +165,7 @@
 }
 
 impl Configuration for RoleConfigFragment {
-    type Configurable = SparkApplication;
+    type Configurable = v1alpha1::SparkApplication;
 
     fn compute_env(
         &self,
@@ -246,7 +246,7 @@ impl SubmitConfig {
 }
 
 impl Configuration for SubmitConfigFragment {
-    type Configurable = SparkApplication;
+    type Configurable = v1alpha1::SparkApplication;
 
     fn compute_env(
         &self,

rust/operator-binary/src/main.rs

Lines changed: 7 additions & 5 deletions
@@ -18,7 +18,8 @@ use stackable_operator::{
         },
     },
     logging::controller::report_controller_reconciled,
-    CustomResourceExt,
+    shared::yaml::SerializeOptions,
+    YamlSchema,
 };
 use tracing::info_span;
 use tracing_futures::Instrument;
@@ -29,7 +30,7 @@ use crate::crd::{
         SPARK_CONTROLLER_NAME, SPARK_FULL_CONTROLLER_NAME,
     },
     history::SparkHistoryServer,
-    SparkApplication,
+    v1alpha1, SparkApplication,
 };
 
 mod crd;
@@ -63,8 +64,9 @@ async fn main() -> anyhow::Result<()> {
     let opts = Opts::parse();
     match opts.cmd {
         Command::Crd => {
-            SparkApplication::print_yaml_schema(built_info::PKG_VERSION)?;
-            SparkHistoryServer::print_yaml_schema(built_info::PKG_VERSION)?;
+            SparkApplication::merged_crd(SparkApplication::V1Alpha1)?
+                .print_yaml_schema(built_info::PKG_VERSION, SerializeOptions::default())?;
+            // SparkHistoryServer::print_yaml_schema(built_info::PKG_VERSION)?;
         }
         Command::Run(ProductOperatorRun {
             product_config,
@@ -104,7 +106,7 @@ async fn main() -> anyhow::Result<()> {
         },
     ));
     let app_controller = Controller::new(
-        watch_namespace.get_api::<DeserializeGuard<SparkApplication>>(&client),
+        watch_namespace.get_api::<DeserializeGuard<v1alpha1::SparkApplication>>(&client),
         watcher::Config::default(),
     )
     .owns(
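
The Command::Crd hunk above no longer prints the schema of a single struct; it first builds a merged CRD from the declared versions and then serializes that. The following is a conceptual sketch only, not the operator-rs API (all names in it are assumptions): merging folds every declared version into one CRD object, and Kubernetes requires exactly one of those versions to be flagged as the storage version.

// Conceptual sketch of "merging" CRD versions; illustrative types only,
// not the stackable-operator / stackable-versioned API.
#[derive(Clone, Debug)]
struct CrdVersion {
    name: String,
    served: bool,
    storage: bool,
}

#[derive(Debug)]
struct MergedCrd {
    kind: String,
    versions: Vec<CrdVersion>,
}

// Folds all declared versions into one CRD object and enforces the
// Kubernetes rule that exactly one version is the storage version.
fn merged_crd(kind: &str, versions: Vec<CrdVersion>) -> Result<MergedCrd, String> {
    match versions.iter().filter(|v| v.storage).count() {
        1 => Ok(MergedCrd {
            kind: kind.to_string(),
            versions,
        }),
        n => Err(format!("expected exactly one storage version, found {n}")),
    }
}

fn main() {
    let crd = merged_crd(
        "SparkApplication",
        vec![CrdVersion {
            name: "v1alpha1".to_string(),
            served: true,
            storage: true,
        }],
    )
    .expect("valid CRD versions");
    println!("{crd:?}");
}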

rust/operator-binary/src/pod_driver_controller.rs

Lines changed: 2 additions & 2 deletions
@@ -13,7 +13,7 @@ use stackable_operator::{
 };
 use strum::{EnumDiscriminants, IntoStaticStr};
 
-use crate::crd::{constants::POD_DRIVER_CONTROLLER_NAME, SparkApplication, SparkApplicationStatus};
+use crate::crd::{constants::POD_DRIVER_CONTROLLER_NAME, v1alpha1, SparkApplicationStatus};
 
 const LABEL_NAME_INSTANCE: &str = "app.kubernetes.io/instance";
 
@@ -84,7 +84,7 @@ pub async fn reconcile(pod: Arc<DeserializeGuard<Pod>>, client: Arc<Client>) ->
     )?;
 
     let app = client
-        .get::<SparkApplication>(
+        .get::<v1alpha1::SparkApplication>(
             app_name.as_ref(),
             pod.metadata
                 .namespace

rust/operator-binary/src/spark_k8s_controller.rs

Lines changed: 9 additions & 9 deletions
@@ -59,7 +59,7 @@ use crate::{
         constants::*,
         logdir::ResolvedLogDir,
         roles::{RoleConfig, SparkApplicationRole, SparkContainer, SubmitConfig},
-        tlscerts, to_spark_env_sh_string, SparkApplication, SparkApplicationStatus,
+        tlscerts, to_spark_env_sh_string, v1alpha1, SparkApplicationStatus,
     },
     product_logging::{self, resolve_vector_aggregator_address},
     Ctx,
@@ -206,7 +206,7 @@ impl ReconcilerError for Error {
 }
 
 pub async fn reconcile(
-    spark_application: Arc<DeserializeGuard<SparkApplication>>,
+    spark_application: Arc<DeserializeGuard<v1alpha1::SparkApplication>>,
     ctx: Arc<Ctx>,
 ) -> Result<Action> {
     tracing::info!("Starting reconcile");
@@ -430,7 +430,7 @@ pub async fn reconcile(
 }
 
 fn init_containers(
-    spark_application: &SparkApplication,
+    spark_application: &v1alpha1::SparkApplication,
     logging: &Logging<SparkContainer>,
     s3conn: &Option<S3ConnectionSpec>,
     logdir: &Option<ResolvedLogDir>,
@@ -585,7 +585,7 @@ fn init_containers(
 
 #[allow(clippy::too_many_arguments)]
 fn pod_template(
-    spark_application: &SparkApplication,
+    spark_application: &v1alpha1::SparkApplication,
     role: SparkApplicationRole,
     config: &RoleConfig,
     volumes: &[Volume],
@@ -678,7 +678,7 @@ fn pod_template(
 
 #[allow(clippy::too_many_arguments)]
 fn pod_template_config_map(
-    spark_application: &SparkApplication,
+    spark_application: &v1alpha1::SparkApplication,
     role: SparkApplicationRole,
     merged_config: &RoleConfig,
     product_config: Option<&HashMap<PropertyNameKind, BTreeMap<String, String>>>,
@@ -797,7 +797,7 @@ fn pod_template_config_map(
 }
 
 fn submit_job_config_map(
-    spark_application: &SparkApplication,
+    spark_application: &v1alpha1::SparkApplication,
     product_config: Option<&HashMap<PropertyNameKind, BTreeMap<String, String>>>,
     spark_image: &ResolvedProductImage,
 ) -> Result<ConfigMap> {
@@ -856,7 +856,7 @@ fn submit_job_config_map(
 
 #[allow(clippy::too_many_arguments)]
 fn spark_job(
-    spark_application: &SparkApplication,
+    spark_application: &v1alpha1::SparkApplication,
     spark_image: &ResolvedProductImage,
     serviceaccount: &ServiceAccount,
     env: &[EnvVar],
@@ -974,7 +974,7 @@ fn spark_job(
 /// Both objects have an owner reference to the SparkApplication, as well as the same name as the app.
 /// They are deleted when the job is deleted.
 fn build_spark_role_serviceaccount(
-    spark_app: &SparkApplication,
+    spark_app: &v1alpha1::SparkApplication,
     spark_image: &ResolvedProductImage,
 ) -> Result<(ServiceAccount, RoleBinding)> {
     let sa_name = spark_app.metadata.name.as_ref().unwrap().to_string();
@@ -1029,7 +1029,7 @@ fn security_context() -> PodSecurityContext {
 }
 
 pub fn error_policy(
-    _obj: Arc<DeserializeGuard<SparkApplication>>,
+    _obj: Arc<DeserializeGuard<v1alpha1::SparkApplication>>,
     error: &Error,
     _ctx: Arc<Ctx>,
 ) -> Action {
