55import functools
66import os
77import json
8+ import base64
9+ import time
10+ from typing import Optional , Dict , Union , Tuple , Type , Optional
11+ from functools import wraps
812
913from ceph .deployment import inventory
1014from ceph .deployment .service_spec import ServiceSpec , NFSServiceSpec , RGWSpec , PlacementSpec
1115from ceph .utils import datetime_now
1216
13- from typing import List , Dict , Optional , Callable , Any , TypeVar , Tuple , TYPE_CHECKING
17+ from typing import List , Dict , Optional , Callable , Any , TypeVar , Tuple , TYPE_CHECKING , cast
1418
1519try :
1620 from ceph .deployment .drive_group import DriveGroupSpec
1923
2024try :
2125 from kubernetes import client , config
22- from kubernetes .client . rest import ApiException
26+ from kubernetes .client import ApiException , CoreV1Api , V1Secret
2327
2428 kubernetes_imported = True
2529
@@ -33,6 +37,9 @@ def names(self: Any, names: Any) -> None:
3337 kubernetes_imported = False
3438 client = None
3539 config = None
40+ ApiException = Exception
41+ CoreV1Api = None
42+ V1Secret = object
3643
3744from mgr_module import MgrModule , Option , NFS_POOL_NAME
3845import orchestrator
@@ -44,6 +51,34 @@ def names(self: Any, names: Any) -> None:
4451FuncT = TypeVar ('FuncT' , bound = Callable )
4552ServiceSpecT = TypeVar ('ServiceSpecT' , bound = ServiceSpec )
4653
54+ def retry (
55+ on_exception : Union [Type [Exception ], Tuple [Type [Exception ], ...]],
56+ tries : int = 3 ,
57+ delay : int = 1 ,
58+ backoff : int = 2 ,
59+ max_delay : int = 60 ,
60+ logger : Optional [logging .Logger ] = None ,
61+ ) -> Callable [[Callable [..., Any ]], Callable [..., Any ]]:
62+ def decorator (func : Callable [..., Any ]) -> Callable [..., Any ]:
63+ @wraps (func )
64+ def wrapper (* args : Any , ** kwargs : Any ) -> Any :
65+ wait = delay
66+ err : Optional [Exception ] = None
67+ for i in range (tries ):
68+ try :
69+ return func (* args , ** kwargs )
70+ except on_exception as e :
71+ err = e
72+ if logger :
73+ logger .warning (
74+ f"Retry #{ i + 1 } /{ tries } after exception in '{ func .__name__ } ': { e } "
75+ )
76+ if i < tries - 1 :
77+ time .sleep (min (wait , max_delay ))
78+ wait *= backoff
79+ raise err # type: ignore
80+ return wrapper
81+ return decorator
4782
4883class RookEnv (object ):
4984 def __init__ (self ) -> None :
@@ -82,6 +117,18 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
82117 default = 'local' ,
83118 desc = 'storage class name for LSO-discovered PVs' ,
84119 ),
120+ Option (
121+ 'secure_monitoring_stack' ,
122+ type = 'bool' ,
123+ default = False ,
124+ desc = 'Enable TLS security for all the monitoring stack daemons'
125+ ),
126+ Option (
127+ 'prometheus_tls_secret_name' ,
128+ type = 'str' ,
129+ default = 'rook-ceph-prometheus-server-tls' ,
130+ desc = 'name of tls secret in k8s for prometheus' ,
131+ )
85132 ]
86133
87134 @staticmethod
@@ -531,6 +578,16 @@ def _get_pool_params(self) -> Tuple[int, str]:
531578 break
532579 return num_replicas , leaf_type
533580
581+ @handle_orch_error
582+ def get_security_config (self ) -> Dict [str , bool ]:
583+ secure_monitoring_stack = cast (
584+ bool , self .get_module_option_ex ('rook' , 'secure_monitoring_stack' , False )
585+ )
586+ return {
587+ 'security_enabled' : secure_monitoring_stack ,
588+ 'mgmt_gw_enabled' : False
589+ }
590+
534591 @handle_orch_error
535592 def remove_service (self , service_name : str , force : bool = False ) -> str :
536593 if service_name == 'rbd-mirror' :
@@ -567,7 +624,7 @@ def zap_device(self, host: str, path: str) -> OrchResult[str]:
567624 except Exception as e :
568625 logging .error (e )
569626 return OrchResult (None , Exception ("Unable to zap device: " + str (e .with_traceback (None ))))
570- return OrchResult (f'{ path } on { host } zapped' )
627+ return OrchResult (f'{ path } on { host } zapped' )
571628
572629 @handle_orch_error
573630 def apply_mon (self , spec ):
@@ -639,3 +696,43 @@ def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
639696 @handle_orch_error
640697 def upgrade_ls (self , image : Optional [str ], tags : bool , show_all_versions : Optional [bool ]) -> Dict [Any , Any ]:
641698 return {}
699+
700+ # Retry decorator for handling transient Kubernetes API failures
701+ @retry (on_exception = ApiException , tries = 7 , delay = 1 , backoff = 2 , max_delay = 60 )
702+ def fetch_k8s_secret (self , secret_name : str ) -> Optional [V1Secret ]:
703+ if self ._k8s_CoreV1_api is None :
704+ logging .warning ("CoreV1Api client is not initialized, returning None." )
705+ return None
706+
707+ try :
708+ return self ._k8s_CoreV1_api .read_namespaced_secret (
709+ name = secret_name ,
710+ namespace = self ._rook_env .namespace
711+ )
712+ except Exception as e :
713+ logging .warning (f"Failed to fetch secret '{ secret_name } ': { e } " )
714+ return None
715+
716+ @handle_orch_error
717+ def generate_certificates (self , module_name : str ) -> Optional [Dict [str , str ]]:
718+ api_response = None
719+ cert , key = "" , ""
720+ supported_modules = ['prometheus' ]
721+ if module_name not in supported_modules :
722+ raise orchestrator .OrchestratorError (f'Unsupported module { module_name } . Supported module are: { supported_modules } ' )
723+
724+ secret_name = self .get_module_option (f'{ module_name } _tls_secret_name' )
725+ try :
726+ api_response = self .fetch_k8s_secret (secret_name )
727+ except ApiException as e :
728+ raise orchestrator .OrchestratorError (f'Unable to get certificates for { module_name } , error: { e } ' )
729+
730+ if api_response is None :
731+ raise orchestrator .OrchestratorError (f'Unable to get certificates for { module_name } ' )
732+ else :
733+ cert = base64 .b64decode (api_response .data .get ('tls.crt' ,'' )).decode ('utf-8' )
734+ key = base64 .b64decode (api_response .data .get ('tls.key' , '' )).decode ('utf-8' )
735+ if cert == "" or key == "" :
736+ raise orchestrator .OrchestratorError (f'Unable to parse certificates for { module_name } module' )
737+
738+ return {'cert' : cert , 'key' : key }
0 commit comments