Skip to content

Commit 3a7fea3

Browse files
Merge pull request #160 from jinyoungmoonDEV/master
add: Elasticache Valkey Resource
2 parents 1d7071f + ce52a4c commit 3a7fea3

File tree

4 files changed

+345
-59
lines changed

4 files changed

+345
-59
lines changed

src/spaceone/inventory/connector/aws_elasticache_connector/connector.py

Lines changed: 113 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import logging
22

33
from spaceone.core.utils import *
4-
from spaceone.inventory.connector.aws_elasticache_connector.schema.data import Redis, Memcached
4+
from spaceone.inventory.connector.aws_elasticache_connector.schema.data import Redis, Memcached, Valkey
55
from spaceone.inventory.connector.aws_elasticache_connector.schema.resource import RedisResource, RedisResponse, \
6-
MemcachedResource, MemcachedResponse
6+
MemcachedResource, MemcachedResponse, ValkeyResponse, ValkeyResource
77
from spaceone.inventory.connector.aws_elasticache_connector.schema.service_type import CLOUD_SERVICE_TYPES
88
from spaceone.inventory.libs.connector import SchematicAWSConnector
99
from spaceone.inventory.libs.schema.resource import ReferenceModel, CloudWatchModel
@@ -39,7 +39,9 @@ def get_resources(self):
3939

4040
resources.append(MemcachedResponse({'resource': memcached_vo}))
4141

42-
for redis_vo in self.get_redis_data(region_name, cache_clusters):
42+
replication_groups = cache_clusters = [cluster for cluster in self.describe_replication_groups()]
43+
44+
for redis_vo in self.get_redis_data(region_name, replication_groups, cache_clusters):
4345
if getattr(redis_vo, 'resource_type', None) and redis_vo.resource_type == 'inventory.ErrorResource':
4446
# Error Resource
4547
resources.append(redis_vo)
@@ -48,6 +50,16 @@ def get_resources(self):
4850
redis_vo.cloudwatch = CloudWatchModel(redis_vo.set_cloudwatch(region_name))
4951

5052
resources.append(RedisResponse({'resource': redis_vo}))
53+
54+
for valkey_vo in self.get_valkey_data(region_name, replication_groups, cache_clusters):
55+
if getattr(valkey_vo, 'resource_type', None) and valkey_vo.resource_type == 'inventory.ErrorResource':
56+
# Error Resource
57+
resources.append(valkey_vo)
58+
else:
59+
if getattr(valkey_vo, 'set_cloudwatch', None):
60+
valkey_vo.cloudwatch = CloudWatchModel(valkey_vo.set_cloudwatch(region_name))
61+
62+
resources.append(ValkeyResponse({'resource': valkey_vo}))
5163
except Exception as e:
5264
error_resource_response = self.generate_error(region_name, '', e)
5365
resources.append(error_resource_response)
@@ -87,47 +99,97 @@ def get_memcached_data(self, region_name, cache_clusters):
8799
error_resource_response = self.generate_error(region_name, resource_id, e)
88100
yield {'data': error_resource_response}
89101

90-
def get_redis_data(self, region_name, cache_clusters):
102+
def get_redis_data(self, region_name, replication_groups, cache_clusters):
91103
self.cloud_service_type = 'Redis'
92104
cloudtrail_resource_type = None
93105

