Skip to content

Commit 49bf21d

Browse files
refactoring
1 parent 8caa1d3 commit 49bf21d

File tree

7 files changed

+320
-37
lines changed

7 files changed

+320
-37
lines changed

aws_lambda_powertools/event_handler/appsync_events.py

Lines changed: 152 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33
import asyncio
44
import logging
5+
import warnings
56
from typing import TYPE_CHECKING, Any, Callable
67

78
from aws_lambda_powertools.event_handler.events_appsync.router import Router
89
from aws_lambda_powertools.utilities.data_classes.appsync_resolver_events_event import AppSyncResolverEventsEvent
10+
from aws_lambda_powertools.warnings import PowertoolsUserWarning
911

1012
if TYPE_CHECKING:
1113
from aws_lambda_powertools.utilities.typing.lambda_context import LambdaContext
@@ -43,7 +45,7 @@ def resolve(
4345
self.current_event = Router.current_event
4446

4547
if self.current_event.info.operation == "PUBLISH":
46-
response = self._call_publish_events(payload=self.current_event.events)
48+
return self._call_publish_events(payload=self.current_event.events)
4749

4850
response = self._call_subscribe_events()
4951

@@ -52,33 +54,165 @@ def resolve(
5254
return response
5355

5456
def _call_subscribe_events(self) -> Any:
55-
# PLACEHOLDER
57+
logger.debug(f"Processing subscribe events for path {self.current_event.info.channel_path}")
58+
59+
resolver = self._subscribe_registry.find_resolver(self.current_event.info.channel_path)
60+
if not resolver:
61+
warnings.warn(
62+
f"No resolvers were found for publish operations with path {self.current_event.info.channel_path}",
63+
stacklevel=2,
64+
category=PowertoolsUserWarning,
65+
)
66+
return
5667
pass
5768

5869
def _call_publish_events(self, payload: list[dict[str, Any]]) -> Any:
5970
"""Call single event resolver
6071
6172
Parameters
6273
----------
63-
event : dict
64-
Event
65-
data_model : type[AppSyncResolverEvent]
66-
Data_model to decode AppSync event, by default it is of AppSyncResolverEvent type or subclass of it
74+
payload : list[dict[str, Any]]
75+
the messages sent by AppSync
6776
"""
6877

69-
result = []
70-
logger.debug("Processing direct resolver event")
78+
logger.debug(f"Processing publish events for path {self.current_event.info.channel_path}")
7179

72-
#self.current_event = data_model(event)
7380
resolver = self._publish_registry.find_resolver(self.current_event.info.channel_path)
74-
if not resolver:
75-
print(f"No resolver found for '{self.current_event.info.channel_path}'")
76-
print(resolver)
81+
async_resolver = self._async_publish_registry.find_resolver(self.current_event.info.channel_path)
82+
83+
if resolver and async_resolver:
84+
warnings.warn(
85+
f"Both synchronous and asynchronous resolvers found for the same event and field."
86+
f"The synchronous resolver takes precedence. Executing: {resolver['func'].__name__}",
87+
stacklevel=2,
88+
category=PowertoolsUserWarning,
89+
)
90+
91+
if resolver:
92+
logger.debug(f"Found sync resolver. {resolver}")
93+
return self._call_publish_event_sync_resolver(
94+
resolver=resolver["func"],
95+
aggregate=resolver["aggregate"],
96+
)
97+
98+
if async_resolver:
99+
logger.debug(f"Found async resolver. {resolver}")
100+
return asyncio.run(
101+
self._call_publish_event_async_resolver(
102+
resolver=async_resolver["func"],
103+
aggregate=async_resolver["aggregate"],
104+
),
105+
)
106+
107+
# No resolver found
108+
# Warning and returning AS IS
109+
warnings.warn(
110+
f"No resolvers were found for publish operations with path {self.current_event.info.channel_path}",
111+
stacklevel=2,
112+
category=PowertoolsUserWarning)
113+
114+
return {"events": payload}
115+
116+
def _call_publish_event_sync_resolver(
117+
self,
118+
resolver: Callable,
119+
aggregate: bool = True,
120+
) -> list[Any]:
121+
"""
122+
Calls a synchronous batch resolver function for each event in the current batch.
123+
124+
Parameters
125+
----------
126+
resolver: Callable
127+
The callable function to resolve events.
128+
raise_on_error: bool
129+
A flag indicating whether to raise an error when processing batches
130+
with failed items. Defaults to False, which means errors are handled without raising exceptions.
131+
aggregate: bool
132+
A flag indicating whether the batch items should be processed at once or individually.
133+
If True (default), the batch resolver will process all items in the batch as a single event.
134+
If False, the batch resolver will process each item in the batch individually.
135+
136+
Returns
137+
-------
138+
list[Any]
139+
A list of results corresponding to the resolved events.
140+
"""
141+
142+
# Checks whether the entire batch should be processed at once
143+
if aggregate:
144+
# Process the entire batch
145+
response = resolver(payload=self.current_event.events)
146+
147+
if not isinstance(response, list):
148+
warnings.warn(
149+
"Response must be a list when using aggregate, AppSync will drop those events.",
150+
stacklevel=2,
151+
category=PowertoolsUserWarning)
152+
153+
return response
154+
155+
156+
# By default, we gracefully append `None` for any records that failed processing
157+
results = []
158+
for idx, event in enumerate(self.current_event.events):
159+
try:
160+
results.append(resolver(payload=event))
161+
except Exception:
162+
logger.debug(f"Failed to process event number {idx}")
163+
results.append(None)
77164

78-
if not resolver["aggregate"]:
79-
return resolver["func"](payload=self.current_event.events)
80-
else:
81-
for i in self.current_event.events:
82-
result.append(resolver["func"](payload=i))
165+
return results
83166

84-
return result
167+
async def _call_publish_event_async_resolver(
168+
self,
169+
resolver: Callable,
170+
aggregate: bool = True,
171+
) -> list[Any]:
172+
"""
173+
Asynchronously call a batch resolver for each event in the current batch.
174+
175+
Parameters
176+
----------
177+
resolver: Callable
178+
The asynchronous resolver function.
179+
raise_on_error: bool
180+
A flag indicating whether to raise an error when processing batches
181+
with failed items. Defaults to False, which means errors are handled without raising exceptions.
182+
aggregate: bool
183+
A flag indicating whether the batch items should be processed at once or individually.
184+
If True (default), the batch resolver will process all items in the batch as a single event.
185+
If False, the batch resolver will process each item in the batch individually.
186+
187+
Returns
188+
-------
189+
list[Any]
190+
A list of results corresponding to the resolved events.
191+
"""
192+
193+
# Checks whether the entire batch should be processed at once
194+
if aggregate:
195+
# Process the entire batch
196+
response = await resolver(event=self.current_batch_event)
197+
if not isinstance(response, list):
198+
warnings.warn(
199+
"Response must be a list when using aggregate, AppSync will drop those events.",
200+
stacklevel=2,
201+
category=PowertoolsUserWarning)
202+
203+
return response
204+
205+
response: list = []
206+
207+
# Prime coroutines
208+
tasks = [resolver(event=e, **e.arguments) for e in self.current_batch_event]
209+
210+
# Aggregate results and exceptions, then filter them out
211+
# Use `None` upon exception for graceful error handling at GraphQL engine level
212+
#
213+
# NOTE: asyncio.gather(return_exceptions=True) catches and includes exceptions in the results
214+
# this will become useful when we support exception handling in AppSync resolver
215+
results = await asyncio.gather(*tasks, return_exceptions=True)
216+
response.extend(None if isinstance(ret, Exception) else ret for ret in results)
217+
218+
return response

aws_lambda_powertools/event_handler/events_appsync/_registry.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,17 @@
99

1010
logger = logging.getLogger(__name__)
1111

12+
KIND_EVENT = Literal["on_publish", "async_on_publish", "on_subscribe"]
13+
1214
class ResolverEventsRegistry:
13-
def __init__(self):
15+
def __init__(self, kind_resolver: str):
1416
self.resolvers: dict[str, dict[str, Any]] = {}
17+
self.kind_resolver = kind_resolver
1518

1619
def register(
1720
self,
1821
path: str = "/default/*",
1922
aggregate: bool = False,
20-
operation: Literal["on_publish", "async_on_publish", "on_subscribe"] = "on_publish",
2123
) -> Callable:
2224
"""Registers the resolver for path that includes namespace + channel
2325
@@ -27,8 +29,8 @@ def register(
2729
Path including namespace + channel
2830
aggregate: bool
2931
A flag indicating whether the batch items should be processed at once or individually.
30-
If True , the batch resolver will process all items in the batch as a single event.
31-
If False (default), the batch resolver will process each item in the batch individually.
32+
If True, the resolver will process all items as a single event.
33+
If False (default), the resolver will process each item individually.
3234
3335
Return
3436
----------
@@ -37,7 +39,7 @@ def register(
3739
"""
3840
if not is_valid_path(path):
3941
warnings.warn(
40-
f"The path `{path}` registered for `{operation}` is not valid and will be skipped."
42+
f"The path `{path}` registered for `{self.kind_resolver}` is not valid and will be skipped."
4143
f"A path should always have a namespace starting with '/'"
4244
"A path can have multiple namespaces, all separated by '/'."
4345
"Wildcards are allowed only at the end of the path.",
@@ -47,7 +49,7 @@ def register(
4749

4850

4951
def _register(func) -> Callable:
50-
logger.debug(f"Adding resolver `{func.__name__}` for path `{path}` and event `{operation}`")
52+
print(f"Adding resolver `{func.__name__}` for path `{path}` and kind_resolver `{self.kind_resolver}`")
5153
self.resolvers[f"{path}"] = {
5254
"func": func,
5355
"aggregate": aggregate,
@@ -65,10 +67,10 @@ def find_resolver(self, path: str) -> dict | None:
6567
Type name
6668
Return
6769
----------
68-
Optional[Dict]
69-
A dictionary with the resolver and if raise exception on error
70+
dict | None
71+
A dictionary with the resolver and if this is aggregated or not
7072
"""
71-
logger.debug(f"Looking for resolver for path={path}")
73+
logger.debug(f"Looking for resolver for path `{path}` and kind_resolver `{self.kind_resolver}`")
7274
return self.resolvers.get(find_best_route(self.resolvers, path))
7375

7476
def merge(self, other_registry: ResolverEventsRegistry):

aws_lambda_powertools/event_handler/events_appsync/router.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44

55
from aws_lambda_powertools.event_handler.events_appsync._registry import ResolverEventsRegistry
66
from aws_lambda_powertools.event_handler.events_appsync.base import BaseRouter
7-
from aws_lambda_powertools.utilities.data_classes.appsync_resolver_events_event import AppSyncResolverEventsEvent
87

98
if TYPE_CHECKING:
10-
from aws_lambda_powertools.utilities.data_classes.appsync_resolver_event import AppSyncResolverEvent
9+
from aws_lambda_powertools.utilities.data_classes.appsync_resolver_events_event import AppSyncResolverEventsEvent
1110
from aws_lambda_powertools.utilities.typing.lambda_context import LambdaContext
1211

1312

@@ -19,29 +18,29 @@ class Router(BaseRouter):
1918

2019
def __init__(self):
2120
self.context = {} # early init as customers might add context before event resolution
22-
self._publish_registry = ResolverEventsRegistry()
23-
self._async_publish_registry = ResolverEventsRegistry()
24-
self._subscribe_registry = ResolverEventsRegistry()
21+
self._publish_registry = ResolverEventsRegistry(kind_resolver="on_publish")
22+
self._async_publish_registry = ResolverEventsRegistry(kind_resolver="async_on_publish")
23+
self._subscribe_registry = ResolverEventsRegistry(kind_resolver="on_subscribe")
2524

2625
def on_publish(
2726
self,
2827
path: str = "/default/*",
2928
aggregate: bool = False,
3029
) -> Callable:
31-
return self._publish_registry.register(path=path, aggregate=aggregate, operation="on_publish")
30+
return self._publish_registry.register(path=path, aggregate=aggregate)
3231

3332
def async_on_publish(
3433
self,
3534
path: str = "/default/*",
3635
aggregate: bool = False,
3736
) -> Callable:
38-
return self._async_publish_registry.register(path=path, aggregate=aggregate, operation="async_on_publish")
37+
return self._async_publish_registry.register(path=path, aggregate=aggregate)
3938

4039
def on_subscribe(
4140
self,
4241
path: str = "/default/*",
4342
) -> Callable:
44-
return self._subscribe_registry.register(path=path, operation="on_subscribe")
43+
return self._subscribe_registry.register(path=path)
4544

4645
def append_context(self, **additional_context):
4746
"""Append key=value data as routing context"""

tests/unit/event_handler/_required_dependencies/__init__.py

Whitespace-only changes.

tests/unit/event_handler/_required_dependencies/appsync_events/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)