Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 71 additions & 33 deletions datadog_sync/utils/storage/aws_s3_bucket.py
Original file line number Diff line number Diff line change
def get(self, origin: Origin) -> StorageData:
    """Load stored resource state from the S3 bucket.

    Lists every object under the source and/or destination prefix
    (following continuation tokens until the listing is exhausted),
    downloads each ``.json`` object and merges its parsed content into
    the matching mapping of the returned ``StorageData``.

    Args:
        origin: Which side to load. ``Origin.SOURCE`` and
            ``Origin.DESTINATION`` load one prefix; ``Origin.ALL`` loads both.

    Returns:
        A ``StorageData`` whose ``source`` / ``destination`` entries are
        updated per resource type. Objects containing invalid JSON are
        skipped with a warning rather than aborting the load.
    """
    log.info("AWS S3 get called")
    data = StorageData()

    def load_prefix(prefix, target, label):
        """Merge every ``.json`` object found under *prefix* into *target*."""
        list_kwargs = {
            "Bucket": self.bucket_name,
            "Prefix": prefix,
        }
        while True:
            page = self.client.list_objects_v2(**list_kwargs)

            # "Contents" is absent when the page holds no objects.
            for item in page.get("Contents", []):
                key = item["Key"]
                if not key.endswith(".json"):
                    continue

                # Derive the resource type from the file name only, so a
                # dot inside the prefix (e.g. "v1.2/source/") cannot
                # truncate the key before the file name is reached.
                resource_type = key.rsplit("/", 1)[-1].split(".")[0]
                response = self.client.get_object(
                    Bucket=self.bucket_name,
                    Key=key,
                )
                content_body = response.get("Body")
                try:
                    target[resource_type].update(json.load(content_body))
                except json.decoder.JSONDecodeError:
                    log.warning(f"invalid json in aws {label} resource file: {resource_type}")

            if not page.get("IsTruncated"):
                break

            next_token = page.get("NextContinuationToken")
            if not next_token:
                # Without a token the next request would restart the
                # listing from the first page and never terminate.
                log.warning(f"truncated aws {label} listing returned no continuation token")
                break
            list_kwargs["ContinuationToken"] = next_token

    if origin in [Origin.SOURCE, Origin.ALL]:
        load_prefix(self.source_resources_path, data.source, "source")

    if origin in [Origin.DESTINATION, Origin.ALL]:
        load_prefix(self.destination_resources_path, data.destination, "destination")

    return data

Expand Down
Loading