94-
for replication_group in self.describe_replication_groups():
106+
for replication_group in replication_groups:
95107
try:
96-
replication_group.update({
97-
'mode': self.set_redis_mode(replication_group.get('ClusterEnabled')),
98-
'engine': 'redis',
99-
'engine_version': self.get_engine_version(replication_group, cache_clusters),
100-
'shard_count': self.get_shard_count(replication_group.get('NodeGroups', [])),
101-
'availability_zones': self.get_redis_availability_zones(replication_group.get('NodeGroups', [])),
102-
'subnet_group_name': self.get_redis_subnet_group_name(replication_group, cache_clusters),
103-
'parameter_group_name': self.get_redis_parameter_group_name(replication_group, cache_clusters),
104-
'node_count': self.get_node_count(replication_group.get('MemberClusters', [])),
105-
'cloudtrail': self.set_cloudtrail(region_name, cloudtrail_resource_type,
106-
replication_group['ReplicationGroupId']),
107-
'nodes': self.get_redis_nodes_info(replication_group, cache_clusters),
108-
})
109-
110-
if replication_group.get('mode') == 'Redis':
108+
if replication_group.get('Engine') == 'redis':
111109
replication_group.update({
112-
'primary_endpoint': self.get_redis_primary_endpoint(replication_group),
113-
'reader_endpoint': self.get_redis_reader_endpoint(replication_group)
110+
'mode': self.set_redis_mode(replication_group.get('ClusterEnabled')),
111+
'engine': 'redis',
112+
'engine_version': self.get_engine_version(replication_group, cache_clusters),
113+
'shard_count': self.get_shard_count(replication_group.get('NodeGroups', [])),
114+
'availability_zones': self.get_availability_zones(replication_group.get('NodeGroups', [])),
115+
'subnet_group_name': self.get_subnet_group_name(replication_group, cache_clusters),
116+
'parameter_group_name': self.get_parameter_group_name(replication_group, cache_clusters),
117+
'node_count': self.get_node_count(replication_group.get('MemberClusters', [])),
118+
'cloudtrail': self.set_cloudtrail(region_name, cloudtrail_resource_type,
119+
replication_group['ReplicationGroupId']),
120+
'nodes': self.get_nodes_info(replication_group, cache_clusters),
114121
})
115-
elif replication_group.get('mode') == 'Clustered Redis':
116-
replication_group.update({
117-
'shards': self.get_redis_shards_info(replication_group)
122+
123+
if replication_group.get('mode') == 'Redis':
124+
replication_group.update({
125+
'primary_endpoint': self.get_primary_endpoint(replication_group),
126+
'reader_endpoint': self.get_reader_endpoint(replication_group)
127+
})
128+
elif replication_group.get('mode') == 'Clustered Redis':
129+
replication_group.update({
130+
'shards': self.get_shards_info(replication_group)
131+
})
132+
133+
redis_vo = Redis(replication_group, strict=False)
134+
135+
yield RedisResource({
136+
'data': redis_vo,
137+
'name': redis_vo.replication_group_id,
138+
'instance_type': redis_vo.cache_node_type,
139+
'account': self.account_id,
140+
'region_code': region_name,
141+
'tags': self.list_tags(redis_vo.arn),
142+
'reference': ReferenceModel(redis_vo.reference(region_name))
118143
})
119144

120-
redis_vo = Redis(replication_group, strict=False)
145+
except Exception as e:
146+
resource_id = replication_group.get('ARN', '')
147+
error_resource_response = self.generate_error(region_name, resource_id, e)
148+
yield {'data': error_resource_response}
149+
150+
def get_valkey_data(self, region_name, replication_groups, cache_clusters):
151+
self.cloud_service_type = 'Valkey'
152+
cloudtrail_resource_type = None
153+
154+
for replication_group in replication_groups:
155+
try:
156+
if replication_group.get('Engine') == 'valkey':
157+
replication_group.update({
158+
'mode': self.set_valkey_mode(replication_group.get('ClusterEnabled')),
159+
'engine': 'valkey',
160+
'engine_version': self.get_engine_version(replication_group, cache_clusters),
161+
'shard_count': self.get_shard_count(replication_group.get('NodeGroups', [])),
162+
'availability_zones': self.get_availability_zones(
163+
replication_group.get('NodeGroups', [])),
164+
'subnet_group_name': self.get_subnet_group_name(replication_group, cache_clusters),
165+
'parameter_group_name': self.get_parameter_group_name(replication_group, cache_clusters),
166+
'node_count': self.get_node_count(replication_group.get('MemberClusters', [])),
167+
'cloudtrail': self.set_cloudtrail(region_name, cloudtrail_resource_type,
168+
replication_group['ReplicationGroupId']),
169+
'nodes': self.get_nodes_info(replication_group, cache_clusters),
170+
})
121171

122-
yield RedisResource({
123-
'data': redis_vo,
124-
'name': redis_vo.replication_group_id,
125-
'instance_type': redis_vo.cache_node_type,
126-
'account': self.account_id,
127-
'region_code': region_name,
128-
'tags': self.list_tags(redis_vo.arn),
129-
'reference': ReferenceModel(redis_vo.reference(region_name))
130-
})
172+
if replication_group.get('mode') == 'Valkey':
173+
replication_group.update({
174+
'primary_endpoint': self.get_primary_endpoint(replication_group),
175+
'reader_endpoint': self.get_reader_endpoint(replication_group)
176+
})
177+
elif replication_group.get('mode') == 'Clustered Valkey':
178+
replication_group.update({
179+
'shards': self.get_shards_info(replication_group)
180+
})
181+
182+
valkey_vo = Valkey(replication_group, strict=False)
183+
184+
yield ValkeyResource({
185+
'data': valkey_vo,
186+
'name': valkey_vo.replication_group_id,
187+
'instance_type': valkey_vo.cache_node_type,
188+
'account': self.account_id,
189+
'region_code': region_name,
190+
'tags': self.list_tags(valkey_vo.arn),
191+
'reference': ReferenceModel(valkey_vo.reference(region_name))
192+
})
131193

132194
except Exception as e:
133195
resource_id = replication_group.get('ARN', '')
@@ -195,6 +257,13 @@ def set_redis_mode(cluster_enabled):
195257
else:
196258
return 'Redis'
197259

260+
@staticmethod
261+
def set_valkey_mode(cluster_enabled):
262+
if cluster_enabled:
263+
return 'Clustered Valkey'
264+
else:
265+
return 'Valkey'
266+
198267
@staticmethod
199268
def get_node_count(member_clusters):
200269
return len(member_clusters)
@@ -204,13 +273,13 @@ def get_shard_count(node_groups):
204273
return len(node_groups)
205274

206275
@staticmethod
207-
def get_redis_primary_endpoint(replication_group):
276+
def get_primary_endpoint(replication_group):
208277
for node_group in replication_group.get('NodeGroups', []):
209278
primary_endpoint = node_group.get("PrimaryEndpoint", {})
210279
return f'{primary_endpoint.get("Address", "")}:{primary_endpoint.get("Port", "")}'
211280

212281
@staticmethod
213-
def get_redis_reader_endpoint(replication_group):
282+
def get_reader_endpoint(replication_group):
214283
for node_group in replication_group.get('NodeGroups', []):
215284
reader_endpoint = node_group.get("ReaderEndpoint", {})
216285
return f'{reader_endpoint.get("Address", "")}:{reader_endpoint.get("Port", "")}'
@@ -225,7 +294,7 @@ def get_engine_version(replication_group, cache_clusters):
225294
return ''
226295

227296
@staticmethod
228-
def get_redis_availability_zones(node_groups):
297+
def get_availability_zones(node_groups):
229298
azs = []
230299

231300
for node_group in node_groups:
@@ -237,7 +306,7 @@ def get_redis_availability_zones(node_groups):
237306
return list(set(azs))
238307

239308
@staticmethod
240-
def get_redis_subnet_group_name(replication_group, cache_clusters):
309+
def get_subnet_group_name(replication_group, cache_clusters):
241310
for member in replication_group.get('MemberClusters', []):
242311
for cache_cluster in cache_clusters:
243312
if cache_cluster.get('CacheClusterId') == member:
@@ -246,15 +315,15 @@ def get_redis_subnet_group_name(replication_group, cache_clusters):
246315
return ''
247316

248317
@staticmethod
249-
def get_redis_parameter_group_name(replication_group, cache_clusters):
318+
def get_parameter_group_name(replication_group, cache_clusters):
250319
for member in replication_group.get('MemberClusters', []):
251320
for cache_cluster in cache_clusters:
252321
if cache_cluster.get('CacheClusterId') == member:
253322
return cache_cluster.get('CacheParameterGroup', {}).get('CacheParameterGroupName', '')
254323

255324
return ''
256325

257-
def get_redis_nodes_info(self, replication_group, cache_clusters):
326+
def get_nodes_info(self, replication_group, cache_clusters):
258327
nodes = []
259328

260329
for member in replication_group.get('MemberClusters', []):
@@ -282,7 +351,7 @@ def get_redis_nodes_info(self, replication_group, cache_clusters):
282351
})
283352
else:
284353
node_dic.update({
285-
'endpoint': self.set_redis_cluster_node_endpoint(member, replication_group.get('ConfigurationEndpoint', {}).get('Address')),
354+
'endpoint': self.set_cluster_node_endpoint(member, replication_group.get('ConfigurationEndpoint', {}).get('Address')),
286355
'port': replication_group.get('ConfigurationEndpoint', {}).get('Port', ''),
287356
'zone': node_group_member.get('PreferredAvailabilityZone', ''),
288357
})
@@ -292,7 +361,7 @@ def get_redis_nodes_info(self, replication_group, cache_clusters):
292361
return nodes
293362

294363
@staticmethod
295-
def get_redis_shards_info(replication_group):
364+
def get_shards_info(replication_group):
296365
shards = []
297366

298367
for node_group in replication_group.get('NodeGroups', []):
@@ -306,7 +375,7 @@ def get_redis_shards_info(replication_group):
306375
return shards
307376

308377
@staticmethod
309-
def set_redis_cluster_node_endpoint(member, address):
378+
def set_cluster_node_endpoint(member, address):
310379
if address:
311380
address_split = address.split('.')[1:]
312381
address_split.insert(0, member)

0 commit comments

Comments
 (0)