diff --git a/CHANGELOG.md b/CHANGELOG.md index 0848c4d5..7a2d5602 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ All notable changes to this project will be documented in this file. - Use `--file-log-max-files` (or `FILE_LOG_MAX_FILES`) to limit the number of log files kept. - Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation. - Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`. +- Add Listener integration for Trino ([#753]). - Add support for Trino 476 ([#755]). ### Changed @@ -52,6 +53,7 @@ All notable changes to this project will be documented in this file. [#745]: https://github.com/stackabletech/trino-operator/pull/745 [#748]: https://github.com/stackabletech/trino-operator/pull/748 [#752]: https://github.com/stackabletech/trino-operator/pull/752 +[#753]: https://github.com/stackabletech/trino-operator/pull/753 [#755]: https://github.com/stackabletech/trino-operator/pull/755 [#760]: https://github.com/stackabletech/trino-operator/pull/760 diff --git a/deploy/helm/trino-operator/crds/crds.yaml b/deploy/helm/trino-operator/crds/crds.yaml index fcc5d1f3..887ababd 100644 --- a/deploy/helm/trino-operator/crds/crds.yaml +++ b/deploy/helm/trino-operator/crds/crds.yaml @@ -105,23 +105,6 @@ spec: description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed. type: object type: object - listenerClass: - default: cluster-internal - description: |- - This field controls which type of Service the Operator creates for this TrinoCluster: - - * cluster-internal: Use a ClusterIP service - - * external-unstable: Use a NodePort service - - * external-stable: Use a LoadBalancer service - - This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change. - enum: - - cluster-internal - - external-unstable - - external-stable - type: string tls: default: internalSecretClass: tls @@ -436,11 +419,16 @@ spec: x-kubernetes-preserve-unknown-fields: true roleConfig: default: + listenerClass: cluster-internal podDisruptionBudget: enabled: true maxUnavailable: null description: This is a product-agnostic RoleConfig, which is sufficient for most of the products. properties: + listenerClass: + default: cluster-internal + description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the coordinator. 
+                  type: string
              podDisruptionBudget:
                default:
                  enabled: true
diff --git a/deploy/helm/trino-operator/templates/roles.yaml b/deploy/helm/trino-operator/templates/roles.yaml
index 7586c26e..4a268c3a 100644
--- a/deploy/helm/trino-operator/templates/roles.yaml
+++ b/deploy/helm/trino-operator/templates/roles.yaml
@@ -97,11 +97,16 @@ rules:
     verbs:
       - get
   - apiGroups:
-      - apiextensions.k8s.io
+      - listeners.stackable.tech
    resources:
-      - customresourcedefinitions
+      - listeners
    verbs:
      - get
+      - list
+      - watch
+      - patch
+      - create
+      - delete
  - apiGroups:
      - events.k8s.io
    resources:
diff --git a/docs/modules/trino/examples/getting_started/code/trino.yaml b/docs/modules/trino/examples/getting_started/code/trino.yaml
index 26a0ca1d..7a521015 100644
--- a/docs/modules/trino/examples/getting_started/code/trino.yaml
+++ b/docs/modules/trino/examples/getting_started/code/trino.yaml
@@ -10,8 +10,9 @@ spec:
     catalogLabelSelector:
       matchLabels:
         trino: simple-trino
-    listenerClass: external-unstable
   coordinators:
+    roleConfig:
+      listenerClass: external-unstable
     roleGroups:
       default:
         replicas: 1
diff --git a/docs/modules/trino/examples/getting_started/code/trino.yaml.j2 b/docs/modules/trino/examples/getting_started/code/trino.yaml.j2
index 26a0ca1d..7a521015 100644
--- a/docs/modules/trino/examples/getting_started/code/trino.yaml.j2
+++ b/docs/modules/trino/examples/getting_started/code/trino.yaml.j2
@@ -10,8 +10,9 @@ spec:
     catalogLabelSelector:
       matchLabels:
         trino: simple-trino
-    listenerClass: external-unstable
   coordinators:
+    roleConfig:
+      listenerClass: external-unstable
     roleGroups:
       default:
         replicas: 1
diff --git a/docs/modules/trino/pages/usage-guide/listenerclass.adoc b/docs/modules/trino/pages/usage-guide/listenerclass.adoc
index 2e38886f..148513fd 100644
--- a/docs/modules/trino/pages/usage-guide/listenerclass.adoc
+++ b/docs/modules/trino/pages/usage-guide/listenerclass.adoc
@@ -1,17 +1,18 @@
 = Service exposition with ListenerClasses
+:description: Configure Trino service exposure with ListenerClasses: cluster-internal, external-unstable, or external-stable.
 
-Trino offers a web UI and an API, both are exposed by the `connector` xref:concepts:roles-and-role-groups.adoc[role].
-The Operator deploys a service called `<name>-connector` (where `<name>` is the name of the TrinoCluster) through which Trino can be reached.
-
-This service can have three different types: `cluster-internal`, `external-unstable` and `external-stable`.
-Read more about the types in the xref:concepts:service-exposition.adoc[service exposition] documentation at platform level.
-
-This is how the ListenerClass is configured:
+The operator deploys a xref:listener-operator:listener.adoc[Listener] for the coordinator pod.
+The listener defaults to only being accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.coordinators.roleConfig.listenerClass`:
 
 [source,yaml]
 ----
 spec:
-  clusterConfig:
-    listenerClass: cluster-internal # <1>
+  coordinators:
+    roleConfig:
+      listenerClass: external-unstable # <1>
+    ...
+  workers:
+    ...
 ----
-<1> The default `cluster-internal` setting.
+<1> Specify a ListenerClass, such as `external-stable`, `external-unstable`, or `cluster-internal` (the default setting is `cluster-internal`).
+This can be set only for the coordinator role.
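For readers of the docs change above: the coordinator is now exposed through a Listener object named `<cluster-name>-coordinator` (see `group_listener_name` in the new `rust/operator-binary/src/listener.rs` further down). As a rough sketch only, the rendered Listener for the getting-started example could look like the following, assuming the `external-unstable` class; the `status` values are illustrative placeholders that the listener-operator writes at runtime and are not part of this diff:

[source,yaml]
----
apiVersion: listeners.stackable.tech/v1alpha1
kind: Listener
metadata:
  name: simple-trino-coordinator  # <cluster-name>-<role>, from group_listener_name()
spec:
  className: external-unstable    # taken from .spec.coordinators.roleConfig.listenerClass
  ports:
    - name: https                 # exposed_protocol(); plain http when client TLS is disabled
      port: 8443
      protocol: TCP
status:
  ingressAddresses:               # written back by the listener-operator
    - address: 172.18.0.2         # illustrative node address
      addressType: IP
      ports:
        https: 31234              # illustrative NodePort
----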
diff --git a/rust/operator-binary/src/config/jvm.rs b/rust/operator-binary/src/config/jvm.rs
index 5420594c..90ea4230 100644
--- a/rust/operator-binary/src/config/jvm.rs
+++ b/rust/operator-binary/src/config/jvm.rs
@@ -276,7 +276,7 @@ mod tests {
         let role = TrinoRole::Coordinator;
         let rolegroup_ref = role.rolegroup_ref(&trino, "default");
         let merged_config = trino.merged_config(&role, &rolegroup_ref, &[]).unwrap();
-        let coordinators = trino.spec.coordinators.unwrap();
+        let coordinators = trino.role(&role).unwrap();
 
         let product_version = trino.spec.image.product_version();
 
diff --git a/rust/operator-binary/src/controller.rs b/rust/operator-binary/src/controller.rs
index be2928ad..2b610599 100644
--- a/rust/operator-binary/src/controller.rs
+++ b/rust/operator-binary/src/controller.rs
@@ -88,7 +88,11 @@ use crate::{
         authentication::resolve_authentication_classes, catalog,
         discovery::{TrinoDiscovery, TrinoDiscoveryProtocol, TrinoPodRef},
-        v1alpha1,
+        rolegroup_metrics_service_name, v1alpha1,
+    },
+    listener::{
+        LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, build_group_listener, build_group_listener_pvc,
+        group_listener_name,
     },
     operations::{
         add_graceful_shutdown_config, graceful_shutdown_config_properties, pdb::add_pdbs,
@@ -126,7 +130,10 @@ pub enum Error {
     ObjectHasNoNamespace,
 
     #[snafu(display("object defines no {role:?} role"))]
-    MissingTrinoRole { role: String },
+    MissingTrinoRole {
+        source: crate::crd::Error,
+        role: String,
+    },
 
     #[snafu(display("failed to create cluster resources"))]
     CreateClusterResources {
@@ -337,16 +344,19 @@ pub enum Error {
     #[snafu(display("failed to read role"))]
     ReadRole { source: crate::crd::Error },
 
-    #[snafu(display("failed to get merged jvmArgumentOverrides"))]
-    GetMergedJvmArgumentOverrides {
-        source: stackable_operator::role_utils::Error,
-    },
-
     #[snafu(display("unable to parse Trino version: {product_version:?}"))]
     ParseTrinoVersion {
         source: ParseIntError,
         product_version: String,
     },
+
+    #[snafu(display("failed to apply group listener"))]
+    ApplyGroupListener {
+        source: stackable_operator::cluster_resources::Error,
+    },
+
+    #[snafu(display("failed to configure listener"))]
+    ListenerConfiguration { source: crate::listener::Error },
 }
 
 type Result<T, E = Error> = std::result::Result<T, E>;
@@ -458,35 +468,28 @@ pub async fn reconcile_trino(
         None => None,
     };
 
-    let coordinator_role_service = build_coordinator_role_service(trino, &resolved_product_image)?;
-
-    cluster_resources
-        .add(client, coordinator_role_service)
-        .await
-        .context(ApplyRoleServiceSnafu)?;
-
     create_shared_internal_secret(trino, client).await?;
 
     let mut sts_cond_builder = StatefulSetConditionBuilder::default();
 
     for (trino_role_str, role_config) in validated_config {
         let trino_role = TrinoRole::from_str(&trino_role_str).context(FailedToParseRoleSnafu)?;
-        let role: &Role<v1alpha1::TrinoConfigFragment, GenericRoleConfig, JavaCommonConfig> =
-            trino.role(&trino_role).context(ReadRoleSnafu)?;
+        let role = trino.role(&trino_role).context(ReadRoleSnafu)?;
 
         for (role_group, config) in role_config {
-            let rolegroup = trino_role.rolegroup_ref(trino, &role_group);
+            let role_group_ref = trino_role.rolegroup_ref(trino, &role_group);
 
             let merged_config = trino
-                .merged_config(&trino_role, &rolegroup, &catalog_definitions)
+                .merged_config(&trino_role, &role_group_ref, &catalog_definitions)
                 .context(FailedToResolveConfigSnafu)?;
 
-            let rg_service = build_rolegroup_service(trino, &resolved_product_image, &rolegroup)?;
+            let rg_service =
+                build_rolegroup_service(trino, &resolved_product_image, &role_group_ref)?;
             let rg_configmap = build_rolegroup_config_map(
                 trino,
                &resolved_product_image,
-                role,
+                &role,
                 &trino_role,
-                &rolegroup,
+                &role_group_ref,
                 &config,
                 &merged_config,
                 &trino_authentication_config,
@@ -496,14 +499,14 @@
             let rg_catalog_configmap = build_rolegroup_catalog_config_map(
                 trino,
                 &resolved_product_image,
-                &rolegroup,
+                &role_group_ref,
                 &catalogs,
             )?;
             let rg_stateful_set = build_rolegroup_statefulset(
                 trino,
                 &trino_role,
                 &resolved_product_image,
-                &rolegroup,
+                &role_group_ref,
                 &config,
                 &merged_config,
                 &trino_authentication_config,
@@ -515,21 +518,21 @@
                 .add(client, rg_service)
                 .await
                 .with_context(|_| ApplyRoleGroupServiceSnafu {
-                    rolegroup: rolegroup.clone(),
+                    rolegroup: role_group_ref.clone(),
                 })?;
 
             cluster_resources
                 .add(client, rg_configmap)
                 .await
                 .with_context(|_| ApplyRoleGroupConfigSnafu {
-                    rolegroup: rolegroup.clone(),
+                    rolegroup: role_group_ref.clone(),
                 })?;
 
             cluster_resources
                 .add(client, rg_catalog_configmap)
                 .await
                 .with_context(|_| ApplyRoleGroupConfigSnafu {
-                    rolegroup: rolegroup.clone(),
+                    rolegroup: role_group_ref.clone(),
                 })?;
 
             sts_cond_builder.add(
@@ -537,12 +540,34 @@
                     .add(client, rg_stateful_set)
                     .await
                     .with_context(|_| ApplyRoleGroupStatefulSetSnafu {
-                        rolegroup: rolegroup.clone(),
+                        rolegroup: role_group_ref.clone(),
                     })?,
             );
         }
 
-        let role_config = trino.role_config(&trino_role);
+        if let Some(listener_class) = trino_role.listener_class_name(trino) {
+            if let Some(listener_group_name) = group_listener_name(trino, &trino_role) {
+                let role_group_listener = build_group_listener(
+                    trino,
+                    build_recommended_labels(
+                        trino,
+                        &resolved_product_image.app_version_label,
+                        &trino_role_str,
+                        "none",
+                    ),
+                    listener_class.to_string(),
+                    listener_group_name,
+                )
+                .context(ListenerConfigurationSnafu)?;
+
+                cluster_resources
+                    .add(client, role_group_listener)
+                    .await
+                    .context(ApplyGroupListenerSnafu)?;
+            }
+        }
+
+        let role_config = trino.generic_role_config(&trino_role);
         if let Some(GenericRoleConfig {
             pod_disruption_budget: pdb,
         }) = role_config
@@ -575,45 +600,6 @@
     Ok(Action::await_change())
 }
 
-/// The coordinator-role service is the primary endpoint that should be used by clients that do not
-/// perform internal load balancing, including targets outside of the cluster.
-pub fn build_coordinator_role_service(
-    trino: &v1alpha1::TrinoCluster,
-    resolved_product_image: &ResolvedProductImage,
-) -> Result<Service> {
-    let role = TrinoRole::Coordinator;
-    let role_name = role.to_string();
-    let role_svc_name = trino
-        .role_service_name(&role)
-        .context(InternalOperatorFailureSnafu)?;
-    Ok(Service {
-        metadata: ObjectMetaBuilder::new()
-            .name_and_namespace(trino)
-            .name(&role_svc_name)
-            .ownerreference_from_resource(trino, None, Some(true))
-            .context(ObjectMissingMetadataForOwnerRefSnafu)?
-            .with_recommended_labels(build_recommended_labels(
-                trino,
-                &resolved_product_image.app_version_label,
-                &role_name,
-                "global",
-            ))
-            .context(MetadataBuildSnafu)?
-            .build(),
-        spec: Some(ServiceSpec {
-            ports: Some(service_ports(trino)),
-            selector: Some(
-                Labels::role_selector(trino, APP_NAME, &role_name)
-                    .context(LabelBuildSnafu)?
-                    .into(),
-            ),
-            type_: Some(trino.spec.cluster_config.listener_class.k8s_service_type()),
-            ..ServiceSpec::default()
-        }),
-        status: None,
-    })
-}
-
 /// The rolegroup [`ConfigMap`] configures the rolegroup based on the configuration given by the administrator
 #[allow(clippy::too_many_arguments)]
 fn build_rolegroup_config_map(
@@ -854,7 +840,7 @@ fn build_rolegroup_statefulset(
     trino: &v1alpha1::TrinoCluster,
     trino_role: &TrinoRole,
     resolved_product_image: &ResolvedProductImage,
-    rolegroup_ref: &RoleGroupRef<v1alpha1::TrinoCluster>,
+    role_group_ref: &RoleGroupRef<v1alpha1::TrinoCluster>,
     config: &HashMap<PropertyNameKind, BTreeMap<String, String>>,
     merged_config: &v1alpha1::TrinoConfig,
     trino_authentication_config: &TrinoAuthenticationConfig,
@@ -865,7 +851,7 @@
         .role(trino_role)
         .context(InternalOperatorFailureSnafu)?;
     let rolegroup = trino
-        .rolegroup(rolegroup_ref)
+        .rolegroup(role_group_ref)
         .context(InternalOperatorFailureSnafu)?;
 
     let mut pod_builder = PodBuilder::new();
@@ -940,6 +926,7 @@
     let requested_secret_lifetime = merged_config
         .requested_secret_lifetime
        .context(MissingSecretLifetimeSnafu)?;
+
     // add volume mounts depending on the client tls, internal tls, catalogs and authentication
     tls_volume_mounts(
         trino,
@@ -999,6 +986,36 @@
     )
     .build();
 
+    // for rw config
+    let mut persistent_volume_claims = vec![
+        merged_config
+            .resources
+            .storage
+            .data
+            .build_pvc("data", Some(vec!["ReadWriteOnce"])),
+    ];
+    // Add listener
+    if let Some(group_listener_name) = group_listener_name(trino, trino_role) {
+        cb_trino
+            .add_volume_mount(LISTENER_VOLUME_NAME, LISTENER_VOLUME_DIR)
+            .context(AddVolumeMountSnafu)?;
+
+        // Used for PVC templates that cannot be modified once they are deployed
+        let unversioned_recommended_labels = Labels::recommended(build_recommended_labels(
+            trino,
+            // A version value is required, and we do want to use the "recommended" format for the other desired labels
+            "none",
+            &role_group_ref.role,
+            &role_group_ref.role_group,
+        ))
+        .context(LabelBuildSnafu)?;
+
+        persistent_volume_claims.push(
+            build_group_listener_pvc(&group_listener_name, &unversioned_recommended_labels)
+                .context(ListenerConfigurationSnafu)?,
+        );
+    }
+
     let container_trino = cb_trino
         .image_from_product_image(resolved_product_image)
         .command(vec![
@@ -1058,7 +1075,7 @@
         .add_volume(Volume {
             name: "log-config".to_string(),
             config_map: Some(ConfigMapVolumeSource {
-                name: rolegroup_ref.object_name(),
+                name: role_group_ref.object_name(),
                 ..ConfigMapVolumeSource::default()
             }),
             ..Volume::default()
         })
@@ -1096,8 +1113,8 @@
             .with_recommended_labels(build_recommended_labels(
                 trino,
                 &resolved_product_image.app_version_label,
-                &rolegroup_ref.role,
-                &rolegroup_ref.role_group,
+                &role_group_ref.role,
+                &role_group_ref.role_group,
             ))
             .context(MetadataBuildSnafu)?
            .with_annotation(
@@ -1115,7 +1132,7 @@
        .add_volume(Volume {
            name: "config".to_string(),
            config_map: Some(ConfigMapVolumeSource {
-                name: rolegroup_ref.object_name(),
+                name: role_group_ref.object_name(),
                ..ConfigMapVolumeSource::default()
            }),
            ..Volume::default()
        })
@@ -1126,7 +1143,7 @@
        .add_volume(Volume {
            name: "catalog".to_string(),
            config_map: Some(ConfigMapVolumeSource {
-                name: format!("{}-catalog", rolegroup_ref.object_name()),
+                name: format!("{}-catalog", role_group_ref.object_name()),
                ..ConfigMapVolumeSource::default()
            }),
            ..Volume::default()
        })
@@ -1149,14 +1166,14 @@
    Ok(StatefulSet {
        metadata: ObjectMetaBuilder::new()
            .name_and_namespace(trino)
-            .name(rolegroup_ref.object_name())
+            .name(role_group_ref.object_name())
            .ownerreference_from_resource(trino, None, Some(true))
            .context(ObjectMissingMetadataForOwnerRefSnafu)?
            .with_recommended_labels(build_recommended_labels(
                trino,
                &resolved_product_image.app_version_label,
-                &rolegroup_ref.role,
-                &rolegroup_ref.role_group,
+                &role_group_ref.role,
+                &role_group_ref.role_group,
            ))
            .context(MetadataBuildSnafu)?
            .build(),
@@ -1168,23 +1185,19 @@
                Labels::role_group_selector(
                    trino,
                    APP_NAME,
-                    &rolegroup_ref.role,
-                    &rolegroup_ref.role_group,
+                    &role_group_ref.role,
+                    &role_group_ref.role_group,
                )
                .context(LabelBuildSnafu)?
                .into(),
            ),
            ..LabelSelector::default()
        },
-        service_name: Some(rolegroup_ref.object_name()),
+        service_name: Some(rolegroup_metrics_service_name(
+            &role_group_ref.object_name(),
+        )),
        template: pod_template,
-        volume_claim_templates: Some(vec![
-            merged_config
-                .resources
-                .storage
-                .data
-                .build_pvc("data", Some(vec!["ReadWriteOnce"])),
-        ]),
+        volume_claim_templates: Some(persistent_volume_claims),
        ..StatefulSetSpec::default()
    }),
    status: None,
@@ -1197,19 +1210,21 @@
fn build_rolegroup_service(
    trino: &v1alpha1::TrinoCluster,
    resolved_product_image: &ResolvedProductImage,
-    rolegroup_ref: &RoleGroupRef<v1alpha1::TrinoCluster>,
+    role_group_ref: &RoleGroupRef<v1alpha1::TrinoCluster>,
) -> Result<Service> {
    Ok(Service {
        metadata: ObjectMetaBuilder::new()
            .name_and_namespace(trino)
-            .name(rolegroup_ref.object_name())
+            .name(rolegroup_metrics_service_name(
+                &role_group_ref.object_name(),
+            ))
            .ownerreference_from_resource(trino, None, Some(true))
            .context(ObjectMissingMetadataForOwnerRefSnafu)?
            .with_recommended_labels(build_recommended_labels(
                trino,
                &resolved_product_image.app_version_label,
-                &rolegroup_ref.role,
-                &rolegroup_ref.role_group,
+                &role_group_ref.role,
+                &role_group_ref.role_group,
            ))
            .context(MetadataBuildSnafu)?
            .with_label(Label::try_from(("prometheus.io/scrape", "true")).context(LabelBuildSnafu)?)
@@ -1218,13 +1233,13 @@
            // Internal communication does not need to be exposed
            type_: Some("ClusterIP".to_string()),
            cluster_ip: Some("None".to_string()),
-            ports: Some(service_ports(trino)),
+            ports: Some(service_ports()),
            selector: Some(
                Labels::role_group_selector(
                    trino,
                    APP_NAME,
-                    &rolegroup_ref.role,
-                    &rolegroup_ref.role_group,
+                    &role_group_ref.role,
+                    &role_group_ref.role_group,
                )
                .context(LabelBuildSnafu)?
                .into(),
@@ -1291,30 +1306,28 @@ fn validated_product_config(
        PropertyNameKind::File(ACCESS_CONTROL_PROPERTIES.to_string()),
    ];

+    let coordinator_role = TrinoRole::Coordinator;
    roles.insert(
-        TrinoRole::Coordinator.to_string(),
+        coordinator_role.to_string(),
        (
            config_files.clone(),
            trino
-                .spec
-                .coordinators
-                .clone()
-                .with_context(|| MissingTrinoRoleSnafu {
-                    role: TrinoRole::Coordinator.to_string(),
+                .role(&coordinator_role)
+                .with_context(|_| MissingTrinoRoleSnafu {
+                    role: coordinator_role.to_string(),
                })?,
        ),
    );
+    let worker_role = TrinoRole::Worker;
    roles.insert(
-        TrinoRole::Worker.to_string(),
+        worker_role.to_string(),
        (
            config_files,
            trino
-                .spec
-                .workers
-                .clone()
-                .with_context(|| MissingTrinoRoleSnafu {
-                    role: TrinoRole::Worker.to_string(),
+                .role(&worker_role)
+                .with_context(|_| MissingTrinoRoleSnafu {
+                    role: worker_role.to_string(),
                })?,
        ),
    );
@@ -1396,33 +1409,13 @@ fn get_random_base64() -> String {
    openssl::base64::encode_block(&buf)
}

-fn service_ports(trino: &v1alpha1::TrinoCluster) -> Vec<ServicePort> {
-    let mut ports = vec![ServicePort {
+fn service_ports() -> Vec<ServicePort> {
+    vec![ServicePort {
        name: Some(METRICS_PORT_NAME.to_string()),
        port: METRICS_PORT.into(),
        protocol: Some("TCP".to_string()),
        ..ServicePort::default()
-    }];
-
-    if trino.expose_http_port() {
-        ports.push(ServicePort {
-            name: Some(HTTP_PORT_NAME.to_string()),
-            port: HTTP_PORT.into(),
-            protocol: Some("TCP".to_string()),
-            ..ServicePort::default()
-        });
-    }
-
-    if trino.expose_https_port() {
-        ports.push(ServicePort {
-            name: Some(HTTPS_PORT_NAME.to_string()),
-            port: HTTPS_PORT.into(),
-            protocol: Some("TCP".to_string()),
-            ..ServicePort::default()
-        });
-    }
-
-    ports
+    }]
}

fn container_ports(trino: &v1alpha1::TrinoCluster) -> Vec<ContainerPort> {
@@ -1734,12 +1727,12 @@ mod tests {
                TrinoRole::Coordinator.to_string(),
                (
                    config_files.clone(),
-                    trino.spec.coordinators.clone().unwrap(),
+                    trino.role(&TrinoRole::Coordinator).unwrap(),
                ),
            ),
            (
                TrinoRole::Worker.to_string(),
-                (config_files, trino.spec.workers.clone().unwrap()),
+                (config_files, trino.role(&TrinoRole::Worker).unwrap()),
            ),
        ]
        .into(),
@@ -1791,7 +1784,7 @@
        build_rolegroup_config_map(
            &trino,
            &resolved_product_image,
-            role,
+            &role,
            &trino_role,
            &rolegroup_ref,
            validated_config
diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs
index 2465b086..a68bfddc 100644
--- a/rust/operator-binary/src/crd/mod.rs
+++ b/rust/operator-binary/src/crd/mod.rs
@@ -29,7 +29,9 @@ use stackable_operator::{
    memory::{BinaryMultiple, MemoryQuantity},
    product_config_utils::{Configuration, Error as ConfigError},
    product_logging::{self, spec::Logging},
-    role_utils::{GenericRoleConfig, JavaCommonConfig, Role, RoleGroup, RoleGroupRef},
+    role_utils::{
+        CommonConfiguration, GenericRoleConfig, JavaCommonConfig, Role, RoleGroup, RoleGroupRef,
+    },
    schemars::{self, JsonSchema},
    status::condition::{ClusterCondition, HasStatusCondition},
    time::Duration,
@@ -37,8 +39,9 @@ use stackable_operator::{
    versioned::versioned,
};
use strum::{Display, EnumIter, EnumString, IntoEnumIterator};
+use v1alpha1::TrinoConfigFragment;

-use crate::crd::discovery::TrinoPodRef;
+use crate::crd::{discovery::TrinoPodRef, v1alpha1::TrinoCoordinatorRoleConfig};

pub const APP_NAME: &str = "trino";
// ports
@@ -115,6 +118,8 @@ pub const MAX_TRINO_LOG_FILES_SIZE: MemoryQuantity = MemoryQuantity {
    unit: BinaryMultiple::Mebi,
};

+pub const METRICS_SERVICE_SUFFIX: &str = "metrics";
+
pub const JVM_HEAP_FACTOR: f32 = 0.8;

pub const
DEFAULT_COORDINATOR_GRACEFUL_SHUTDOWN_TIMEOUT: Duration =
@@ -189,13 +194,26 @@ pub mod versioned {

        // no doc - it's in the struct.
        #[serde(default, skip_serializing_if = "Option::is_none")]
-        pub coordinators: Option<Role<TrinoConfigFragment, GenericRoleConfig, JavaCommonConfig>>,
+        pub coordinators:
+            Option<Role<TrinoConfigFragment, TrinoCoordinatorRoleConfig, JavaCommonConfig>>,

        // no doc - it's in the struct.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        pub workers: Option<Role<TrinoConfigFragment, GenericRoleConfig, JavaCommonConfig>>,
    }

+    // TODO: move generic version to op-rs?
+    #[derive(Clone, Debug, Deserialize, JsonSchema, PartialEq, Serialize)]
+    #[serde(rename_all = "camelCase")]
+    pub struct TrinoCoordinatorRoleConfig {
+        #[serde(flatten)]
+        pub common: GenericRoleConfig,
+
+        /// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the coordinator.
+        #[serde(default = "coordinator_default_listener_class")]
+        pub listener_class: String,
+    }
+
    #[derive(Clone, Debug, Default, Fragment, JsonSchema, PartialEq)]
    #[fragment_attrs(
        derive(
@@ -261,20 +279,6 @@ pub mod versioned {
        /// to learn how to configure log aggregation with Vector.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub vector_aggregator_config_map_name: Option<String>,
-
-        /// This field controls which type of Service the Operator creates for this TrinoCluster:
-        ///
-        /// * cluster-internal: Use a ClusterIP service
-        ///
-        /// * external-unstable: Use a NodePort service
-        ///
-        /// * external-stable: Use a LoadBalancer service
-        ///
-        /// This is a temporary solution with the goal to keep yaml manifests forward compatible.
-        /// In the future, this setting will control which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html)
-        /// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
-        #[serde(default)]
-        pub listener_class: CurrentlySupportedListenerClasses,
    }

    #[derive(Clone, Debug, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
@@ -335,29 +339,19 @@ pub mod versioned {
    }
}

-// TODO: Temporary solution until listener-operator is finished
-#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
-#[serde(rename_all = "PascalCase")]
-pub enum CurrentlySupportedListenerClasses {
-    #[default]
-    #[serde(rename = "cluster-internal")]
-    ClusterInternal,
-    #[serde(rename = "external-unstable")]
-    ExternalUnstable,
-    #[serde(rename = "external-stable")]
-    ExternalStable,
-}
-
-impl CurrentlySupportedListenerClasses {
-    pub fn k8s_service_type(&self) -> String {
-        match self {
-            CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(),
-            CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(),
-            CurrentlySupportedListenerClasses::ExternalStable => "LoadBalancer".to_string(),
+impl Default for v1alpha1::TrinoCoordinatorRoleConfig {
+    fn default() -> Self {
+        v1alpha1::TrinoCoordinatorRoleConfig {
+            listener_class: coordinator_default_listener_class(),
+            common: Default::default(),
        }
    }
}

+fn coordinator_default_listener_class() -> String {
+    "cluster-internal".to_string()
+}
+
impl Default for v1alpha1::TrinoTls {
    fn default() -> Self {
        v1alpha1::TrinoTls {
@@ -421,6 +415,17 @@ impl TrinoRole {
        }
        roles
    }
+
+    pub fn listener_class_name(&self, trino: &v1alpha1::TrinoCluster) -> Option<String> {
+        match self {
+            Self::Coordinator => trino
+                .spec
+                .coordinators
+                .to_owned()
+                .map(|coordinator| coordinator.role_config.listener_class),
+            Self::Worker => None,
+        }
+    }
}

#[derive(
@@ -733,11 +738,15 @@ impl v1alpha1::TrinoCluster {
    pub fn role(
        &self,
        role_variant: &TrinoRole,
-    ) -> Result<&Role<TrinoConfigFragment, GenericRoleConfig, JavaCommonConfig>, Error>
+    ) -> Result<Role<TrinoConfigFragment, GenericRoleConfig, JavaCommonConfig>, Error> {
        match role_variant {
-            TrinoRole::Coordinator => self.spec.coordinators.as_ref(),
-            TrinoRole::Worker => self.spec.workers.as_ref(),
+            TrinoRole::Coordinator => self
+                .spec
+                .coordinators
+                .to_owned()
+                .map(extract_role_from_coordinator_config),
+            TrinoRole::Worker => self.spec.workers.to_owned(),
        }
        .with_context(|| CannotRetrieveTrinoRoleSnafu {
            role: role_variant.to_string(),
@@ -748,23 +757,31 @@
    pub fn rolegroup(
        &self,
        rolegroup_ref: &RoleGroupRef<v1alpha1::TrinoCluster>,
-    ) -> Result<&RoleGroup<TrinoConfigFragment, JavaCommonConfig>, Error> {
-        let role_variant =
+    ) -> Result<RoleGroup<TrinoConfigFragment, JavaCommonConfig>, Error> {
+        let trino_role =
            TrinoRole::from_str(&rolegroup_ref.role).with_context(|_| UnknownTrinoRoleSnafu {
                role: rolegroup_ref.role.to_owned(),
                roles: TrinoRole::roles(),
            })?;
-        let role = self.role(&role_variant)?;
-        role.role_groups
+
+        let role_variant = self.role(&trino_role)?;
+
+        role_variant
+            .role_groups
            .get(&rolegroup_ref.role_group)
+            .cloned()
            .with_context(|| CannotRetrieveTrinoRoleGroupSnafu {
                role_group: rolegroup_ref.role_group.to_owned(),
            })
    }

-    pub fn role_config(&self, role: &TrinoRole) -> Option<&GenericRoleConfig> {
+    pub fn generic_role_config(&self, role: &TrinoRole) -> Option<&GenericRoleConfig> {
        match role {
-            TrinoRole::Coordinator => self.spec.coordinators.as_ref().map(|c| &c.role_config),
+            TrinoRole::Coordinator => self
+                .spec
+                .coordinators
+                .as_ref()
+                .map(|c| &c.role_config.common),
            TrinoRole::Worker => self.spec.workers.as_ref().map(|w| &w.role_config),
        }
    }
@@ -816,12 +833,17 @@
            .collect::<Vec<_>>()
            .into_iter()
            .flat_map(move |(rolegroup_name, rolegroup)| {
-                let rolegroup_ref = TrinoRole::Coordinator.rolegroup_ref(self, rolegroup_name);
+                let role_group_ref = TrinoRole::Coordinator.rolegroup_ref(self, rolegroup_name);
                let ns = ns.clone();
                (0..rolegroup.replicas.unwrap_or(0)).map(move |i| TrinoPodRef {
                    namespace: ns.clone(),
-                    role_group_service_name: rolegroup_ref.object_name(),
-                    pod_name: format!("{}-{}", rolegroup_ref.object_name(), i),
+                    role_group_service_name: rolegroup_metrics_service_name(
+                        &role_group_ref.object_name(),
+                    ),
+                    pod_name: format!(
+                        "{role_group}-{i}",
+                        role_group = role_group_ref.object_name()
+                    ),
                })
            }))
    }
@@ -918,6 +940,47 @@
    }
}

+/// Returns the metrics rolegroup service name `<cluster>-<role>-<role-group>-<metrics-suffix>`.
+pub fn rolegroup_metrics_service_name(role_group_ref_object_name: &str) -> String {
+    format!("{role_group_ref_object_name}-{METRICS_SERVICE_SUFFIX}")
+}
+
+fn extract_role_from_coordinator_config(
+    fragment: Role<TrinoConfigFragment, TrinoCoordinatorRoleConfig, JavaCommonConfig>,
+) -> Role<TrinoConfigFragment, GenericRoleConfig, JavaCommonConfig> {
+    Role {
+        config: CommonConfiguration {
+            config: fragment.config.config,
+            config_overrides: fragment.config.config_overrides,
+            env_overrides: fragment.config.env_overrides,
+            cli_overrides: fragment.config.cli_overrides,
+            pod_overrides: fragment.config.pod_overrides,
+            product_specific_common_config: fragment.config.product_specific_common_config,
+        },
+        role_config: fragment.role_config.common,
+        role_groups: fragment
+            .role_groups
+            .into_iter()
+            .map(|(k, v)| {
+                (
+                    k,
+                    RoleGroup {
+                        config: CommonConfiguration {
+                            config: v.config.config,
+                            config_overrides: v.config.config_overrides,
+                            env_overrides: v.config.env_overrides,
+                            cli_overrides: v.config.cli_overrides,
+                            pod_overrides: v.config.pod_overrides,
+                            product_specific_common_config: v.config.product_specific_common_config,
+                        },
+                        replicas: v.replicas,
+                    },
+                )
+            })
+            .collect(),
+    }
+}
+
impl HasStatusCondition for v1alpha1::TrinoCluster {
    fn conditions(&self) -> Vec<ClusterCondition> {
        match &self.status {
diff --git a/rust/operator-binary/src/listener.rs b/rust/operator-binary/src/listener.rs
new file mode 100644
index 00000000..443a7a45
--- /dev/null
+++ b/rust/operator-binary/src/listener.rs
@@ -0,0 +1,96 @@
+use snafu::{ResultExt, Snafu};
+use stackable_operator::{
+    builder::{
+        meta::ObjectMetaBuilder,
+        pod::volume::{ListenerOperatorVolumeSourceBuilder, ListenerReference},
+    },
+    crd::listener::v1alpha1::{Listener, ListenerPort, ListenerSpec},
+    k8s_openapi::api::core::v1::PersistentVolumeClaim,
+    kube::ResourceExt,
+    kvp::{Labels, ObjectLabels},
+};
+
+use crate::crd::{TrinoRole, v1alpha1};
+
+pub const LISTENER_VOLUME_NAME: &str = "listener";
+pub const LISTENER_VOLUME_DIR: &str = "/stackable/listener";
+
+#[derive(Snafu, Debug)]
+pub enum Error {
+    #[snafu(display("listener object is missing metadata to build owner reference"))]
+    ObjectMissingMetadataForOwnerRef {
+        source: stackable_operator::builder::meta::Error,
+    },
+
+    #[snafu(display("failed to build listener object meta data"))]
+    BuildObjectMeta {
+        source: stackable_operator::builder::meta::Error,
+    },
+
+    #[snafu(display("failed to build listener volume"))]
+    BuildListenerPersistentVolume {
+        source: stackable_operator::builder::pod::volume::ListenerOperatorVolumeSourceBuilderError,
+    },
+}
+
+pub fn build_group_listener(
+    trino: &v1alpha1::TrinoCluster,
+    object_labels: ObjectLabels<v1alpha1::TrinoCluster>,
+    listener_class: String,
+    listener_group_name: String,
+) -> Result<Listener, Error> {
+    Ok(Listener {
+        metadata: ObjectMetaBuilder::new()
+            .name_and_namespace(trino)
+            .name(listener_group_name)
+            .ownerreference_from_resource(trino, None, Some(true))
+            .context(ObjectMissingMetadataForOwnerRefSnafu)?
+            .with_recommended_labels(object_labels)
+            .context(BuildObjectMetaSnafu)?
+            .build(),
+        spec: ListenerSpec {
+            class_name: Some(listener_class),
+            ports: Some(listener_ports(trino)),
+            ..ListenerSpec::default()
+        },
+        status: None,
+    })
+}
+
+pub fn build_group_listener_pvc(
+    group_listener_name: &str,
+    unversioned_recommended_labels: &Labels,
+) -> Result<PersistentVolumeClaim, Error> {
+    ListenerOperatorVolumeSourceBuilder::new(
+        &ListenerReference::ListenerName(group_listener_name.to_string()),
+        unversioned_recommended_labels,
+    )
+    .context(BuildListenerPersistentVolumeSnafu)?
+    .build_pvc(LISTENER_VOLUME_NAME.to_string())
+    .context(BuildListenerPersistentVolumeSnafu)
+}
+
+/// The name of the group-listener provided for a specific role-group.
+/// Coordinator(s) will use this group listener so that only one load balancer
+/// is needed (per role group).
+pub fn group_listener_name(trino: &v1alpha1::TrinoCluster, role: &TrinoRole) -> Option<String> {
+    match role {
+        TrinoRole::Coordinator => Some(format!(
+            "{cluster_name}-{role}",
+            cluster_name = trino.name_any()
+        )),
+        TrinoRole::Worker => None,
+    }
+}
+
+/// We only use the http/https port here and intentionally omit the metrics one.
+fn listener_ports(trino: &v1alpha1::TrinoCluster) -> Vec<ListenerPort> {
+    let name = trino.exposed_protocol().to_string();
+    let port = trino.exposed_port().into();
+
+    vec![ListenerPort {
+        name,
+        port,
+        protocol: Some("TCP".to_string()),
+    }]
+}
diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs
index 22bd6b0e..25a44ba4 100644
--- a/rust/operator-binary/src/main.rs
+++ b/rust/operator-binary/src/main.rs
@@ -5,6 +5,7 @@ mod command;
 mod config;
 mod controller;
 mod crd;
+mod listener;
 mod operations;
 mod product_logging;
 
diff --git a/tests/templates/kuttl/authentication/05-assert.yaml b/tests/templates/kuttl/authentication/05-assert.yaml
index 1257ec56..168d5d8f 100644
--- a/tests/templates/kuttl/authentication/05-assert.yaml
+++ b/tests/templates/kuttl/authentication/05-assert.yaml
@@ -6,7 +6,7 @@ timeout: 300
apiVersion: apps/v1
kind: StatefulSet
metadata:
-  name: test-trino
+  name: trino-test-helper
status:
  readyReplicas: 1
  replicas: 1
diff --git a/tests/templates/kuttl/authentication/05-install-test-trino.yaml.j2 b/tests/templates/kuttl/authentication/05-install-test-trino.yaml.j2
index 41d2f209..d8fba82c 100644
--- a/tests/templates/kuttl/authentication/05-install-test-trino.yaml.j2
+++ b/tests/templates/kuttl/authentication/05-install-test-trino.yaml.j2
@@ -2,18 +2,18 @@ apiVersion: apps/v1
 kind: StatefulSet
 metadata:
-  name: test-trino
+  name: trino-test-helper
   labels:
-    app: test-trino
+    app: trino-test-helper
 spec:
   replicas: 1
   selector:
     matchLabels:
-      app: test-trino
+      app: trino-test-helper
   template:
     metadata:
       labels:
-        app: test-trino
+        app: trino-test-helper
     spec:
       serviceAccount: integration-tests-sa
 {% if test_scenario['values']['openshift'] == 'false' %}
       securityContext:
         fsGroup: 1000
 {% endif %}
       containers:
-        - name: test-trino
+        - name: trino-test-helper
           image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev
           command: ["sleep", "infinity"]
           volumeMounts:
@@ -36,4 +36,4 @@ spec:
           driver: secrets.stackable.tech
           volumeAttributes:
             secrets.stackable.tech/class: tls
-            secrets.stackable.tech/scope: pod
+            secrets.stackable.tech/scope: pod,node
diff --git a/tests/templates/kuttl/authentication/20-assert.yaml b/tests/templates/kuttl/authentication/20-assert.yaml
index 0ed9b5ae..8f38517c 100644
--- a/tests/templates/kuttl/authentication/20-assert.yaml
+++ b/tests/templates/kuttl/authentication/20-assert.yaml
@@ -4,10 +4,10 @@ kind: TestAssert
timeout: 300
commands:
  # file
-  - script: kubectl exec -n $NAMESPACE test-trino-0 -- python /tmp/check-active-workers.py -u test_user_1 -p test_user_1 -n $NAMESPACE -w 1
-  - script: kubectl exec -n $NAMESPACE test-trino-0 -- python /tmp/check-active-workers.py -u test_user_2_other -p test_user_2_other -n $NAMESPACE -w 1
+  - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u test_user_1 -p test_user_1 -c
trino-coordinator-default-metrics.$NAMESPACE.svc.cluster.local -w 1 + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u test_user_2_other -p test_user_2_other -c trino-coordinator-default-metrics.$NAMESPACE.svc.cluster.local -w 1 # ldap - - script: kubectl exec -n $NAMESPACE test-trino-0 -- python /tmp/check-active-workers.py -u integrationtest -p integrationtest -n $NAMESPACE -w 1 - - script: kubectl exec -n $NAMESPACE test-trino-0 -- python /tmp/check-active-workers.py -u integrationtest-other -p integrationtest-other -n $NAMESPACE -w 1 + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u integrationtest -p integrationtest -c trino-coordinator-default-metrics.$NAMESPACE.svc.cluster.local -w 1 + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u integrationtest-other -p integrationtest-other -c trino-coordinator-default-metrics.$NAMESPACE.svc.cluster.local -w 1 # oidc/oauth2 - - script: kubectl exec -n $NAMESPACE test-trino-0 -- python /tmp/check-oauth-login.py https://trino-coordinator-default.$NAMESPACE.svc.cluster.local:8443/ui/ + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-oauth-login.py https://trino-coordinator-default-metrics.$NAMESPACE.svc.cluster.local:8443/ui/ diff --git a/tests/templates/kuttl/authentication/20-test-trino.yaml b/tests/templates/kuttl/authentication/20-test-trino.yaml index 75643e64..f215bfb9 100644 --- a/tests/templates/kuttl/authentication/20-test-trino.yaml +++ b/tests/templates/kuttl/authentication/20-test-trino.yaml @@ -2,5 +2,5 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - - script: kubectl cp -n $NAMESPACE ./check-active-workers.py test-trino-0:/tmp - - script: kubectl cp -n $NAMESPACE ./check-oauth-login.py test-trino-0:/tmp + - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/check-active-workers.py trino-test-helper-0:/tmp || true + - script: kubectl cp -n $NAMESPACE ./check-oauth-login.py trino-test-helper-0:/tmp diff --git a/tests/templates/kuttl/authentication/31-assert.yaml b/tests/templates/kuttl/authentication/31-assert.yaml index f3aa92eb..5b2948fa 100644 --- a/tests/templates/kuttl/authentication/31-assert.yaml +++ b/tests/templates/kuttl/authentication/31-assert.yaml @@ -5,4 +5,4 @@ timeout: 600 commands: # file # new user? - - script: kubectl exec -n $NAMESPACE test-trino-0 -- python /tmp/check-active-workers.py -u hot_reloaded -p hot_reloaded -n $NAMESPACE -w 1 + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u hot_reloaded -p hot_reloaded -c trino-coordinator-default-metrics.$NAMESPACE.svc.cluster.local -w 1 diff --git a/tests/templates/kuttl/authentication/33-assert.yaml b/tests/templates/kuttl/authentication/33-assert.yaml index 7a84cfde..5794c83e 100644 --- a/tests/templates/kuttl/authentication/33-assert.yaml +++ b/tests/templates/kuttl/authentication/33-assert.yaml @@ -5,4 +5,4 @@ timeout: 600 commands: # We use the check-active-workers script for the login. Since we do want to wait until we cannot log in anymore # we flip the return value in the end. - - script: kubectl exec -n $NAMESPACE test-trino-0 -- python /tmp/check-active-workers.py -u hot_reloaded -p hot_reloaded -n $NAMESPACE -w 1; if [ $? 
-eq 0 ]; then exit 1; fi + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u hot_reloaded -p hot_reloaded -c trino-coordinator-default-metrics.$NAMESPACE.svc.cluster.local -w 1; if [ $? -eq 0 ]; then exit 1; fi diff --git a/tests/templates/kuttl/authentication/check-active-workers.py b/tests/templates/kuttl/authentication/check-active-workers.py deleted file mode 100755 index 993f4baf..00000000 --- a/tests/templates/kuttl/authentication/check-active-workers.py +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python -import trino -import argparse -import sys - -if not sys.warnoptions: - import warnings -warnings.simplefilter("ignore") - - -def get_connection(username, password, namespace): - host = 'trino-coordinator-default-0.trino-coordinator-default.' + namespace + '.svc.cluster.local' - conn = trino.dbapi.connect( - host=host, - port=8443, - user=username, - http_scheme='https', - auth=trino.auth.BasicAuthentication(username, password), - ) - conn._http_session.verify = False - return conn - - -if __name__ == '__main__': - # Construct an argument parser - all_args = argparse.ArgumentParser() - - # Add arguments to the parser - all_args.add_argument("-u", "--user", required=True, - help="Username to connect as") - all_args.add_argument("-p", "--password", required=True, - help="Password for the user") - all_args.add_argument("-n", "--namespace", required=True, - help="Namespace the test is running in") - all_args.add_argument("-w", "--workers", required=True, - help="Expected amount of workers to be present") - - args = vars(all_args.parse_args()) - - expected_workers = args['workers'] - conn = get_connection(args['user'], args['password'], args['namespace']) - - cursor = conn.cursor() - cursor.execute("SELECT COUNT(*) as nodes FROM system.runtime.nodes WHERE coordinator=false AND state='active'") - - (active_workers,) = cursor.fetchone() - - if int(active_workers) != int(expected_workers): - print("Missmatch: [expected/active] workers [" + str(expected_workers) + "/" + str(active_workers) + "]") - exit(-1) - - print("Test check-active-workers.py succeeded!") diff --git a/tests/templates/kuttl/authentication/check-oauth-login.py b/tests/templates/kuttl/authentication/check-oauth-login.py index d2c5ceb8..7729f0fa 100644 --- a/tests/templates/kuttl/authentication/check-oauth-login.py +++ b/tests/templates/kuttl/authentication/check-oauth-login.py @@ -5,6 +5,7 @@ of the Keycloak page and posts the credentials of a test user to it. Finally it tests that Keycloak redirects back to the original page. 
""" + import logging import requests import sys @@ -19,17 +20,15 @@ def test_login_flow(login_url): result.raise_for_status() - html = BeautifulSoup(result.text, 'html.parser') - authenticate_url = html.form['action'] - result = session.post(authenticate_url, data={ - 'username': "test", - 'password': "test" - }) + html = BeautifulSoup(result.text, "html.parser") + authenticate_url = html.form["action"] + result = session.post( + authenticate_url, data={"username": "test", "password": "test"} + ) result.raise_for_status() - assert result.url == login_url, \ - "Redirection to the Trino UI expected" + assert result.url == login_url, "Redirection to the Trino UI expected" def main(): diff --git a/tests/templates/kuttl/smoke_aws/check-active-workers.py b/tests/templates/kuttl/commons/check-active-workers.py similarity index 84% rename from tests/templates/kuttl/smoke_aws/check-active-workers.py rename to tests/templates/kuttl/commons/check-active-workers.py index 1367e1d7..431d2dee 100755 --- a/tests/templates/kuttl/smoke_aws/check-active-workers.py +++ b/tests/templates/kuttl/commons/check-active-workers.py @@ -8,14 +8,9 @@ warnings.simplefilter("ignore") -def get_connection(username, password, namespace): - host = ( - "trino-coordinator-default-0.trino-coordinator-default." - + namespace - + ".svc.cluster.local" - ) +def get_connection(username, password, coordinator): conn = trino.dbapi.connect( - host=host, + host=coordinator, port=8443, user=username, http_scheme="https", @@ -36,7 +31,10 @@ def get_connection(username, password, namespace): "-p", "--password", required=True, help="Password for the user" ) all_args.add_argument( - "-n", "--namespace", required=True, help="Namespace the test is running in" + "-c", + "--coordinator", + required=True, + help="Trino Coordinator Host to connect to", ) all_args.add_argument( "-w", @@ -48,7 +46,7 @@ def get_connection(username, password, namespace): args = vars(all_args.parse_args()) expected_workers = args["workers"] - conn = get_connection(args["user"], args["password"], args["namespace"]) + conn = get_connection(args["user"], args["password"], args["coordinator"]) cursor = conn.cursor() cursor.execute( diff --git a/tests/templates/kuttl/commons/check-metrics.py b/tests/templates/kuttl/commons/check-metrics.py index 9a4a42c5..7fd11bda 100644 --- a/tests/templates/kuttl/commons/check-metrics.py +++ b/tests/templates/kuttl/commons/check-metrics.py @@ -68,17 +68,16 @@ def check_monitoring(hosts): if __name__ == "__main__": all_args = argparse.ArgumentParser(description="Test Trino metrics.") all_args.add_argument( - "-n", "--namespace", help="The namespace to run in", required=True + "-c", "--coordinator", help="The coordinator service name", required=True ) - args = vars(all_args.parse_args()) - namespace = args["namespace"] - - host_coordinator_0 = f"trino-coordinator-default-0.trino-coordinator-default.{namespace}.svc.cluster.local" - host_worker_0 = ( - f"trino-worker-default-0.trino-worker-default.{namespace}.svc.cluster.local" + all_args.add_argument( + "-w", "--worker", help="The worker service name", required=True ) + args = vars(all_args.parse_args()) + service_coordinator = args["coordinator"] + service_worker = args["worker"] - hosts = [host_coordinator_0, host_worker_0] + hosts = [service_coordinator, service_worker] check_monitoring(hosts) diff --git a/tests/templates/kuttl/smoke_aws/check-opa.py b/tests/templates/kuttl/commons/check-opa.py similarity index 69% rename from tests/templates/kuttl/smoke_aws/check-opa.py rename to 
tests/templates/kuttl/commons/check-opa.py index e45e8511..9cec9621 100755 --- a/tests/templates/kuttl/smoke_aws/check-opa.py +++ b/tests/templates/kuttl/commons/check-opa.py @@ -10,14 +10,9 @@ warnings.simplefilter("ignore") -def get_connection(username, password, namespace): - host = ( - "trino-coordinator-default-0.trino-coordinator-default." - + namespace - + ".svc.cluster.local" - ) +def get_connection(username, password, coordinator): conn = trino.dbapi.connect( - host=host, + host=coordinator, port=8443, user=username, http_scheme="https", @@ -31,8 +26,8 @@ def get_connection(username, password, namespace): return conn -def test_user(user, password, namespace, query): - conn = get_connection(user, password, namespace) +def test_user(user, password, coordinator, query): + conn = get_connection(user, password, coordinator) cursor = conn.cursor() try: cursor.execute(query) @@ -47,26 +42,29 @@ def main(): all_args = argparse.ArgumentParser() # Add arguments to the parser all_args.add_argument( - "-n", "--namespace", required=True, help="Namespace the test is running in" + "-c", + "--coordinator", + required=True, + help="Trino Coordinator Host to connect to", ) args = vars(all_args.parse_args()) - namespace = args["namespace"] + coordinator = args["coordinator"] # We expect the admin user query to pass - if not test_user("admin", "admin", namespace, "SHOW CATALOGS"): + if not test_user("admin", "admin", coordinator, "SHOW CATALOGS"): print("User admin cannot show catalogs!") sys.exit(-1) # We expect the admin user query to pass - if not test_user("admin", "admin", namespace, "SHOW SCHEMAS FROM system"): + if not test_user("admin", "admin", coordinator, "SHOW SCHEMAS FROM system"): print("User admin cannot select schemas from system") sys.exit(-1) # We expect the bob query for catalogs to pass - if not test_user("bob", "bob", namespace, "SHOW CATALOGS"): + if not test_user("bob", "bob", coordinator, "SHOW CATALOGS"): print("User bob cannot show catalogs!") sys.exit(-1) # We expect the bob query for schemas to fail - if test_user("bob", "bob", namespace, "SHOW SCHEMAS FROM system"): + if test_user("bob", "bob", coordinator, "SHOW SCHEMAS FROM system"): print("User bob can show schemas from system. This should not be happening!") sys.exit(-1) diff --git a/tests/templates/kuttl/smoke_aws/check-s3.py b/tests/templates/kuttl/commons/check-s3.py similarity index 93% rename from tests/templates/kuttl/smoke_aws/check-s3.py rename to tests/templates/kuttl/commons/check-s3.py index 296d99d4..9c2474f6 100755 --- a/tests/templates/kuttl/smoke_aws/check-s3.py +++ b/tests/templates/kuttl/commons/check-s3.py @@ -9,18 +9,12 @@ warnings.simplefilter("ignore") -def get_connection(username, password, namespace): - host = ( - "trino-coordinator-default-0.trino-coordinator-default." 
- + namespace - + ".svc.cluster.local" - ) +def get_connection(username, password, coordinator): # If you want to debug this locally use # kubectl -n kuttl-test-XXX port-forward svc/trino-coordinator-default 8443 # host = '127.0.0.1' - conn = trino.dbapi.connect( - host=host, + host=coordinator, port=8443, user=username, http_scheme="https", @@ -43,14 +37,21 @@ def run_query(connection, query): all_args = argparse.ArgumentParser() # Add arguments to the parser all_args.add_argument( - "-n", "--namespace", required=True, help="Namespace the test is running in" + "-c", + "--coordinator", + required=True, + help="Trino Coordinator Host to connect to", + ) + all_args.add_argument( + "-b", "--bucket", required=True, help="The S3 bucket name to use" ) args = vars(all_args.parse_args()) - namespace = args["namespace"] + coordinator = args["coordinator"] + bucket_name = args["bucket"] print("Starting S3 tests...") - connection = get_connection("admin", "admin", namespace) + connection = get_connection("admin", "admin", coordinator) trino_version = run_query( connection, @@ -65,10 +66,9 @@ def run_query(connection, query): assert trino_product_version.isnumeric() assert trino_version == run_query(connection, "select version()")[0][0] - # WARNING (@NickLarsenNZ): Hard-coded bucket run_query( connection, - "CREATE SCHEMA IF NOT EXISTS hive.s3 WITH (location = 's3a://my-bucket/')", + f"CREATE SCHEMA IF NOT EXISTS hive.s3 WITH (location = 's3a://{bucket_name}/')", ) run_query(connection, "DROP TABLE IF EXISTS hive.s3.taxi_data") @@ -79,7 +79,7 @@ def run_query(connection, query): run_query( connection, - """ + f""" CREATE TABLE IF NOT EXISTS hive.s3.taxi_data ( vendor_id VARCHAR, tpep_pickup_datetime VARCHAR, @@ -88,7 +88,7 @@ def run_query(connection, query): trip_distance VARCHAR, ratecode_id VARCHAR ) WITH ( - external_location = 's3a://sble-s3-smoke-bucket-1/taxi-data/', + external_location = 's3a://{bucket_name}/taxi-data/', format = 'csv', skip_header_line_count = 1 ) diff --git a/tests/templates/kuttl/listener/00-assert.yaml.j2 b/tests/templates/kuttl/listener/00-assert.yaml.j2 new file mode 100644 index 00000000..50b1d4c3 --- /dev/null +++ b/tests/templates/kuttl/listener/00-assert.yaml.j2 @@ -0,0 +1,10 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +{% endif %} diff --git a/tests/templates/kuttl/listener/00-install-vector-aggregator-discovery-configmap.yaml.j2 b/tests/templates/kuttl/listener/00-install-vector-aggregator-discovery-configmap.yaml.j2 new file mode 100644 index 00000000..2d6a0df5 --- /dev/null +++ b/tests/templates/kuttl/listener/00-install-vector-aggregator-discovery-configmap.yaml.j2 @@ -0,0 +1,9 @@ +{% if lookup('env', 'VECTOR_AGGREGATOR') %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: vector-aggregator-discovery +data: + ADDRESS: {{ lookup('env', 'VECTOR_AGGREGATOR') }} +{% endif %} diff --git a/tests/templates/kuttl/listener/00-patch-ns.yaml.j2 b/tests/templates/kuttl/listener/00-patch-ns.yaml.j2 new file mode 100644 index 00000000..67185acf --- /dev/null +++ b/tests/templates/kuttl/listener/00-patch-ns.yaml.j2 @@ -0,0 +1,9 @@ +{% if test_scenario['values']['openshift'] == 'true' %} +# see https://github.com/stackabletech/issues/issues/566 +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl patch namespace $NAMESPACE -p 
'{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}' + timeout: 120 +{% endif %} diff --git a/tests/templates/kuttl/listener/00-rbac.yaml.j2 b/tests/templates/kuttl/listener/00-rbac.yaml.j2 new file mode 100644 index 00000000..9cbf0351 --- /dev/null +++ b/tests/templates/kuttl/listener/00-rbac.yaml.j2 @@ -0,0 +1,29 @@ +--- +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: use-integration-tests-scc +rules: +{% if test_scenario['values']['openshift'] == "true" %} + - apiGroups: ["security.openshift.io"] + resources: ["securitycontextconstraints"] + resourceNames: ["privileged"] + verbs: ["use"] +{% endif %} +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: integration-tests-sa +--- +kind: RoleBinding +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: use-integration-tests-scc +subjects: + - kind: ServiceAccount + name: integration-tests-sa +roleRef: + kind: Role + name: use-integration-tests-scc + apiGroup: rbac.authorization.k8s.io diff --git a/tests/templates/kuttl/listener/10-assert.yaml b/tests/templates/kuttl/listener/10-assert.yaml new file mode 100644 index 00000000..24ae199a --- /dev/null +++ b/tests/templates/kuttl/listener/10-assert.yaml @@ -0,0 +1,59 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 600 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-trino-coordinator-default +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-trino-worker-default +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: test-trino-coordinator +status: + expectedPods: 1 + currentHealthy: 1 + disruptionsAllowed: 1 +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: test-trino-worker +status: + expectedPods: 1 + currentHealthy: 1 + disruptionsAllowed: 1 +--- +apiVersion: v1 +kind: Service +metadata: + name: test-trino-coordinator +spec: + type: NodePort # external-unstable - by listener op +--- +apiVersion: v1 +kind: Service +metadata: + name: test-trino-coordinator-default-metrics +spec: + type: ClusterIP # cluster-internal - by trino op +--- +apiVersion: v1 +kind: Service +metadata: + name: test-trino-worker-default-metrics +spec: + type: ClusterIP # cluster-internal - by trino op diff --git a/tests/templates/kuttl/listener/10-install-trino.yaml.j2 b/tests/templates/kuttl/listener/10-install-trino.yaml.j2 new file mode 100644 index 00000000..c16fd689 --- /dev/null +++ b/tests/templates/kuttl/listener/10-install-trino.yaml.j2 @@ -0,0 +1,52 @@ +--- +apiVersion: authentication.stackable.tech/v1alpha1 +kind: AuthenticationClass +metadata: + name: password +spec: + provider: + static: + userCredentialsSecret: + name: test-users +--- +apiVersion: v1 +kind: Secret +metadata: + name: test-users +stringData: + admin: admin +--- +apiVersion: trino.stackable.tech/v1alpha1 +kind: TrinoCluster +metadata: + name: test-trino +spec: + image: +{% if test_scenario['values']['trino'].find(",") > 0 %} + custom: "{{ test_scenario['values']['trino'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['trino'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['trino'] }}" +{% endif %} + pullPolicy: IfNotPresent + clusterConfig: + authentication: + - authenticationClass: password + catalogLabelSelector: {} +{% if lookup('env', 'VECTOR_AGGREGATOR') %} + vectorAggregatorConfigMapName: vector-aggregator-discovery +{% endif %} + coordinators: + 
roleConfig: + listenerClass: external-unstable + config: + gracefulShutdownTimeout: 5s # Let the test run faster + roleGroups: + default: + replicas: 1 + workers: + config: + gracefulShutdownTimeout: 5s # Let the test run faster + roleGroups: + default: + replicas: 1 diff --git a/tests/templates/kuttl/listener/20-assert.yaml b/tests/templates/kuttl/listener/20-assert.yaml new file mode 100644 index 00000000..168d5d8f --- /dev/null +++ b/tests/templates/kuttl/listener/20-assert.yaml @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: trino-test-helper +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/listener/20-install-check.yaml b/tests/templates/kuttl/listener/20-install-check.yaml new file mode 100644 index 00000000..cbd9fb4c --- /dev/null +++ b/tests/templates/kuttl/listener/20-install-check.yaml @@ -0,0 +1,29 @@ +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: trino-test-helper + labels: + app: trino-test-helper +spec: + replicas: 1 + selector: + matchLabels: + app: trino-test-helper + template: + metadata: + labels: + app: trino-test-helper + spec: + serviceAccount: integration-tests-sa + containers: + - name: trino-test-helper + image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + command: ["sleep", "infinity"] + resources: + requests: + cpu: "250m" + memory: "64Mi" + limits: + cpu: "500m" + memory: "64Mi" diff --git a/tests/templates/kuttl/listener/21-assert.yaml b/tests/templates/kuttl/listener/21-assert.yaml new file mode 100644 index 00000000..d9f9be94 --- /dev/null +++ b/tests/templates/kuttl/listener/21-assert.yaml @@ -0,0 +1,6 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 300 +commands: + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u admin -p admin -c test-trino-coordinator -w 1 diff --git a/tests/templates/kuttl/listener/21-copy-scripts.yaml b/tests/templates/kuttl/listener/21-copy-scripts.yaml new file mode 100644 index 00000000..9a67ea90 --- /dev/null +++ b/tests/templates/kuttl/listener/21-copy-scripts.yaml @@ -0,0 +1,5 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/check-active-workers.py trino-test-helper-0:/tmp || true diff --git a/tests/templates/kuttl/logging/test_log_aggregation.py b/tests/templates/kuttl/logging/test_log_aggregation.py index 83d653cd..811af20b 100755 --- a/tests/templates/kuttl/logging/test_log_aggregation.py +++ b/tests/templates/kuttl/logging/test_log_aggregation.py @@ -4,9 +4,9 @@ def check_sent_events(): response = requests.post( - 'http://trino-vector-aggregator:8686/graphql', + "http://trino-vector-aggregator:8686/graphql", json={ - 'query': """ + "query": """ { transforms(first:100) { nodes { @@ -20,29 +20,30 @@ def check_sent_events(): } } """ - } + }, ) - assert response.status_code == 200, \ - 'Cannot access the API of the vector aggregator.' + assert response.status_code == 200, ( + "Cannot access the API of the vector aggregator." 
+ ) result = response.json() - transforms = result['data']['transforms']['nodes'] + transforms = result["data"]["transforms"]["nodes"] for transform in transforms: - sentEvents = transform['metrics']['sentEventsTotal'] - componentId = transform['componentId'] + sentEvents = transform["metrics"]["sentEventsTotal"] + componentId = transform["componentId"] - if componentId == 'filteredInvalidEvents': - assert sentEvents is None or \ - sentEvents['sentEventsTotal'] == 0, \ - 'Invalid log events were sent.' + if componentId == "filteredInvalidEvents": + assert sentEvents is None or sentEvents["sentEventsTotal"] == 0, ( + "Invalid log events were sent." + ) else: - assert sentEvents is not None and \ - sentEvents['sentEventsTotal'] > 0, \ + assert sentEvents is not None and sentEvents["sentEventsTotal"] > 0, ( f'No events were sent in "{componentId}".' + ) -if __name__ == '__main__': +if __name__ == "__main__": check_sent_events() - print('Test successful!') + print("Test successful!") diff --git a/tests/templates/kuttl/smoke/21-assert.yaml b/tests/templates/kuttl/smoke/21-assert.yaml index b3f78bfc..1619480f 100644 --- a/tests/templates/kuttl/smoke/21-assert.yaml +++ b/tests/templates/kuttl/smoke/21-assert.yaml @@ -3,7 +3,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestAssert timeout: 300 commands: - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u admin -p admin -n $NAMESPACE -w 1 - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-opa.py -n $NAMESPACE - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-s3.py -n $NAMESPACE - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-metrics.py -n $NAMESPACE + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u admin -p admin -c trino-coordinator -w 1 + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-opa.py -c trino-coordinator + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-s3.py -c trino-coordinator -b trino + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-metrics.py -c trino-coordinator-default-metrics -w trino-worker-default-metrics diff --git a/tests/templates/kuttl/smoke/21-copy-scripts.yaml b/tests/templates/kuttl/smoke/21-copy-scripts.yaml index fc51e8f8..9e78256f 100644 --- a/tests/templates/kuttl/smoke/21-copy-scripts.yaml +++ b/tests/templates/kuttl/smoke/21-copy-scripts.yaml @@ -2,7 +2,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - - script: kubectl cp -n $NAMESPACE ./check-active-workers.py trino-test-helper-0:/tmp || true - - script: kubectl cp -n $NAMESPACE ./check-opa.py trino-test-helper-0:/tmp || true - - script: kubectl cp -n $NAMESPACE ./check-s3.py trino-test-helper-0:/tmp || true + - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/check-active-workers.py trino-test-helper-0:/tmp || true + - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/check-opa.py trino-test-helper-0:/tmp || true + - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/check-s3.py trino-test-helper-0:/tmp || true - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/check-metrics.py trino-test-helper-0:/tmp || true diff --git a/tests/templates/kuttl/smoke/31-assert.yaml b/tests/templates/kuttl/smoke/31-assert.yaml index fa6250c7..b70ce2a1 100644 --- a/tests/templates/kuttl/smoke/31-assert.yaml +++ 
b/tests/templates/kuttl/smoke/31-assert.yaml @@ -3,7 +3,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestAssert timeout: 600 commands: - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u admin -p admin -n $NAMESPACE -w 2 - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-opa.py -n $NAMESPACE - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-s3.py -n $NAMESPACE - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-metrics.py -n $NAMESPACE + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u admin -p admin -c trino-coordinator -w 2 + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-opa.py -c trino-coordinator + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-s3.py -c trino-coordinator -b trino + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-metrics.py -c trino-coordinator-default-metrics -w trino-worker-default-metrics diff --git a/tests/templates/kuttl/smoke/check-active-workers.py b/tests/templates/kuttl/smoke/check-active-workers.py deleted file mode 100755 index e78e36b6..00000000 --- a/tests/templates/kuttl/smoke/check-active-workers.py +++ /dev/null @@ -1,53 +0,0 @@ -#!/usr/bin/env python -import trino -import argparse -import sys - -if not sys.warnoptions: - import warnings -warnings.simplefilter("ignore") - - -def get_connection(username, password, namespace): - host = 'trino-coordinator-default-0.trino-coordinator-default.' + namespace + '.svc.cluster.local' - conn = trino.dbapi.connect( - host=host, - port=8443, - user=username, - http_scheme='https', - auth=trino.auth.BasicAuthentication(username, password), - session_properties={"query_max_execution_time": "60s"}, - ) - conn._http_session.verify = False - return conn - - -if __name__ == '__main__': - # Construct an argument parser - all_args = argparse.ArgumentParser() - - # Add arguments to the parser - all_args.add_argument("-u", "--user", required=True, - help="Username to connect as") - all_args.add_argument("-p", "--password", required=True, - help="Password for the user") - all_args.add_argument("-n", "--namespace", required=True, - help="Namespace the test is running in") - all_args.add_argument("-w", "--workers", required=True, - help="Expected amount of workers to be present") - - args = vars(all_args.parse_args()) - - expected_workers = args['workers'] - conn = get_connection(args['user'], args['password'], args['namespace']) - - cursor = conn.cursor() - cursor.execute("SELECT COUNT(*) as nodes FROM system.runtime.nodes WHERE coordinator=false AND state='active'") - - (active_workers,) = cursor.fetchone() - - if int(active_workers) != int(expected_workers): - print("Missmatch: [expected/active] workers [" + str(expected_workers) + "/" + str(active_workers) + "]") - exit(-1) - - print("Test check-active-workers.py succeeded!") diff --git a/tests/templates/kuttl/smoke/check-opa.py b/tests/templates/kuttl/smoke/check-opa.py deleted file mode 100755 index e45e8511..00000000 --- a/tests/templates/kuttl/smoke/check-opa.py +++ /dev/null @@ -1,77 +0,0 @@ -#!/usr/bin/env python -import argparse -import sys - -import trino -import trino.exceptions as trino_ex - -if not sys.warnoptions: - import warnings -warnings.simplefilter("ignore") - - -def get_connection(username, password, namespace): - host = ( - "trino-coordinator-default-0.trino-coordinator-default." 
- + namespace - + ".svc.cluster.local" - ) - conn = trino.dbapi.connect( - host=host, - port=8443, - user=username, - http_scheme="https", - auth=trino.auth.BasicAuthentication(username, password), - verify=False, - # Commented out because it apparently breaks the OPA rules. - # With this enabled, the script fails to validate that user bob can - # show catalogs. - # session_properties={"query_max_execution_time": "60s"}, - ) - return conn - - -def test_user(user, password, namespace, query): - conn = get_connection(user, password, namespace) - cursor = conn.cursor() - try: - cursor.execute(query) - cursor.fetchone() - return True - except trino_ex.Error: - return False - - -def main(): - # Construct an argument parser - all_args = argparse.ArgumentParser() - # Add arguments to the parser - all_args.add_argument( - "-n", "--namespace", required=True, help="Namespace the test is running in" - ) - - args = vars(all_args.parse_args()) - namespace = args["namespace"] - - # We expect the admin user query to pass - if not test_user("admin", "admin", namespace, "SHOW CATALOGS"): - print("User admin cannot show catalogs!") - sys.exit(-1) - # We expect the admin user query to pass - if not test_user("admin", "admin", namespace, "SHOW SCHEMAS FROM system"): - print("User admin cannot select schemas from system") - sys.exit(-1) - # We expect the bob query for catalogs to pass - if not test_user("bob", "bob", namespace, "SHOW CATALOGS"): - print("User bob cannot show catalogs!") - sys.exit(-1) - # We expect the bob query for schemas to fail - if test_user("bob", "bob", namespace, "SHOW SCHEMAS FROM system"): - print("User bob can show schemas from system. This should not be happening!") - sys.exit(-1) - - print("Test check-opa.py succeeded!") - - -if __name__ == "__main__": - main() diff --git a/tests/templates/kuttl/smoke/check-s3.py b/tests/templates/kuttl/smoke/check-s3.py deleted file mode 100755 index ff461c1c..00000000 --- a/tests/templates/kuttl/smoke/check-s3.py +++ /dev/null @@ -1,295 +0,0 @@ -#!/usr/bin/env python -import trino -import argparse -import sys -import re - -if not sys.warnoptions: - import warnings -warnings.simplefilter("ignore") - - -def get_connection(username, password, namespace): - host = ( - "trino-coordinator-default-0.trino-coordinator-default." 
- + namespace - + ".svc.cluster.local" - ) - # If you want to debug this locally use - # kubectl -n kuttl-test-XXX port-forward svc/trino-coordinator-default 8443 - # host = '127.0.0.1' - - conn = trino.dbapi.connect( - host=host, - port=8443, - user=username, - http_scheme="https", - auth=trino.auth.BasicAuthentication(username, password), - session_properties={"query_max_execution_time": "60s"}, - ) - conn._http_session.verify = False - return conn - - -def run_query(connection, query): - print(f"[DEBUG] Executing query {query}") - cursor = connection.cursor() - cursor.execute(query) - return cursor.fetchall() - - -if __name__ == "__main__": - # Construct an argument parser - all_args = argparse.ArgumentParser() - # Add arguments to the parser - all_args.add_argument( - "-n", "--namespace", required=True, help="Namespace the test is running in" - ) - - args = vars(all_args.parse_args()) - namespace = args["namespace"] - - print("Starting S3 tests...") - connection = get_connection("admin", "admin", namespace) - - trino_version = run_query( - connection, - "select node_version from system.runtime.nodes where coordinator = true and state = 'active'", - )[0][0] - print(f'[INFO] Testing against Trino version "{trino_version}"') - - # Strip SDP release suffix from the version string - trino_product_version = re.split(r"-stackable", trino_version, maxsplit=1)[0] - - assert len(trino_product_version) >= 3 - assert trino_product_version.isnumeric() - assert trino_version == run_query(connection, "select version()")[0][0] - - run_query( - connection, - "CREATE SCHEMA IF NOT EXISTS hive.minio WITH (location = 's3a://trino/')", - ) - - run_query(connection, "DROP TABLE IF EXISTS hive.minio.taxi_data") - run_query(connection, "DROP TABLE IF EXISTS hive.minio.taxi_data_copy") - run_query(connection, "DROP TABLE IF EXISTS hive.minio.taxi_data_transformed") - run_query(connection, "DROP TABLE IF EXISTS hive.hdfs.taxi_data_copy") - run_query(connection, "DROP TABLE IF EXISTS iceberg.minio.taxi_data_copy_iceberg") - - run_query( - connection, - """ -CREATE TABLE IF NOT EXISTS hive.minio.taxi_data ( - vendor_id VARCHAR, - tpep_pickup_datetime VARCHAR, - tpep_dropoff_datetime VARCHAR, - passenger_count VARCHAR, - trip_distance VARCHAR, - ratecode_id VARCHAR -) WITH ( - external_location = 's3a://trino/taxi-data/', - format = 'csv', - skip_header_line_count = 1 -) - """, - ) - assert ( - run_query(connection, "SELECT COUNT(*) FROM hive.minio.taxi_data")[0][0] == 5000 - ) - rows_written = run_query( - connection, - "CREATE TABLE IF NOT EXISTS hive.minio.taxi_data_copy AS SELECT * FROM hive.minio.taxi_data", - )[0][0] - assert rows_written == 5000 or rows_written == 0 - assert ( - run_query(connection, "SELECT COUNT(*) FROM hive.minio.taxi_data_copy")[0][0] - == 5000 - ) - - rows_written = run_query( - connection, - """ -CREATE TABLE IF NOT EXISTS hive.minio.taxi_data_transformed AS -SELECT - CAST(vendor_id as BIGINT) as vendor_id, - tpep_pickup_datetime, - tpep_dropoff_datetime, - CAST(passenger_count as BIGINT) as passenger_count, - CAST(trip_distance as DOUBLE) as trip_distance, - CAST(ratecode_id as BIGINT) as ratecode_id -FROM hive.minio.taxi_data -""", - )[0][0] - assert rows_written == 5000 or rows_written == 0 - assert ( - run_query(connection, "SELECT COUNT(*) FROM hive.minio.taxi_data_transformed")[ - 0 - ][0] - == 5000 - ) - - print("[INFO] Testing HDFS") - - run_query( - connection, - "CREATE SCHEMA IF NOT EXISTS hive.hdfs WITH (location = 'hdfs://hdfs/trino/')", - ) - rows_written = run_query( - 
connection, - "CREATE TABLE IF NOT EXISTS hive.hdfs.taxi_data_copy AS SELECT * FROM hive.minio.taxi_data", - )[0][0] - assert rows_written == 5000 or rows_written == 0 - assert ( - run_query(connection, "SELECT COUNT(*) FROM hive.hdfs.taxi_data_copy")[0][0] - == 5000 - ) - - print("[INFO] Testing Iceberg") - run_query( - connection, "DROP TABLE IF EXISTS iceberg.minio.taxi_data_copy_iceberg" - ) # Clean up table to don't fail an second run - assert ( - run_query( - connection, - """ -CREATE TABLE IF NOT EXISTS iceberg.minio.taxi_data_copy_iceberg -WITH (partitioning = ARRAY['vendor_id', 'passenger_count'], format = 'parquet') -AS SELECT * FROM hive.minio.taxi_data -""", - )[0][0] - == 5000 - ) - # Check current count - assert ( - run_query( - connection, "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg" - )[0][0] - == 5000 - ) - assert ( - run_query( - connection, - 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"', - )[0][0] - == 1 - ) - assert ( - run_query( - connection, - 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"', - )[0][0] - == 12 - ) - assert ( - run_query( - connection, - 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"', - )[0][0] - == 12 - ) - - assert ( - run_query( - connection, - "INSERT INTO iceberg.minio.taxi_data_copy_iceberg SELECT * FROM hive.minio.taxi_data", - )[0][0] - == 5000 - ) - - # Check current count - assert ( - run_query( - connection, "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg" - )[0][0] - == 10000 - ) - assert ( - run_query( - connection, - 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"', - )[0][0] - == 2 - ) - assert ( - run_query( - connection, - 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"', - )[0][0] - == 12 - ) - assert ( - run_query( - connection, - 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"', - )[0][0] - == 24 - ) - - if trino_version == "377": - # io.trino.spi.TrinoException: This connector [iceberg] does not support versioned tables - print( - "[INFO] Skipping the Iceberg tests reading versioned tables for trino version 377 as it does not support versioned tables" - ) - else: - # Check count for first snapshot - first_snapshot = run_query( - connection, - 'select snapshot_id from iceberg.minio."taxi_data_copy_iceberg$snapshots" order by committed_at limit 1', - )[0][0] - assert ( - run_query( - connection, - f"SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg FOR VERSION AS OF {first_snapshot}", - )[0][0] - == 5000 - ) - - # Compact files - run_query( - connection, "ALTER TABLE iceberg.minio.taxi_data_copy_iceberg EXECUTE optimize" - ) - - # Check current count - assert ( - run_query( - connection, "SELECT COUNT(*) FROM iceberg.minio.taxi_data_copy_iceberg" - )[0][0] - == 10000 - ) - assert ( - run_query( - connection, - 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$snapshots"', - )[0][0] - == 3 - ) - assert ( - run_query( - connection, - 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$partitions"', - )[0][0] - == 12 - ) - assert ( - run_query( - connection, - 'SELECT COUNT(*) FROM iceberg.minio."taxi_data_copy_iceberg$files"', - )[0][0] - == 12 - ) # Compaction yeah :) - - # Test could be improved by also testing update and deletes - - # Test postgres connection - run_query(connection, "SHOW SCHEMAS IN postgresgeneric") - run_query(connection, "CREATE SCHEMA IF NOT EXISTS postgresgeneric.tpch") - run_query( - connection, - "CREATE TABLE IF NOT 
EXISTS postgresgeneric.tpch.nation AS SELECT * FROM tpch.tiny.nation", - ) - assert ( - run_query(connection, "SELECT COUNT(*) FROM postgresgeneric.tpch.nation")[0][0] - == 25 - ) - - print("[SUCCESS] All tests in check-s3.py succeeded!") diff --git a/tests/templates/kuttl/smoke_aws/21-assert.yaml b/tests/templates/kuttl/smoke_aws/21-assert.yaml index b3f78bfc..db05f096 100644 --- a/tests/templates/kuttl/smoke_aws/21-assert.yaml +++ b/tests/templates/kuttl/smoke_aws/21-assert.yaml @@ -3,7 +3,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestAssert timeout: 300 commands: - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u admin -p admin -n $NAMESPACE -w 1 - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-opa.py -n $NAMESPACE - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-s3.py -n $NAMESPACE - - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-metrics.py -n $NAMESPACE + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-active-workers.py -u admin -p admin -c trino-coordinator-default -w 1 + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-opa.py -c trino-coordinator-default + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-s3.py -c trino-coordinator-default -b my-bucket + - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-metrics.py -c trino-coordinator-default-metrics -w trino-worker-default-metrics diff --git a/tests/templates/kuttl/smoke_aws/21-copy-scripts.yaml b/tests/templates/kuttl/smoke_aws/21-copy-scripts.yaml index fc51e8f8..9e78256f 100644 --- a/tests/templates/kuttl/smoke_aws/21-copy-scripts.yaml +++ b/tests/templates/kuttl/smoke_aws/21-copy-scripts.yaml @@ -2,7 +2,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - - script: kubectl cp -n $NAMESPACE ./check-active-workers.py trino-test-helper-0:/tmp || true - - script: kubectl cp -n $NAMESPACE ./check-opa.py trino-test-helper-0:/tmp || true - - script: kubectl cp -n $NAMESPACE ./check-s3.py trino-test-helper-0:/tmp || true + - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/check-active-workers.py trino-test-helper-0:/tmp || true + - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/check-opa.py trino-test-helper-0:/tmp || true + - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/check-s3.py trino-test-helper-0:/tmp || true - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/check-metrics.py trino-test-helper-0:/tmp || true diff --git a/tests/templates/kuttl/smoke_aws/README.md b/tests/templates/kuttl/smoke_aws/README.md index 5bcbea75..a1343e64 100644 --- a/tests/templates/kuttl/smoke_aws/README.md +++ b/tests/templates/kuttl/smoke_aws/README.md @@ -12,7 +12,7 @@ aws s3api create-bucket --bucket ${BUCKET_NAME} --region eu-central-1 --create-b aws s3 cp yellow_tripdata_2021-07.csv s3://${BUCKET_NAME}/taxi-data/ ``` -You will need to update the bucket name in [check-s3.py](check-s3.py). +You will need to update the bucket name in the assert call for [check-s3.py](../commons/check-s3.py). 
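+For example, with the bucket name used by this test the relevant assert line looks like the following (a sketch based on 21-assert.yaml in this directory; substitute your own bucket for `my-bucket`):
+
+```yaml
+commands:
+  # -c selects the coordinator service, -b the S3 bucket created above
+  - script: kubectl exec -n $NAMESPACE trino-test-helper-0 -- python /tmp/check-s3.py -c trino-coordinator-default -b my-bucket
+```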
## Add AWS credentials diff --git a/tests/templates/kuttl/tls/10-assert.yaml.j2 b/tests/templates/kuttl/tls/10-assert.yaml.j2 index b19c14f9..f4348ded 100644 --- a/tests/templates/kuttl/tls/10-assert.yaml.j2 +++ b/tests/templates/kuttl/tls/10-assert.yaml.j2 @@ -25,10 +25,6 @@ metadata: name: trino-coordinator spec: ports: - - name: metrics - port: 8081 - protocol: TCP - targetPort: 8081 {% if test_scenario['values']['use-tls'] == 'false' %} - name: http port: 8080 diff --git a/tests/templates/kuttl/tls/check-tls.py b/tests/templates/kuttl/tls/check-tls.py index 4c88edea..dc7e7515 100755 --- a/tests/templates/kuttl/tls/check-tls.py +++ b/tests/templates/kuttl/tls/check-tls.py @@ -2,7 +2,6 @@ import trino import argparse import json -import requests def get_http_connection(host, user): @@ -10,17 +9,13 @@ def get_http_connection(host, user): host=host, port=8080, user=user, - http_scheme='http', + http_scheme="http", ) def get_https_connection(host, user, verify): return trino.dbapi.connect( - host=host, - port=8443, - user=user, - http_scheme='https', - verify=verify + host=host, port=8443, user=user, http_scheme="https", verify=verify ) @@ -29,9 +24,9 @@ def get_authenticated_https_connection(host, user, password, verify): host=host, port=8443, user=user, - http_scheme='https', + http_scheme="https", auth=trino.auth.BasicAuthentication(user, password), - verify=verify + verify=verify, ) @@ -50,29 +45,37 @@ def test_query_failure(conn, query, expected_error, failure_message): def read_json(config_path): - with open(config_path, 'r') as stream: + with open(config_path, "r") as stream: config = json.load(stream) return config -if __name__ == '__main__': +if __name__ == "__main__": # Construct an argument parser all_args = argparse.ArgumentParser() # Add arguments to the parser - all_args.add_argument("-n", "--namespace", required=True, help="Namespace the test is running in") + all_args.add_argument( + "-n", "--namespace", required=True, help="Namespace the test is running in" + ) args = vars(all_args.parse_args()) namespace = args["namespace"] - conf = read_json("/tmp/test-config.json") # config file to indicate our test script if auth / tls is used or not - coordinator_host = 'trino-coordinator-default-0.trino-coordinator-default.' + namespace + '.svc.cluster.local' + conf = read_json( + "/tmp/test-config.json" + ) # config file to indicate our test script if auth / tls is used or not + coordinator_host = ( + "trino-coordinator-default-metrics." 
+ namespace + ".svc.cluster.local" + ) trusted_ca = "/stackable/trusted/ca.crt" # will be mounted from secret op untrusted_ca = "/stackable/untrusted-cert.crt" # some random CA query = "SHOW CATALOGS" # We expect these to work if conf["useAuthentication"]: - conn = get_authenticated_https_connection(coordinator_host, "admin", "admin", trusted_ca) + conn = get_authenticated_https_connection( + coordinator_host, "admin", "admin", trusted_ca + ) test_query(conn, query) elif conf["useTls"]: conn = get_https_connection(coordinator_host, "admin", trusted_ca) @@ -83,14 +86,37 @@ def read_json(config_path): # We expect these to fail if conf["useAuthentication"]: - conn = get_authenticated_https_connection(coordinator_host, "admin", "admin", untrusted_ca) - test_query_failure(conn, query, trino.exceptions.TrinoConnectionError, "Could connect with wrong certificate") - conn = get_authenticated_https_connection(coordinator_host, "admin", "wrong_password", trusted_ca) - test_query_failure(conn, query, trino.exceptions.HttpError, "Could connect with wrong password") - conn = get_authenticated_https_connection(coordinator_host, "wrong_user", "wrong_password", trusted_ca) - test_query_failure(conn, query, trino.exceptions.HttpError, "Could connect with wrong user and password") + conn = get_authenticated_https_connection( + coordinator_host, "admin", "admin", untrusted_ca + ) + test_query_failure( + conn, + query, + trino.exceptions.TrinoConnectionError, + "Could connect with wrong certificate", + ) + conn = get_authenticated_https_connection( + coordinator_host, "admin", "wrong_password", trusted_ca + ) + test_query_failure( + conn, query, trino.exceptions.HttpError, "Could connect with wrong password" + ) + conn = get_authenticated_https_connection( + coordinator_host, "wrong_user", "wrong_password", trusted_ca + ) + test_query_failure( + conn, + query, + trino.exceptions.HttpError, + "Could connect with wrong user and password", + ) elif conf["useTls"]: conn = get_https_connection(coordinator_host, "admin", untrusted_ca) - test_query_failure(conn, query, trino.exceptions.TrinoConnectionError, "Could connect with wrong certificate") + test_query_failure( + conn, + query, + trino.exceptions.TrinoConnectionError, + "Could connect with wrong certificate", + ) print("All TLS tests finished successfully!") diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index 6e94b0b8..92023474 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -111,6 +111,10 @@ tests: - opa - keycloak - openshift + - name: listener + dimensions: + - trino + - openshift suites: - name: nightly # Run with the latest product versions and tls true and false to cover different tls code paths.