1717from ecs_composex .common .logging import LOG
1818from ecs_composex .common .stacks import ComposeXStack
1919from ecs_composex .common .troposphere_tools import (
20+ Parameter ,
2021 add_outputs ,
2122 add_resource ,
2223 build_template ,
4647)
4748from .msk_cluster_params import (
4849 MSK_CLUSTER_ADDRESSING_TYPE ,
50+ MSK_CLUSTER_INSTANCE_TYPES ,
4951 MSK_CLUSTER_SG_PARAM ,
5052 MSK_CLUSTER_USE_SASL_IAM ,
5153 MSK_CLUSTER_USE_SASL_SCRAM ,
@@ -67,24 +69,27 @@ def __init__(self, name, stack_template, cluster, top_stack, **kwargs):
6769 self .cluster .stack = self
6870
6971
70- def validate_cluster_version (cluster : MskCluster , input_version ) -> None :
72+ def validate_cluster_version (cluster : MskCluster , input_version ) -> Parameter :
7173 """
7274 Validates the kafka version
7375 """
74- versions = list_all_kafka_versions (session = cluster .lookup_session )
75- for version in versions :
76- if version ["Version" ] == input_version :
77- break
78- else :
79- raise ValueError (
80- f"{ cluster .module .res_key } .{ cluster .name } - " ,
81- f"Version { input_version } is not valid. Versions supported" ,
82- [_v ["Version" ] for _v in versions if _v ["Status" ] == "ACTIVE" ],
76+ valid_versions : list [str ] = [
77+ _version ["Version" ]
78+ for _version in list_all_kafka_versions (session = cluster .lookup_session )
79+ if _version ["Status" ] == "ACTIVE"
80+ ]
81+ if input_version in valid_versions :
82+ return Parameter (
83+ "KafkaVersion" ,
84+ Type = "String" ,
85+ AllowedValues = valid_versions ,
86+ Default = input_version ,
8387 )
84- if keyisset ( "Status" , version ) and version [ "Status" ] != "ACTIVE" :
88+ else :
8589 raise ValueError (
86- f"{ cluster .module .res_key } .{ cluster .name } - "
87- f"Version { version ['Version' ]} is not active: { version ['Status' ]} "
90+ "Version {} is not valid. Active versions supported: {}" .format (
91+ input_version , valid_versions
92+ )
8893 )
8994
9095
@@ -99,6 +104,7 @@ def set_msk_cluster_template(module: XResourceModule, cluster: MskCluster) -> Te
99104 STORAGE_SUBNETS ,
100105 APP_SUBNETS ,
101106 PUBLIC_SUBNETS ,
107+ MSK_CLUSTER_INSTANCE_TYPES ,
102108 ],
103109 )
104110 template .add_condition (
@@ -112,6 +118,32 @@ def set_msk_cluster_template(module: XResourceModule, cluster: MskCluster) -> Te
112118 return template
113119
114120
121+ def set_instance_type (cluster : MskCluster , cluster_stack : ComposeXStack ) -> None :
122+ """
123+ Checks the instance type value. Replaces it with parameter.
124+ Updates parameter value if valid, uses Default if not
125+ """
126+ instance_type = getattr (cluster .cfn_resource .BrokerNodeGroupInfo , "InstanceType" )
127+ if instance_type not in MSK_CLUSTER_INSTANCE_TYPES .AllowedValues :
128+ LOG .error (
129+ "{}.{} - {} InstanceType is not valid. Using default {}" .format (
130+ cluster .module .res_key ,
131+ cluster .name ,
132+ instance_type ,
133+ MSK_CLUSTER_INSTANCE_TYPES .Default ,
134+ )
135+ )
136+ else :
137+ cluster_stack .Parameters .update (
138+ {MSK_CLUSTER_INSTANCE_TYPES .title : instance_type }
139+ )
140+ setattr (
141+ cluster .cfn_resource .BrokerNodeGroupInfo ,
142+ "InstanceType" ,
143+ Ref (MSK_CLUSTER_INSTANCE_TYPES ),
144+ )
145+
146+
115147def build_msk_clusters (module : XResourceModule , msk_top_stack : ComposeXStack ):
116148 """
117149 Creates a new MSK cluster from properties
@@ -187,7 +219,10 @@ def build_msk_clusters(module: XResourceModule, msk_top_stack: ComposeXStack):
187219 }
188220 )
189221 cluster .cfn_resource = AwsMskCluster (cluster .logical_name , ** cluster_props )
190- validate_cluster_version (cluster , cluster .cfn_resource .KafkaVersion )
222+ version_param = cluster_template .add_parameter (
223+ validate_cluster_version (cluster , cluster .cfn_resource .KafkaVersion )
224+ )
225+ setattr (cluster .cfn_resource , "KafkaVersion" , Ref (version_param ))
191226 cluster_stack = MskClusterStack (
192227 cluster .logical_name ,
193228 cluster = cluster ,
@@ -199,8 +234,10 @@ def build_msk_clusters(module: XResourceModule, msk_top_stack: ComposeXStack):
199234 STORAGE_SUBNETS .title : Ref (STORAGE_SUBNETS ),
200235 PUBLIC_SUBNETS .title : Ref (PUBLIC_SUBNETS ),
201236 APP_SUBNETS .title : Ref (APP_SUBNETS ),
237+ version_param .title : version_param .Default ,
202238 },
203239 )
240+ set_instance_type (cluster , cluster_stack )
204241
205242 handle_msk_auth_settings (cluster )
206243 add_resource (cluster_template , cluster .cfn_resource )
0 commit comments