|
1 | 1 | use std::{ |
2 | | - borrow::Cow, |
3 | 2 | collections::{BTreeMap, HashMap}, |
4 | 3 | num::TryFromIntError, |
5 | 4 | }; |
6 | 5 |
|
7 | | -use futures::future::try_join_all; |
8 | 6 | use product_config::types::PropertyNameKind; |
9 | 7 | use security::AuthenticationConfig; |
10 | 8 | use serde::{Deserialize, Serialize}; |
@@ -37,7 +35,6 @@ use stackable_operator::{ |
37 | 35 | schemars::{self, JsonSchema}, |
38 | 36 | status::condition::{ClusterCondition, HasStatusCondition}, |
39 | 37 | time::Duration, |
40 | | - utils::cluster_info::KubernetesClusterInfo, |
41 | 38 | versioned::versioned, |
42 | 39 | }; |
43 | 40 | use strum::{Display, EnumIter, EnumString}; |
@@ -591,169 +588,6 @@ impl v1alpha1::HbaseCluster { |
591 | 588 | role_group: group_name.into(), |
592 | 589 | } |
593 | 590 | } |
594 | | - |
595 | | - /// Returns rolegroup and replica information for a specific role. |
596 | | - /// We can't pass through the merged config for a particular role-group |
597 | | - /// here as we need more than the config. As this will be called by role, |
598 | | - /// the merged listener-class is called so that only role-group information |
599 | | - /// for externally-reachable services (based on their listener class) are |
600 | | - /// included in the collection. |
601 | | - pub fn rolegroup_ref_and_replicas( |
602 | | - &self, |
603 | | - role: &HbaseRole, |
604 | | - ) -> Vec<(RoleGroupRef<v1alpha1::HbaseCluster>, u16)> { |
605 | | - match role { |
606 | | - HbaseRole::Master => self |
607 | | - .spec |
608 | | - .masters |
609 | | - .iter() |
610 | | - .flat_map(|role| &role.role_groups) |
611 | | - // Order rolegroups consistently, to avoid spurious downstream rewrites |
612 | | - .collect::<BTreeMap<_, _>>() |
613 | | - .into_iter() |
614 | | - .map(|(rolegroup_name, role_group)| { |
615 | | - ( |
616 | | - self.rolegroup_ref(HbaseRole::Master.to_string(), rolegroup_name), |
617 | | - role_group.replicas.unwrap_or_default(), |
618 | | - ) |
619 | | - }) |
620 | | - .collect(), |
621 | | - HbaseRole::RegionServer => self |
622 | | - .spec |
623 | | - .region_servers |
624 | | - .iter() |
625 | | - .flat_map(|role| &role.role_groups) |
626 | | - // Order rolegroups consistently, to avoid spurious downstream rewrites |
627 | | - .collect::<BTreeMap<_, _>>() |
628 | | - .into_iter() |
629 | | - .map(|(rolegroup_name, role_group)| { |
630 | | - ( |
631 | | - self.rolegroup_ref(HbaseRole::RegionServer.to_string(), rolegroup_name), |
632 | | - role_group.replicas.unwrap_or_default(), |
633 | | - ) |
634 | | - }) |
635 | | - .collect(), |
636 | | - HbaseRole::RestServer => self |
637 | | - .spec |
638 | | - .rest_servers |
639 | | - .iter() |
640 | | - .flat_map(|role| &role.role_groups) |
641 | | - // Order rolegroups consistently, to avoid spurious downstream rewrites |
642 | | - .collect::<BTreeMap<_, _>>() |
643 | | - .into_iter() |
644 | | - .map(|(rolegroup_name, role_group)| { |
645 | | - ( |
646 | | - self.rolegroup_ref(HbaseRole::RestServer.to_string(), rolegroup_name), |
647 | | - role_group.replicas.unwrap_or_default(), |
648 | | - ) |
649 | | - }) |
650 | | - .collect(), |
651 | | - } |
652 | | - } |
653 | | - |
654 | | - pub fn pod_refs( |
655 | | - &self, |
656 | | - role: &HbaseRole, |
657 | | - hbase_version: &str, |
658 | | - ) -> Result<Vec<HbasePodRef>, Error> { |
659 | | - let ns = self.metadata.namespace.clone().context(NoNamespaceSnafu)?; |
660 | | - let rolegroup_ref_and_replicas = self.rolegroup_ref_and_replicas(role); |
661 | | - |
662 | | - Ok(rolegroup_ref_and_replicas |
663 | | - .iter() |
664 | | - .flat_map(|(rolegroup_ref, replicas)| { |
665 | | - let ns = ns.clone(); |
666 | | - (0..*replicas).map(move |i| HbasePodRef { |
667 | | - namespace: ns.clone(), |
668 | | - role_group_service_name: rolegroup_ref.object_name(), |
669 | | - pod_name: format!("{}-{}", rolegroup_ref.object_name(), i), |
670 | | - ports: self |
671 | | - .ports(role, hbase_version) |
672 | | - .iter() |
673 | | - .map(|(n, p)| (n.clone(), *p)) |
674 | | - .collect(), |
675 | | - fqdn_override: None, |
676 | | - }) |
677 | | - }) |
678 | | - .collect()) |
679 | | - } |
680 | | - |
681 | | - pub async fn listener_refs( |
682 | | - &self, |
683 | | - client: &stackable_operator::client::Client, |
684 | | - role: &HbaseRole, |
685 | | - hbase_version: &str, |
686 | | - ) -> Result<Vec<HbasePodRef>, Error> { |
687 | | - let pod_refs = self.pod_refs(role, hbase_version)?; |
688 | | - try_join_all(pod_refs.into_iter().map(|pod_ref| async { |
689 | | - // N.B. use the naming convention for persistent listener volumes as we |
690 | | - // have specified above that we only want externally-reachable endpoints. |
691 | | - let listener_name = format!("{LISTENER_VOLUME_NAME}-{}", pod_ref.pod_name); |
692 | | - let listener_ref = |
693 | | - || ObjectRef::<Listener>::new(&listener_name).within(&pod_ref.namespace); |
694 | | - let pod_obj_ref = |
695 | | - || ObjectRef::<Pod>::new(&pod_ref.pod_name).within(&pod_ref.namespace); |
696 | | - let listener = client |
697 | | - .get::<Listener>(&listener_name, &pod_ref.namespace) |
698 | | - .await |
699 | | - .context(GetPodListenerSnafu { |
700 | | - listener: listener_ref(), |
701 | | - pod: pod_obj_ref(), |
702 | | - })?; |
703 | | - let listener_address = listener |
704 | | - .status |
705 | | - .and_then(|s| s.ingress_addresses?.into_iter().next()) |
706 | | - .context(PodListenerHasNoAddressSnafu { |
707 | | - listener: listener_ref(), |
708 | | - pod: pod_obj_ref(), |
709 | | - })?; |
710 | | - Ok(HbasePodRef { |
711 | | - fqdn_override: Some(listener_address.address), |
712 | | - ports: listener_address |
713 | | - .ports |
714 | | - .into_iter() |
715 | | - .map(|(port_name, port)| { |
716 | | - let port = u16::try_from(port).context(PortOutOfBoundsSnafu { |
717 | | - port_name: &port_name, |
718 | | - port, |
719 | | - })?; |
720 | | - Ok((port_name, port)) |
721 | | - }) |
722 | | - .collect::<Result<_, _>>()?, |
723 | | - ..pod_ref |
724 | | - }) |
725 | | - })) |
726 | | - .await |
727 | | - } |
728 | | -} |
729 | | - |
730 | | -/// Reference to a single `Pod` that is a component of a [`HbaseCluster`] |
731 | | -/// |
732 | | -/// Used for service discovery. |
733 | | -#[derive(Debug)] |
734 | | -pub struct HbasePodRef { |
735 | | - pub namespace: String, |
736 | | - pub role_group_service_name: String, |
737 | | - pub pod_name: String, |
738 | | - pub fqdn_override: Option<String>, |
739 | | - pub ports: HashMap<String, u16>, |
740 | | -} |
741 | | - |
742 | | -impl HbasePodRef { |
743 | | - pub fn fqdn(&self, cluster_info: &KubernetesClusterInfo) -> Cow<str> { |
744 | | - self.fqdn_override.as_deref().map_or_else( |
745 | | - || { |
746 | | - Cow::Owned(format!( |
747 | | - "{pod_name}.{role_group_service_name}.{namespace}.svc.{cluster_domain}", |
748 | | - pod_name = self.pod_name, |
749 | | - role_group_service_name = self.role_group_service_name, |
750 | | - namespace = self.namespace, |
751 | | - cluster_domain = cluster_info.cluster_domain, |
752 | | - )) |
753 | | - }, |
754 | | - Cow::Borrowed, |
755 | | - ) |
756 | | - } |
757 | 591 | } |
758 | 592 |
|
759 | 593 | pub fn merged_env(rolegroup_config: Option<&BTreeMap<String, String>>) -> Vec<EnvVar> { |
|
0 commit comments