Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
.vscode
.pytest_cache
__pycache__
*.DS_Store
947 changes: 522 additions & 425 deletions poetry.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ python = "^3.7"
boto3 = "^1.26.91"

[tool.poetry.group.dev.dependencies]
pytest = "^7.2.2"
pytest-cov = "^4.0.0"
moto = "^4.1.4"
pytest = "^7.3.2"
pytest-cov = "^4.1.0"
moto = "^4.1.11"
black = "^23.1"
flake8 = [
# https://github.com/python/importlib_metadata/issues/406
{ version = "*", python="^3.7" },
{ version = ">=5", python= ">=3.8"},
]
isort = [
{ version = "5.11.5", python="^3.7" },
{ version = "5.11.5", python="3.7" },
{ version = "^5.11.6", python= ">=3.8"},
]

Expand Down
5 changes: 2 additions & 3 deletions src/sns_extended_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@

from .session import SNSExtendedClientSession

# Monkey patch to use our Session object instead of boto3's
boto3.session.Session = SNSExtendedClientSession
setattr(boto3.session, "Session", SNSExtendedClientSession)

# Now take care of the reference in the boto3.__init__ module
setattr(boto3, "Session", SNSExtendedClientSession)

# Now ensure that even the default session is our SNSExtendedClientSession
if boto3.DEFAULT_SESSION:
boto3.setup_default_session()
boto3.setup_default_session()
178 changes: 90 additions & 88 deletions src/sns_extended_client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
from uuid import uuid4

import boto3

import botocore.session
from boto3 import resource

import logging
logger = logging.getLogger("sns_extended_client.client")
logger.setLevel(logging.WARNING)

from .exceptions import MissingPayloadOffloadingResource, SNSExtendedClientException

Expand Down Expand Up @@ -85,23 +89,6 @@ def _set_use_legacy_attribute(self, use_legacy_attribute: bool):
setattr(self, "__use_legacy_attribute", use_legacy_attribute)


def _delete_s3(self):
if hasattr(self, "__s3"):
del self.__s3


def _get_s3(self):
s3 = getattr(self, "__s3", None)
if not s3:
s3 = resource("s3")
self.s3 = s3
return s3


def _set_s3(self, s3):
setattr(self, "__s3", s3)


def _is_large_message(self, attributes: dict, encoded_body: bytes):
total = 0
for key, value in attributes.items():
Expand Down Expand Up @@ -147,14 +134,6 @@ def _get_s3_key(self, message_attributes: dict):
return str(uuid4())


def _create_s3_put_object_params(self, encoded_body: bytes):
return {
"ACL": "private",
"Body": encoded_body,
"ContentLength": len(encoded_body),
}


def _create_reserved_message_attribute_value(self, encoded_body_size_string):
return {"DataType": "Number", "StringValue": encoded_body_size_string}

