Merged
328 changes: 279 additions & 49 deletions Cargo.lock
Member comment: Oops, missed update on Cargo.nix

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions Cargo.toml
@@ -1,5 +1,5 @@
[workspace]
members = ["rust/crd", "rust/operator-binary"]
members = ["rust/operator-binary"]
resolver = "2"

[workspace.package]
@@ -10,19 +10,21 @@ edition = "2021"
repository = "https://github.com/stackabletech/spark-k8s-operator"

[workspace.dependencies]
stackable-versioned = { git = "https://github.com/stackabletech/operator-rs.git", features = ["k8s"], tag = "stackable-versioned-0.5.0" }
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" }
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }

anyhow = "1.0"
built = { version = "0.7", features = ["chrono", "git2"] }
clap = "4.5"
const_format = "0.2"
futures = { version = "0.3", features = ["compat"] }
product-config = { git = "https://github.com/stackabletech/product-config.git", tag = "0.7.0" }
rstest = "0.24"
semver = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.9"
snafu = "0.8"
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.85.0" }
strum = { version = "0.26", features = ["derive"] }
tokio = { version = "1.39", features = ["full"] }
tracing = "0.1"
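The workspace change above removes the separate `rust/crd` crate and folds its code into the operator binary. A minimal sketch of the resulting module layout, inferred from the renamed file paths later in this diff rather than taken verbatim from the PR:

```rust
// rust/operator-binary/src/crd/mod.rs — hypothetical sketch; the module names
// mirror the files renamed to rust/operator-binary/src/crd/*.rs in this diff.
pub mod affinity;
pub mod constants;
pub mod history;
pub mod logdir;
```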
4 changes: 2 additions & 2 deletions deploy/helm/spark-k8s-operator/crds/crds.yaml
@@ -12,7 +12,7 @@ spec:
kind: SparkApplication
plural: sparkapplications
shortNames:
- sc
- sparkapp
singular: sparkapplication
scope: Namespaced
versions:
@@ -998,7 +998,7 @@ spec:
kind: SparkHistoryServer
plural: sparkhistoryservers
shortNames:
- shs
- sparkhist
singular: sparkhistoryserver
scope: Namespaced
versions:
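Since `kubectl` resolves CRD short names, the renames above mean `kubectl get sparkapp` and `kubectl get sparkhist` replace `kubectl get sc` and `kubectl get shs` once the updated CRDs are applied. The old `sc` alias also collided with the built-in shorthand for StorageClass, which is presumably part of the motivation.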
25 changes: 0 additions & 25 deletions rust/crd/Cargo.toml

This file was deleted.

11 changes: 8 additions & 3 deletions rust/operator-binary/Cargo.toml
@@ -9,22 +9,27 @@ repository.workspace = true
publish = false

[dependencies]
stackable-spark-k8s-crd = { path = "../crd" }
stackable-versioned.workspace = true
stackable-operator.workspace = true
product-config.workspace = true

anyhow.workspace = true
product-config.workspace = true
const_format.workspace = true
semver.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
snafu.workspace = true
stackable-operator.workspace = true
strum.workspace = true
tracing.workspace = true
tracing-futures.workspace = true
clap.workspace = true
futures.workspace = true
tokio.workspace = true

[dev-dependencies]
indoc.workspace = true
rstest.workspace = true

[build-dependencies]
built.workspace = true
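Worth noting: `stackable-versioned` becomes a direct dependency of the binary here (declared in the workspace root above with the `k8s` feature enabled); it provides the `#[versioned]` macro that the relocated CRD modules below use to generate per-version types such as `v1alpha1::SparkHistoryServer`.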
rust/crd/src/affinity.rs → rust/operator-binary/src/crd/affinity.rs
@@ -3,7 +3,7 @@ use stackable_operator::{
k8s_openapi::api::core::v1::PodAntiAffinity,
};

use crate::constants::{APP_NAME, HISTORY_ROLE_NAME};
use crate::crd::constants::{APP_NAME, HISTORY_ROLE_NAME};

pub fn history_affinity(cluster_name: &str) -> StackableAffinityFragment {
let affinity_between_role_pods =
@@ -36,7 +36,7 @@ mod test {
role_utils::RoleGroupRef,
};

use crate::{constants::HISTORY_ROLE_NAME, history::SparkHistoryServer};
use crate::crd::{constants::HISTORY_ROLE_NAME, history::v1alpha1};

#[test]
pub fn test_history_affinity_defaults() {
@@ -62,7 +62,7 @@
"#;

let deserializer = serde_yaml::Deserializer::from_str(input);
let history: SparkHistoryServer =
let history: v1alpha1::SparkHistoryServer =
serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();
let expected: StackableAffinity = StackableAffinity {
node_affinity: None,
rust/crd/src/constants.rs → rust/operator-binary/src/crd/constants.rs
@@ -43,7 +43,6 @@ pub const STACKABLE_TRUST_STORE_NAME: &str = "stackable-truststore";
pub const STACKABLE_TLS_STORE_PASSWORD: &str = "changeit";
pub const SYSTEM_TRUST_STORE_PASSWORD: &str = "changeit";
pub const STACKABLE_MOUNT_PATH_TLS: &str = "/stackable/mount_server_tls";
pub const STACKABLE_MOUNT_NAME_TLS: &str = "servertls";

pub const MIN_MEMORY_OVERHEAD: u32 = 384;
pub const JVM_OVERHEAD_FACTOR: f32 = 0.1;
128 changes: 65 additions & 63 deletions rust/crd/src/history.rs → rust/operator-binary/src/crd/history.rs
@@ -28,9 +28,10 @@ use stackable_operator::{
schemars::{self, JsonSchema},
time::Duration,
};
use stackable_versioned::versioned;
use strum::{Display, EnumIter};

use crate::{affinity::history_affinity, constants::*, logdir::ResolvedLogDir};
use crate::crd::{affinity::history_affinity, constants::*, logdir::ResolvedLogDir};

#[derive(Snafu, Debug)]
pub enum Error {
@@ -48,62 +49,63 @@ pub enum Error {
CannotRetrieveRoleGroup { role_group: String },
}

/// A Spark cluster history server component. This resource is managed by the Stackable operator
/// for Apache Spark. Find more information on how to use it in the
/// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/history-server).
#[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)]
#[kube(
group = "spark.stackable.tech",
version = "v1alpha1",
kind = "SparkHistoryServer",
shortname = "shs",
namespaced,
crates(
kube_core = "stackable_operator::kube::core",
k8s_openapi = "stackable_operator::k8s_openapi",
schemars = "stackable_operator::schemars"
)
)]
#[serde(rename_all = "camelCase")]
pub struct SparkHistoryServerSpec {
pub image: ProductImage,

/// Global Spark history server configuration that applies to all roles and role groups.
#[serde(default)]
pub cluster_config: SparkHistoryServerClusterConfig,

/// Name of the Vector aggregator discovery ConfigMap.
/// It must contain the key `ADDRESS` with the address of the Vector aggregator.
#[serde(skip_serializing_if = "Option::is_none")]
pub vector_aggregator_config_map_name: Option<String>,

/// The log file directory definition used by the Spark history server.
pub log_file_directory: LogFileDirectorySpec,

/// A map of key/value strings that will be passed directly to Spark when deploying the history server.
#[serde(default)]
pub spark_conf: BTreeMap<String, String>,

/// A history server node role definition.
pub nodes: Role<HistoryConfigFragment>,
}
#[versioned(version(name = "v1alpha1"))]
pub mod versioned {
/// A Spark cluster history server component. This resource is managed by the Stackable operator
/// for Apache Spark. Find more information on how to use it in the
/// [operator documentation](DOCS_BASE_URL_PLACEHOLDER/spark-k8s/usage-guide/history-server).
#[versioned(k8s(
group = "spark.stackable.tech",
shortname = "sparkhist",
namespaced,
crates(
kube_core = "stackable_operator::kube::core",
k8s_openapi = "stackable_operator::k8s_openapi",
schemars = "stackable_operator::schemars"
)
))]
#[derive(Clone, CustomResource, Debug, Deserialize, JsonSchema, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SparkHistoryServerSpec {
pub image: ProductImage,

/// Global Spark history server configuration that applies to all roles and role groups.
#[serde(default)]
pub cluster_config: v1alpha1::SparkHistoryServerClusterConfig,

/// Name of the Vector aggregator discovery ConfigMap.
/// It must contain the key `ADDRESS` with the address of the Vector aggregator.
#[serde(skip_serializing_if = "Option::is_none")]
pub vector_aggregator_config_map_name: Option<String>,

/// The log file directory definition used by the Spark history server.
pub log_file_directory: LogFileDirectorySpec,

/// A map of key/value strings that will be passed directly to Spark when deploying the history server.
#[serde(default)]
pub spark_conf: BTreeMap<String, String>,

/// A history server node role definition.
pub nodes: Role<HistoryConfigFragment>,
}

#[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SparkHistoryServerClusterConfig {
/// This field controls which type of Service the Operator creates for this HistoryServer:
///
/// * cluster-internal: Use a ClusterIP service
///
/// * external-unstable: Use a NodePort service
///
/// * external-stable: Use a LoadBalancer service
///
/// This is a temporary solution with the goal to keep yaml manifests forward compatible.
/// In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html>
/// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
#[serde(default)]
pub listener_class: CurrentlySupportedListenerClasses,
#[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SparkHistoryServerClusterConfig {
/// This field controls which type of Service the Operator creates for this HistoryServer:
///
/// * cluster-internal: Use a ClusterIP service
///
/// * external-unstable: Use a NodePort service
///
/// * external-stable: Use a LoadBalancer service
///
/// This is a temporary solution with the goal to keep yaml manifests forward compatible.
/// In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html>
/// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
#[serde(default)]
pub listener_class: CurrentlySupportedListenerClasses,
}
}

// TODO: Temporary solution until listener-operator is finished
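The block above swaps the hand-written `#[kube(group = ..., version = "v1alpha1", ...)]` attribute for the `#[versioned]` macro wrapping a `pub mod versioned`. A rough sketch of the consuming side, assuming the macro emits a `v1alpha1` module containing the structs, as the references throughout the rest of this diff suggest:

```rust
// Hypothetical usage sketch, not code from this PR: the generated types are
// addressed through the version module rather than as bare struct names.
use crate::crd::history::v1alpha1;

fn dump_spark_conf(history: &v1alpha1::SparkHistoryServer) {
    // The spec keeps the field shape declared inside `pub mod versioned` above.
    for (key, value) in &history.spec.spark_conf {
        println!("spark-conf {key}={value}");
    }
}
```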
@@ -129,7 +131,7 @@ impl CurrentlySupportedListenerClasses {
}
}

impl SparkHistoryServer {
impl v1alpha1::SparkHistoryServer {
/// Returns a reference to the role. Raises an error if the role is not defined.
pub fn role(&self) -> &Role<HistoryConfigFragment> {
&self.spec.nodes
@@ -138,7 +140,7 @@
/// Returns a reference to the role group. Raises an error if the role or role group are not defined.
pub fn rolegroup(
&self,
rolegroup_ref: &RoleGroupRef<SparkHistoryServer>,
rolegroup_ref: &RoleGroupRef<Self>,
) -> Result<RoleGroup<HistoryConfigFragment, GenericProductSpecificCommonConfig>, Error> {
self.spec
.nodes
@@ -152,7 +154,7 @@

pub fn merged_config(
&self,
rolegroup_ref: &RoleGroupRef<SparkHistoryServer>,
rolegroup_ref: &RoleGroupRef<Self>,
) -> Result<HistoryConfig, Error> {
// Initialize the result with all default values as baseline
let conf_defaults = HistoryConfig::default_config(&self.name_any());
@@ -184,7 +186,7 @@ impl SparkHistoryServer {
.map(i32::from)
}

pub fn cleaner_rolegroups(&self) -> Vec<RoleGroupRef<SparkHistoryServer>> {
pub fn cleaner_rolegroups(&self) -> Vec<RoleGroupRef<Self>> {
let mut rgs = vec![];
for (rg_name, rg_config) in &self.spec.nodes.role_groups {
if let Some(true) = rg_config.config.config.cleaner {
@@ -444,7 +446,7 @@ impl HistoryConfig {
}

impl Configuration for HistoryConfigFragment {
type Configurable = SparkHistoryServer;
type Configurable = v1alpha1::SparkHistoryServer;

fn compute_env(
&self,
@@ -484,7 +486,7 @@ mod test {
};

use super::*;
use crate::logdir::S3LogDir;
use crate::crd::logdir::S3LogDir;

#[test]
pub fn test_env_overrides() {
Expand Down Expand Up @@ -515,7 +517,7 @@ mod test {
"#};

let deserializer = serde_yaml::Deserializer::from_str(input);
let history: SparkHistoryServer =
let history: v1alpha1::SparkHistoryServer =
serde_yaml::with::singleton_map_recursive::deserialize(deserializer).unwrap();

let log_dir = ResolvedLogDir::S3(S3LogDir {
rust/crd/src/logdir.rs → rust/operator-binary/src/crd/logdir.rs
@@ -15,7 +15,7 @@ use stackable_operator::{
};
use strum::{EnumDiscriminants, IntoStaticStr};

use crate::{
use crate::crd::{
constants::*,
history::{
LogFileDirectorySpec::{self, S3},
@@ -133,13 +133,6 @@ impl ResolvedLogDir {
}
}

pub fn credentials(&self) -> Option<SecretClassVolume> {
match self {
ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.credentials(),
ResolvedLogDir::Custom(_) => None,
}
}

pub fn credentials_mount_path(&self) -> Option<String> {
match self {
ResolvedLogDir::S3(s3_log_dir) => s3_log_dir.credentials_mount_path(),