diff --git a/datadog_sync/utils/storage/aws_s3_bucket.py b/datadog_sync/utils/storage/aws_s3_bucket.py index a752679f..d79ead59 100644 --- a/datadog_sync/utils/storage/aws_s3_bucket.py +++ b/datadog_sync/utils/storage/aws_s3_bucket.py @@ -59,39 +59,77 @@ def get(self, origin: Origin) -> StorageData: log.info("AWS S3 get called") data = StorageData() - prefix_contents = self.client.list_objects_v2(Bucket=self.bucket_name, Prefix=self.source_resources_path) - source_prefix_exists = "Contents" in prefix_contents - if origin in [Origin.SOURCE, Origin.ALL] and source_prefix_exists: - for item in prefix_contents["Contents"]: - key = item["Key"] - if key.endswith(".json"): - resource_type = key.split(".")[0].split("/")[-1] - response = self.client.get_object( - Bucket=self.bucket_name, - Key=key, - ) - content_body = response.get("Body") - try: - data.source[resource_type].update(json.load(content_body)) - except json.decoder.JSONDecodeError: - log.warning(f"invalid json in aws source resource file: {resource_type}") - - prefix_contents = self.client.list_objects_v2(Bucket=self.bucket_name, Prefix=self.destination_resources_path) - destination_prefix_exists = "Contents" in prefix_contents - if origin in [Origin.DESTINATION, Origin.ALL] and destination_prefix_exists: - for item in prefix_contents["Contents"]: - key = item["Key"] - if key.endswith(".json"): - resource_type = key.split(".")[0].split("/")[-1] - response = self.client.get_object( - Bucket=self.bucket_name, - Key=key, - ) - content_body = response.get("Body") - try: - data.destination[resource_type].update(json.load(content_body)) - except json.decoder.JSONDecodeError: - log.warning(f"invalid json in aws destination resource file: {resource_type}") + # Load source resources with pagination + if origin in [Origin.SOURCE, Origin.ALL]: + continuation_token = None + while True: + # Build list_objects_v2 kwargs + list_kwargs = { + "Bucket": self.bucket_name, + "Prefix": self.source_resources_path, + } + if continuation_token: + list_kwargs["ContinuationToken"] = continuation_token + + prefix_contents = self.client.list_objects_v2(**list_kwargs) + + # Process contents if they exist + if "Contents" in prefix_contents: + for item in prefix_contents["Contents"]: + key = item["Key"] + if key.endswith(".json"): + resource_type = key.split(".")[0].split("/")[-1] + response = self.client.get_object( + Bucket=self.bucket_name, + Key=key, + ) + content_body = response.get("Body") + try: + data.source[resource_type].update(json.load(content_body)) + except json.decoder.JSONDecodeError: + log.warning(f"invalid json in aws source resource file: {resource_type}") + + # Check if there are more pages to fetch + if prefix_contents.get("IsTruncated"): + continuation_token = prefix_contents.get("NextContinuationToken") + else: + break + + # Load destination resources with pagination + if origin in [Origin.DESTINATION, Origin.ALL]: + continuation_token = None + while True: + # Build list_objects_v2 kwargs + list_kwargs = { + "Bucket": self.bucket_name, + "Prefix": self.destination_resources_path, + } + if continuation_token: + list_kwargs["ContinuationToken"] = continuation_token + + prefix_contents = self.client.list_objects_v2(**list_kwargs) + + # Process contents if they exist + if "Contents" in prefix_contents: + for item in prefix_contents["Contents"]: + key = item["Key"] + if key.endswith(".json"): + resource_type = key.split(".")[0].split("/")[-1] + response = self.client.get_object( + Bucket=self.bucket_name, + Key=key, + ) + content_body = response.get("Body") + try: + data.destination[resource_type].update(json.load(content_body)) + except json.decoder.JSONDecodeError: + log.warning(f"invalid json in aws destination resource file: {resource_type}") + + # Check if there are more pages to fetch + if prefix_contents.get("IsTruncated"): + continuation_token = prefix_contents.get("NextContinuationToken") + else: + break return data