3
3
import asyncio
4
4
import logging
5
5
import warnings
6
- from typing import TYPE_CHECKING , Any , Callable
6
+ from typing import TYPE_CHECKING , Any
7
7
8
8
from aws_lambda_powertools .event_handler .events_appsync .router import Router
9
9
from aws_lambda_powertools .utilities .data_classes .appsync_resolver_events_event import AppSyncResolverEventsEvent
10
10
from aws_lambda_powertools .warnings import PowertoolsUserWarning
11
11
12
12
if TYPE_CHECKING :
13
+ from collections .abc import Callable
14
+
15
+ from aws_lambda_powertools .event_handler .events_appsync .types import ResolverTypeDef
13
16
from aws_lambda_powertools .utilities .typing .lambda_context import LambdaContext
14
17
18
+
15
19
logger = logging .getLogger (__name__ )
20
+
21
+
16
22
class AppSyncEventsResolver (Router ):
17
23
"""
18
24
AppSync Events API Resolver
@@ -25,35 +31,37 @@ def __init__(self):
25
31
26
32
def __call__ (
27
33
self ,
28
- event : dict ,
34
+ event : dict | AppSyncResolverEventsEvent ,
29
35
context : LambdaContext ,
30
36
) -> Any :
31
37
"""Implicit lambda handler which internally calls `resolve`"""
32
38
return self .resolve (event , context )
33
39
34
40
def resolve (
35
41
self ,
36
- event : AppSyncResolverEventsEvent ,
42
+ event : dict | AppSyncResolverEventsEvent ,
37
43
context : LambdaContext ,
38
44
) -> Any :
39
45
"""Resolves the response based on the provide event and decorator operation and namespaces"""
40
46
41
47
self .lambda_context = context
42
48
Router .lambda_context = context
43
49
44
- Router .current_event = AppSyncResolverEventsEvent (event )
50
+ Router .current_event = (
51
+ event if isinstance (event , AppSyncResolverEventsEvent ) else AppSyncResolverEventsEvent (event )
52
+ )
45
53
self .current_event = Router .current_event
46
54
47
55
if self .current_event .info .operation == "PUBLISH" :
48
- return self ._call_publish_events (payload = self .current_event .events )
56
+ return self ._publish_events (payload = self .current_event .events )
49
57
50
- response = self ._call_subscribe_events ()
58
+ response = self ._subscribe_events ()
51
59
52
60
self .clear_context ()
53
61
54
62
return response
55
63
56
- def _call_subscribe_events (self ) -> Any :
64
+ def _subscribe_events (self ) -> Any :
57
65
logger .debug (f"Processing subscribe events for path { self .current_event .info .channel_path } " )
58
66
59
67
resolver = self ._subscribe_registry .find_resolver (self .current_event .info .channel_path )
@@ -66,7 +74,7 @@ def _call_subscribe_events(self) -> Any:
66
74
return
67
75
pass
68
76
69
- def _call_publish_events (self , payload : list [dict [str , Any ]]) -> Any :
77
+ def _publish_events (self , payload : list [dict [str , Any ]]) -> list [ dict [ str , Any ]] | dict [ str , Any ] :
70
78
"""Call single event resolver
71
79
72
80
Parameters
@@ -90,34 +98,33 @@ def _call_publish_events(self, payload: list[dict[str, Any]]) -> Any:
90
98
91
99
if resolver :
92
100
logger .debug (f"Found sync resolver. { resolver } " )
93
- return self ._call_publish_event_sync_resolver (
94
- resolver = resolver ["func" ],
95
- aggregate = resolver ["aggregate" ],
101
+ return self ._process_publish_event_sync_resolver (
102
+ resolver = resolver ,
96
103
)
97
104
98
105
if async_resolver :
99
106
logger .debug (f"Found async resolver. { resolver } " )
100
107
return asyncio .run (
101
108
self ._call_publish_event_async_resolver (
102
- resolver = async_resolver ["func" ],
103
- aggregate = async_resolver ["aggregate" ],
109
+ resolver = async_resolver ,
104
110
),
105
111
)
106
112
107
113
# No resolver found
108
114
# Warning and returning AS IS
109
115
warnings .warn (
110
- f"No resolvers were found for publish operations with path { self .current_event .info .channel_path } " ,
116
+ f"No resolvers were found for publish operations with path { self .current_event .info .channel_path } "
117
+ "We will return the entire payload as is" ,
111
118
stacklevel = 2 ,
112
- category = PowertoolsUserWarning )
119
+ category = PowertoolsUserWarning ,
120
+ )
113
121
114
122
return {"events" : payload }
115
123
116
- def _call_publish_event_sync_resolver (
124
+ def _process_publish_event_sync_resolver (
117
125
self ,
118
- resolver : Callable ,
119
- aggregate : bool = True ,
120
- ) -> list [Any ]:
126
+ resolver : ResolverTypeDef ,
127
+ ) -> list [dict [str , Any ]] | dict [str , Any ]:
121
128
"""
122
129
Calls a synchronous batch resolver function for each event in the current batch.
123
130
@@ -140,34 +147,37 @@ def _call_publish_event_sync_resolver(
140
147
"""
141
148
142
149
# Checks whether the entire batch should be processed at once
143
- if aggregate :
144
- # Process the entire batch
145
- response = resolver (payload = self .current_event .events )
150
+ if resolver ["aggregate" ]:
151
+ try :
152
+ # Process the entire batch
153
+ response = resolver ["func" ](payload = self .current_event .events )
146
154
147
- if not isinstance (response , list ):
148
- warnings .warn (
149
- "Response must be a list when using aggregate, AppSync will drop those events." ,
150
- stacklevel = 2 ,
151
- category = PowertoolsUserWarning )
152
-
153
- return response
155
+ if not isinstance (response , list ):
156
+ warnings .warn (
157
+ "Response must be a list when using aggregate, AppSync will drop those events." ,
158
+ stacklevel = 2 ,
159
+ category = PowertoolsUserWarning ,
160
+ )
154
161
162
+ return response
163
+ except Exception as error :
164
+ return {"error" : self .format_error_response (error )}
155
165
156
166
# By default, we gracefully append `None` for any records that failed processing
157
167
results = []
158
168
for idx , event in enumerate (self .current_event .events ):
159
169
try :
160
- results .append (resolver (payload = event ))
161
- except Exception :
170
+ results .append (resolver [ "func" ] (payload = event ))
171
+ except Exception as error :
162
172
logger .debug (f"Failed to process event number { idx } " )
163
- results .append (None )
173
+ error_return = {"id" : event .get ("id" ), "error" : self .format_error_response (error )}
174
+ results .append (error_return )
164
175
165
176
return results
166
177
167
178
async def _call_publish_event_async_resolver (
168
179
self ,
169
- resolver : Callable ,
170
- aggregate : bool = True ,
180
+ resolver : ResolverTypeDef ,
171
181
) -> list [Any ]:
172
182
"""
173
183
Asynchronously call a batch resolver for each event in the current batch.
@@ -191,28 +201,55 @@ async def _call_publish_event_async_resolver(
191
201
"""
192
202
193
203
# Checks whether the entire batch should be processed at once
194
- if aggregate :
204
+ if resolver [ " aggregate" ] :
195
205
# Process the entire batch
196
- response = await resolver (event = self .current_batch_event )
206
+ response = await resolver [ "func" ] (event = self .current_event . events )
197
207
if not isinstance (response , list ):
198
208
warnings .warn (
199
209
"Response must be a list when using aggregate, AppSync will drop those events." ,
200
210
stacklevel = 2 ,
201
- category = PowertoolsUserWarning )
211
+ category = PowertoolsUserWarning ,
212
+ )
202
213
203
214
return response
204
215
205
- response : list = []
216
+ response_async : list = []
206
217
207
218
# Prime coroutines
208
- tasks = [resolver (event = e , ** e . arguments ) for e in self .current_batch_event ]
219
+ tasks = [resolver [ "func" ] (event = e ) for e in self .current_event . events ]
209
220
210
221
# Aggregate results and exceptions, then filter them out
211
222
# Use `None` upon exception for graceful error handling at GraphQL engine level
212
223
#
213
224
# NOTE: asyncio.gather(return_exceptions=True) catches and includes exceptions in the results
214
225
# this will become useful when we support exception handling in AppSync resolver
215
226
results = await asyncio .gather (* tasks , return_exceptions = True )
216
- response .extend (None if isinstance (ret , Exception ) else ret for ret in results )
227
+ response_async .extend (None if isinstance (ret , Exception ) else ret for ret in results )
217
228
218
- return response
229
+ return response_async
230
+
231
+ def include_router (self , router : Router ) -> None :
232
+ """Adds all resolvers defined in a router
233
+
234
+ Parameters
235
+ ----------
236
+ router : Router
237
+ A router containing a dict of field resolvers
238
+ """
239
+
240
+ # Merge app and router context
241
+ logger .debug ("Merging router and app context" )
242
+ self .context .update (** router .context )
243
+
244
+ # use pointer to allow context clearance after event is processed e.g., resolve(evt, ctx)
245
+ router .context = self .context
246
+
247
+ logger .debug ("Merging router resolver registries" )
248
+ self ._publish_registry .merge (router ._publish_registry )
249
+ self ._async_publish_registry .merge (router ._async_publish_registry )
250
+ self ._subscribe_registry .merge (router ._subscribe_registry )
251
+
252
+ def format_error_response (self , error = None ) -> str :
253
+ if isinstance (error , Exception ):
254
+ return f"{ error .__class__ .__name__ } - { str (error )} "
255
+ return "An unknown error occurred"
0 commit comments