@@ -45,23 +45,50 @@ async def webhook_request(client: AsyncClient, url: str, endpoint_id: int, *, we
45
45
'Content-Type' : 'application/json' ,
46
46
'webhook-signature' : webhook_sig ,
47
47
}
48
+ request_data = RequestData (
49
+ endpoint_id = endpoint_id , request_headers = json .dumps (headers ), request_body = json .dumps (data )
50
+ )
48
51
with logfire .span ('{method=} {url!r}' , url = url , method = 'POST' ):
49
- r = None
50
52
try :
53
+ if not data or (isinstance (data , dict ) and not data .get ('events' )):
54
+ request_data .status_code = 999
55
+ request_data .response_headers = json .dumps ({})
56
+ request_data .response_body = json .dumps ({'error' : 'Empty payload' })
57
+ request_data .successful_response = False
58
+ return request_data
59
+
60
+ if isinstance (data , dict ) and data .get ('events' ) and not data ['events' ][0 ].get ('data' ):
61
+ request_data .status_code = 999
62
+ request_data .response_headers = json .dumps ({})
63
+ request_data .response_body = json .dumps ({'error' : 'Empty event data' })
64
+ request_data .successful_response = False
65
+ return request_data
66
+
51
67
r = await client .post (url = url , json = data , headers = headers , timeout = 8 )
68
+ request_data .response_headers = json .dumps (dict (r .headers ))
69
+ try :
70
+ response_body = r .json ()
71
+ except json .JSONDecodeError :
72
+ response_body = r .content .decode ()
73
+ request_data .response_body = json .dumps (response_body )
74
+ request_data .status_code = r .status_code
75
+ request_data .successful_response = r .status_code in {200 , 201 , 202 , 204 }
76
+ return request_data
52
77
except httpx .TimeoutException as terr :
53
78
app_logger .info ('Timeout error sending webhook to %s: %s' , url , terr )
79
+ request_data .status_code = 999
80
+ request_data .response_headers = json .dumps ({}) # Empty headers for timeout
81
+ request_data .response_body = json .dumps ({'error' : 'Timeout error' })
82
+ request_data .successful_response = False
83
+ raise terr
54
84
except httpx .HTTPError as httperr :
55
85
app_logger .info ('HTTP error sending webhook to %s: %s' , url , httperr )
56
- request_data = RequestData (
57
- endpoint_id = endpoint_id , request_headers = json .dumps (headers ), request_body = json .dumps (data )
58
- )
59
- if r is not None :
60
- request_data .response_headers = json .dumps (dict (r .headers ))
61
- request_data .response_body = json .dumps (r .content .decode ())
62
- request_data .status_code = r .status_code
63
- request_data .successful_response = True
64
- return request_data
86
+ response = getattr (httperr , 'response' , httpx .Response (status_code = 500 ))
87
+ request_data .status_code = response .status_code
88
+ request_data .response_headers = json .dumps (dict (response .headers ) if response .headers else {})
89
+ request_data .response_body = json .dumps ({'error' : str (httperr )})
90
+ request_data .successful_response = False
91
+ raise httperr
65
92
66
93
67
94
acceptable_url_schemes = ('http' , 'https' , 'ftp' , 'ftps' )
@@ -107,21 +134,45 @@ async def _async_post_webhooks(endpoints, url_extension, payload):
107
134
if url_extension :
108
135
url += f'/{ url_extension } '
109
136
# Send the Webhook to the endpoint
137
+ try :
138
+ loaded_payload = json .loads (payload )
139
+ task = asyncio .ensure_future (
140
+ webhook_request (client , url , endpoint .id , webhook_sig = sig_hex , data = loaded_payload )
141
+ )
142
+ tasks .append (task )
143
+ except json .JSONDecodeError :
144
+ app_logger .error ('Failed to decode payload for endpoint %s' , endpoint .id )
145
+ continue
110
146
111
- loaded_payload = json .loads (payload )
112
- task = asyncio .ensure_future (
113
- webhook_request (client , url , endpoint .id , webhook_sig = sig_hex , data = loaded_payload )
114
- )
115
- tasks .append (task )
116
147
webhook_responses = await asyncio .gather (* tasks , return_exceptions = True )
117
148
for response in webhook_responses :
149
+ if isinstance (response , Exception ):
150
+ app_logger .info ('Error from endpoint %s: %s' , endpoint .id , response )
151
+ webhook_logs .append (
152
+ WebhookLog (
153
+ webhook_endpoint_id = endpoint .id ,
154
+ request_headers = json .dumps ({}),
155
+ request_body = payload ,
156
+ response_headers = json .dumps ({}),
157
+ response_body = json .dumps ({'error' : str (response )}),
158
+ status = 'Unexpected response' ,
159
+ status_code = 999 ,
160
+ )
161
+ )
162
+ total_failed += 1
163
+ continue
164
+
118
165
if not isinstance (response , RequestData ):
119
166
app_logger .info ('No response from endpoint %s: %s. %s' , endpoint .id , endpoint .webhook_url , response )
120
167
continue
121
- elif not response .successful_response :
122
- app_logger .info ('No response from endpoint %s: %s' , endpoint .id , endpoint .webhook_url )
123
168
124
- if response .status_code in {200 , 201 , 202 , 204 }:
169
+ try :
170
+ response_body = json .loads (response .response_body )
171
+ response_status = response_body .get ('status' , '' ).lower () # Default to empty string if not specified
172
+ except (json .JSONDecodeError , AttributeError ):
173
+ response_status = 'success' # Default to success on parse error
174
+
175
+ if response .status_code in {200 , 201 , 202 , 204 } and response_status == 'success' :
125
176
status = 'Success'
126
177
total_success += 1
127
178
else :
@@ -151,41 +202,49 @@ def task_send_webhooks(
151
202
"""
152
203
Send the webhook to the relevant endpoints
153
204
"""
154
- loaded_payload = json .loads (payload )
155
- loaded_payload ['_request_time' ] = loaded_payload .pop ('request_time' )
156
- qlength = get_qlength ()
157
-
158
- if loaded_payload .get ('events' ):
159
- branch_id = loaded_payload ['events' ][0 ]['branch' ]
160
- else :
161
- branch_id = loaded_payload ['branch_id' ]
162
-
163
- if qlength > 100 :
164
- app_logger .error ('Queue is too long. Check workers and speeds.' )
165
-
166
- app_logger .info ('Starting send webhook task for branch %s. qlength=%s.' , branch_id , qlength )
167
- lf_span = 'Sending webhooks for branch: {branch_id=}'
168
- with logfire .span (lf_span , branch_id = branch_id ):
169
- with Session (engine ) as db :
170
- # Get all the endpoints for the branch
171
- endpoints_query = select (WebhookEndpoint ).where (
172
- WebhookEndpoint .branch_id == branch_id , WebhookEndpoint .active
173
- )
174
- endpoints = db .exec (endpoints_query ).all ()
205
+ try :
206
+ loaded_payload = json .loads (payload )
207
+ if 'request_time' in loaded_payload :
208
+ loaded_payload ['_request_time' ] = loaded_payload .pop ('request_time' )
209
+ qlength = get_qlength ()
210
+
211
+ if loaded_payload .get ('events' ):
212
+ branch_id = loaded_payload ['events' ][0 ]['branch' ]
213
+ else :
214
+ branch_id = loaded_payload ['branch_id' ]
215
+
216
+ if qlength > 100 :
217
+ app_logger .error ('Queue is too long. Check workers and speeds.' )
218
+
219
+ app_logger .info ('Starting send webhook task for branch %s. qlength=%s.' , branch_id , qlength )
220
+ lf_span = 'Sending webhooks for branch: {branch_id=}'
221
+ with logfire .span (lf_span , branch_id = branch_id ):
222
+ with Session (engine ) as db :
223
+ # Get all the endpoints for the branch
224
+ endpoints_query = select (WebhookEndpoint ).where (
225
+ WebhookEndpoint .branch_id == branch_id , WebhookEndpoint .active
226
+ )
227
+ endpoints = db .exec (endpoints_query ).all ()
175
228
176
- webhook_logs , total_success , total_failed = asyncio .run (
177
- _async_post_webhooks (endpoints , url_extension , payload )
178
- )
179
- for webhook_log in webhook_logs :
180
- db .add (webhook_log )
181
- db .commit ()
182
- app_logger .info (
183
- '%s Webhooks sent for branch %s. Total Sent: %s. Total failed: %s' ,
184
- total_success + total_failed ,
185
- branch_id ,
186
- total_success ,
187
- total_failed ,
188
- )
229
+ webhook_logs , total_success , total_failed = asyncio .run (
230
+ _async_post_webhooks (endpoints , url_extension , payload )
231
+ )
232
+ for webhook_log in webhook_logs :
233
+ db .add (webhook_log )
234
+ db .commit ()
235
+ app_logger .info (
236
+ '%s Webhooks sent for branch %s. Total Sent: %s. Total failed: %s' ,
237
+ total_success + total_failed ,
238
+ branch_id ,
239
+ total_success ,
240
+ total_failed ,
241
+ )
242
+ except json .JSONDecodeError as e :
243
+ app_logger .error ('Failed to decode payload: %s' , payload )
244
+ raise e
245
+ except Exception as e :
246
+ app_logger .error ('Error sending webhooks: %s' , str (e ))
247
+ raise e
189
248
190
249
191
250
DELETE_JOBS_KEY = 'delete_old_logs_job'
0 commit comments