Expand Down Expand Up @@ -197,9 +176,7 @@ def _make_payload(self, message_attributes: dict, message_body, message_structur

s3_key = self._get_s3_key(message_attributes)

self.s3.Object(self.large_payload_support, s3_key).put(
**self._create_s3_put_object_params(encoded_body)
)
self.s3_client.put_object(Bucket=self.large_payload_support, Key=s3_key, Body=encoded_body)

message_body = dumps(
[
Expand All @@ -211,53 +188,6 @@ def _make_payload(self, message_attributes: dict, message_body, message_structur
return message_attributes, message_body


def _add_custom_attributes(class_attributes: dict):
class_attributes["large_payload_support"] = property(
_get_large_payload_support,
_set_large_payload_support,
_delete_large_payload_support,
)
class_attributes["message_size_threshold"] = property(
_get_message_size_threshold,
_set_message_size_threshold,
_delete_messsage_size_threshold,
)
class_attributes["always_through_s3"] = property(
_get_always_through_s3,
_set_always_through_s3,
_delete_always_through_s3,
)
class_attributes["use_legacy_attribute"] = property(
_get_use_legacy_attribute,
_set_use_legacy_attribute,
_delete_use_legacy_attribute,
)
class_attributes["s3"] = property(_get_s3, _set_s3, _delete_s3)

class_attributes["_create_s3_put_object_params"] = _create_s3_put_object_params
class_attributes[
"_create_reserved_message_attribute_value"
] = _create_reserved_message_attribute_value
class_attributes["_is_large_message"] = _is_large_message
class_attributes["_make_payload"] = _make_payload
class_attributes["_get_s3_key"] = _get_s3_key
class_attributes["_check_size_of_message_attributes"] = _check_size_of_message_attributes
class_attributes["_check_message_attributes"] = _check_message_attributes
class_attributes["publish"] = _publish_decorator(class_attributes["publish"])


def _add_client_custom_attributes(base_classes, **kwargs):
_add_custom_attributes(kwargs["class_attributes"])


def _add_topic_resource_custom_attributes(class_attributes, **kwargs):
_add_custom_attributes(class_attributes)


def _add_platform_endpoint_resource_custom_attributes(class_attributes, **kwargs):
_add_custom_attributes(class_attributes)


def _publish_decorator(func):
def _publish(self, **kwargs):
if (
Expand All @@ -270,14 +200,39 @@ def _publish(self, **kwargs):
kwargs["MessageAttributes"], kwargs["Message"] = self._make_payload(
kwargs.get("MessageAttributes", {}),
kwargs["Message"],
kwargs.get("MessageStructure", None),
kwargs.get("MessageStructure",None),
)
return func(self, **kwargs)

return _publish



class SNSExtendedClientSession(boto3.session.Session):

"""
A session stores configuration state and allows you to create service
clients and resources. SNSExtendedClientSession extends the functionality
of the boto3 Session object by using the .register event functionality.

:type aws_access_key_id: string
:param aws_access_key_id: AWS access key ID
:type aws_secret_access_key: string
:param aws_secret_access_key: AWS secret access key
:type aws_session_token: string
:param aws_session_token: AWS temporary session token
:type region_name: string
:param region_name: Default region when creating new connections
:type botocore_session: botocore.session.Session
:param botocore_session: Use this Botocore session instead of creating
a new default one.
:type profile_name: string
:param profile_name: The name of a profile to use. If not given, then
the default profile is used.

"""


def __init__(
self,
aws_access_key_id=None,
Expand All @@ -288,31 +243,78 @@ def __init__(
profile_name=None,
):
if botocore_session is None:
botocore_session = botocore.session.get_session()
self._session = botocore.session.get_session()
else:
self._session = botocore_session

user_agent_header = self.__class__.__name__
self.add_custom_user_agent()

# Attaching SNSExtendedClient Session to the HTTP headers
if botocore_session.user_agent_extra:
botocore_session.user_agent_extra += " " + user_agent_header
else:
botocore_session.user_agent_extra = user_agent_header


super().__init__(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_session_token=aws_session_token,
region_name=region_name,
botocore_session=botocore_session,
botocore_session=self._session,
profile_name=profile_name,
)

self.events.register("creating-client-class.sns", _add_client_custom_attributes)
# Adding Additional attributes for sns Client, Topic and PlatformEndpoint Objects
self.events.register("creating-client-class.sns", self.add_custom_attributes)
self.events.register(
"creating-resource-class.sns.Topic",
_add_topic_resource_custom_attributes,
self.add_custom_attributes,
)
self.events.register(
"creating-resource-class.sns.PlatformEndpoint",
_add_platform_endpoint_resource_custom_attributes,
self.add_custom_attributes,
)

def add_custom_user_agent(self):
# Attaching SNSExtendedClient Session to the HTTP headers

user_agent_header = self.__class__.__name__

if self._session.user_agent_extra:
self._session.user_agent_extra += " " + user_agent_header
else:
self._session.user_agent_extra = user_agent_header

def add_custom_attributes(self,class_attributes,**kwargs):


class_attributes["large_payload_support"] = property(
_get_large_payload_support,
_set_large_payload_support,
_delete_large_payload_support,
)
class_attributes["message_size_threshold"] = property(
_get_message_size_threshold,
_set_message_size_threshold,
_delete_messsage_size_threshold,
)
class_attributes["always_through_s3"] = property(
_get_always_through_s3,
_set_always_through_s3,
_delete_always_through_s3,
)
class_attributes["use_legacy_attribute"] = property(
_get_use_legacy_attribute,
_set_use_legacy_attribute,
_delete_use_legacy_attribute,
)
class_attributes["s3_client"] = super().client("s3")

class_attributes[
"_create_reserved_message_attribute_value"
] = _create_reserved_message_attribute_value
class_attributes["_is_large_message"] = _is_large_message
class_attributes["_make_payload"] = _make_payload
class_attributes["_get_s3_key"] = _get_s3_key

# Adding the S3 client to the object

class_attributes["_check_size_of_message_attributes"] = _check_size_of_message_attributes
class_attributes["_check_message_attributes"] = _check_message_attributes
class_attributes["publish"] = _publish_decorator(class_attributes["publish"])
Loading