Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 113 additions & 44 deletions src/spaceone/inventory/connector/aws_elasticache_connector/connector.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import logging

from spaceone.core.utils import *
from spaceone.inventory.connector.aws_elasticache_connector.schema.data import Redis, Memcached
from spaceone.inventory.connector.aws_elasticache_connector.schema.data import Redis, Memcached, Valkey
from spaceone.inventory.connector.aws_elasticache_connector.schema.resource import RedisResource, RedisResponse, \
MemcachedResource, MemcachedResponse
MemcachedResource, MemcachedResponse, ValkeyResponse, ValkeyResource
from spaceone.inventory.connector.aws_elasticache_connector.schema.service_type import CLOUD_SERVICE_TYPES
from spaceone.inventory.libs.connector import SchematicAWSConnector
from spaceone.inventory.libs.schema.resource import ReferenceModel, CloudWatchModel
Expand Down Expand Up @@ -39,7 +39,9 @@ def get_resources(self):

resources.append(MemcachedResponse({'resource': memcached_vo}))

for redis_vo in self.get_redis_data(region_name, cache_clusters):
replication_groups = cache_clusters = [cluster for cluster in self.describe_replication_groups()]

for redis_vo in self.get_redis_data(region_name, replication_groups, cache_clusters):
if getattr(redis_vo, 'resource_type', None) and redis_vo.resource_type == 'inventory.ErrorResource':
# Error Resource
resources.append(redis_vo)
Expand All @@ -48,6 +50,16 @@ def get_resources(self):
redis_vo.cloudwatch = CloudWatchModel(redis_vo.set_cloudwatch(region_name))

resources.append(RedisResponse({'resource': redis_vo}))

for valkey_vo in self.get_valkey_data(region_name, replication_groups, cache_clusters):
if getattr(valkey_vo, 'resource_type', None) and valkey_vo.resource_type == 'inventory.ErrorResource':
# Error Resource
resources.append(valkey_vo)
else:
if getattr(valkey_vo, 'set_cloudwatch', None):
valkey_vo.cloudwatch = CloudWatchModel(valkey_vo.set_cloudwatch(region_name))

resources.append(ValkeyResponse({'resource': valkey_vo}))
except Exception as e:
error_resource_response = self.generate_error(region_name, '', e)
resources.append(error_resource_response)
Expand Down Expand Up @@ -87,47 +99,97 @@ def get_memcached_data(self, region_name, cache_clusters):
error_resource_response = self.generate_error(region_name, resource_id, e)
yield {'data': error_resource_response}

def get_redis_data(self, region_name, cache_clusters):
def get_redis_data(self, region_name, replication_groups, cache_clusters):
self.cloud_service_type = 'Redis'
cloudtrail_resource_type = None

for replication_group in self.describe_replication_groups():
for replication_group in replication_groups:
try:
replication_group.update({
'mode': self.set_redis_mode(replication_group.get('ClusterEnabled')),
'engine': 'redis',
'engine_version': self.get_engine_version(replication_group, cache_clusters),
'shard_count': self.get_shard_count(replication_group.get('NodeGroups', [])),
'availability_zones': self.get_redis_availability_zones(replication_group.get('NodeGroups', [])),
'subnet_group_name': self.get_redis_subnet_group_name(replication_group, cache_clusters),
'parameter_group_name': self.get_redis_parameter_group_name(replication_group, cache_clusters),
'node_count': self.get_node_count(replication_group.get('MemberClusters', [])),
'cloudtrail': self.set_cloudtrail(region_name, cloudtrail_resource_type,
replication_group['ReplicationGroupId']),
'nodes': self.get_redis_nodes_info(replication_group, cache_clusters),
})

if replication_group.get('mode') == 'Redis':
if replication_group.get('Engine') == 'redis':
replication_group.update({
'primary_endpoint': self.get_redis_primary_endpoint(replication_group),
'reader_endpoint': self.get_redis_reader_endpoint(replication_group)
'mode': self.set_redis_mode(replication_group.get('ClusterEnabled')),
'engine': 'redis',
'engine_version': self.get_engine_version(replication_group, cache_clusters),
'shard_count': self.get_shard_count(replication_group.get('NodeGroups', [])),
'availability_zones': self.get_availability_zones(replication_group.get('NodeGroups', [])),
'subnet_group_name': self.get_subnet_group_name(replication_group, cache_clusters),
'parameter_group_name': self.get_parameter_group_name(replication_group, cache_clusters),
'node_count': self.get_node_count(replication_group.get('MemberClusters', [])),
'cloudtrail': self.set_cloudtrail(region_name, cloudtrail_resource_type,
replication_group['ReplicationGroupId']),
'nodes': self.get_nodes_info(replication_group, cache_clusters),
})
elif replication_group.get('mode') == 'Clustered Redis':
replication_group.update({
'shards': self.get_redis_shards_info(replication_group)

if replication_group.get('mode') == 'Redis':
replication_group.update({
'primary_endpoint': self.get_primary_endpoint(replication_group),
'reader_endpoint': self.get_reader_endpoint(replication_group)
})
elif replication_group.get('mode') == 'Clustered Redis':
replication_group.update({
'shards': self.get_shards_info(replication_group)
})

redis_vo = Redis(replication_group, strict=False)

yield RedisResource({
'data': redis_vo,
'name': redis_vo.replication_group_id,
'instance_type': redis_vo.cache_node_type,
'account': self.account_id,
'region_code': region_name,
'tags': self.list_tags(redis_vo.arn),
'reference': ReferenceModel(redis_vo.reference(region_name))
})

redis_vo = Redis(replication_group, strict=False)
except Exception as e:
resource_id = replication_group.get('ARN', '')
error_resource_response = self.generate_error(region_name, resource_id, e)
yield {'data': error_resource_response}

def get_valkey_data(self, region_name, replication_groups, cache_clusters):
self.cloud_service_type = 'Valkey'
cloudtrail_resource_type = None

for replication_group in replication_groups:
try:
if replication_group.get('Engine') == 'valkey':
replication_group.update({
'mode': self.set_valkey_mode(replication_group.get('ClusterEnabled')),
'engine': 'valkey',
'engine_version': self.get_engine_version(replication_group, cache_clusters),
'shard_count': self.get_shard_count(replication_group.get('NodeGroups', [])),
'availability_zones': self.get_availability_zones(
replication_group.get('NodeGroups', [])),
'subnet_group_name': self.get_subnet_group_name(replication_group, cache_clusters),
'parameter_group_name': self.get_parameter_group_name(replication_group, cache_clusters),
'node_count': self.get_node_count(replication_group.get('MemberClusters', [])),
'cloudtrail': self.set_cloudtrail(region_name, cloudtrail_resource_type,
replication_group['ReplicationGroupId']),
'nodes': self.get_nodes_info(replication_group, cache_clusters),
})

yield RedisResource({
'data': redis_vo,
'name': redis_vo.replication_group_id,
'instance_type': redis_vo.cache_node_type,
'account': self.account_id,
'region_code': region_name,
'tags': self.list_tags(redis_vo.arn),
'reference': ReferenceModel(redis_vo.reference(region_name))
})
if replication_group.get('mode') == 'Valkey':
replication_group.update({
'primary_endpoint': self.get_primary_endpoint(replication_group),
'reader_endpoint': self.get_reader_endpoint(replication_group)
})
elif replication_group.get('mode') == 'Clustered Valkey':
replication_group.update({
'shards': self.get_shards_info(replication_group)
})

valkey_vo = Valkey(replication_group, strict=False)

yield ValkeyResource({
'data': valkey_vo,
'name': valkey_vo.replication_group_id,
'instance_type': valkey_vo.cache_node_type,
'account': self.account_id,
'region_code': region_name,
'tags': self.list_tags(valkey_vo.arn),
'reference': ReferenceModel(valkey_vo.reference(region_name))
})

except Exception as e:
resource_id = replication_group.get('ARN', '')
Expand Down Expand Up @@ -195,6 +257,13 @@ def set_redis_mode(cluster_enabled):
else:
return 'Redis'

@staticmethod
def set_valkey_mode(cluster_enabled):
if cluster_enabled:
return 'Clustered Valkey'
else:
return 'Valkey'

@staticmethod
def get_node_count(member_clusters):
return len(member_clusters)
Expand All @@ -204,13 +273,13 @@ def get_shard_count(node_groups):
return len(node_groups)

@staticmethod
def get_redis_primary_endpoint(replication_group):
def get_primary_endpoint(replication_group):
for node_group in replication_group.get('NodeGroups', []):
primary_endpoint = node_group.get("PrimaryEndpoint", {})
return f'{primary_endpoint.get("Address", "")}:{primary_endpoint.get("Port", "")}'

@staticmethod
def get_redis_reader_endpoint(replication_group):
def get_reader_endpoint(replication_group):
for node_group in replication_group.get('NodeGroups', []):
reader_endpoint = node_group.get("ReaderEndpoint", {})
return f'{reader_endpoint.get("Address", "")}:{reader_endpoint.get("Port", "")}'
Expand All @@ -225,7 +294,7 @@ def get_engine_version(replication_group, cache_clusters):
return ''

@staticmethod
def get_redis_availability_zones(node_groups):
def get_availability_zones(node_groups):
azs = []

for node_group in node_groups:
Expand All @@ -237,7 +306,7 @@ def get_redis_availability_zones(node_groups):
return list(set(azs))

@staticmethod
def get_redis_subnet_group_name(replication_group, cache_clusters):
def get_subnet_group_name(replication_group, cache_clusters):
for member in replication_group.get('MemberClusters', []):
for cache_cluster in cache_clusters:
if cache_cluster.get('CacheClusterId') == member:
Expand All @@ -246,15 +315,15 @@ def get_redis_subnet_group_name(replication_group, cache_clusters):
return ''

@staticmethod
def get_redis_parameter_group_name(replication_group, cache_clusters):
def get_parameter_group_name(replication_group, cache_clusters):
for member in replication_group.get('MemberClusters', []):
for cache_cluster in cache_clusters:
if cache_cluster.get('CacheClusterId') == member:
return cache_cluster.get('CacheParameterGroup', {}).get('CacheParameterGroupName', '')

return ''

def get_redis_nodes_info(self, replication_group, cache_clusters):
def get_nodes_info(self, replication_group, cache_clusters):
nodes = []

for member in replication_group.get('MemberClusters', []):
Expand Down Expand Up @@ -282,7 +351,7 @@ def get_redis_nodes_info(self, replication_group, cache_clusters):
})
else:
node_dic.update({
'endpoint': self.set_redis_cluster_node_endpoint(member, replication_group.get('ConfigurationEndpoint', {}).get('Address')),
'endpoint': self.set_cluster_node_endpoint(member, replication_group.get('ConfigurationEndpoint', {}).get('Address')),
'port': replication_group.get('ConfigurationEndpoint', {}).get('Port', ''),
'zone': node_group_member.get('PreferredAvailabilityZone', ''),
})
Expand All @@ -292,7 +361,7 @@ def get_redis_nodes_info(self, replication_group, cache_clusters):
return nodes

@staticmethod
def get_redis_shards_info(replication_group):
def get_shards_info(replication_group):
shards = []

for node_group in replication_group.get('NodeGroups', []):
Expand All @@ -306,7 +375,7 @@ def get_redis_shards_info(replication_group):
return shards

@staticmethod
def set_redis_cluster_node_endpoint(member, address):
def set_cluster_node_endpoint(member, address):
if address:
address_split = address.split('.')[1:]
address_split.insert(0, member)
Expand Down
Loading
Loading