Skip to content

Commit afe3b5a

Browse files
committed
AI recons its done good
1 parent 5250255 commit afe3b5a

File tree

4 files changed

+572
-177
lines changed

4 files changed

+572
-177
lines changed

chronos/worker.py

Lines changed: 111 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -45,23 +45,50 @@ async def webhook_request(client: AsyncClient, url: str, endpoint_id: int, *, we
4545
'Content-Type': 'application/json',
4646
'webhook-signature': webhook_sig,
4747
}
48+
request_data = RequestData(
49+
endpoint_id=endpoint_id, request_headers=json.dumps(headers), request_body=json.dumps(data)
50+
)
4851
with logfire.span('{method=} {url!r}', url=url, method='POST'):
49-
r = None
5052
try:
53+
if not data or (isinstance(data, dict) and not data.get('events')):
54+
request_data.status_code = 999
55+
request_data.response_headers = json.dumps({})
56+
request_data.response_body = json.dumps({'error': 'Empty payload'})
57+
request_data.successful_response = False
58+
return request_data
59+
60+
if isinstance(data, dict) and data.get('events') and not data['events'][0].get('data'):
61+
request_data.status_code = 999
62+
request_data.response_headers = json.dumps({})
63+
request_data.response_body = json.dumps({'error': 'Empty event data'})
64+
request_data.successful_response = False
65+
return request_data
66+
5167
r = await client.post(url=url, json=data, headers=headers, timeout=8)
68+
request_data.response_headers = json.dumps(dict(r.headers))
69+
try:
70+
response_body = r.json()
71+
except json.JSONDecodeError:
72+
response_body = r.content.decode()
73+
request_data.response_body = json.dumps(response_body)
74+
request_data.status_code = r.status_code
75+
request_data.successful_response = r.status_code in {200, 201, 202, 204}
76+
return request_data
5277
except httpx.TimeoutException as terr:
5378
app_logger.info('Timeout error sending webhook to %s: %s', url, terr)
79+
request_data.status_code = 999
80+
request_data.response_headers = json.dumps({}) # Empty headers for timeout
81+
request_data.response_body = json.dumps({'error': 'Timeout error'})
82+
request_data.successful_response = False
83+
raise terr
5484
except httpx.HTTPError as httperr:
5585
app_logger.info('HTTP error sending webhook to %s: %s', url, httperr)
56-
request_data = RequestData(
57-
endpoint_id=endpoint_id, request_headers=json.dumps(headers), request_body=json.dumps(data)
58-
)
59-
if r is not None:
60-
request_data.response_headers = json.dumps(dict(r.headers))
61-
request_data.response_body = json.dumps(r.content.decode())
62-
request_data.status_code = r.status_code
63-
request_data.successful_response = True
64-
return request_data
86+
response = getattr(httperr, 'response', httpx.Response(status_code=500))
87+
request_data.status_code = response.status_code
88+
request_data.response_headers = json.dumps(dict(response.headers) if response.headers else {})
89+
request_data.response_body = json.dumps({'error': str(httperr)})
90+
request_data.successful_response = False
91+
raise httperr
6592

6693

6794
acceptable_url_schemes = ('http', 'https', 'ftp', 'ftps')
@@ -107,21 +134,45 @@ async def _async_post_webhooks(endpoints, url_extension, payload):
107134
if url_extension:
108135
url += f'/{url_extension}'
109136
# Send the Webhook to the endpoint
137+
try:
138+
loaded_payload = json.loads(payload)
139+
task = asyncio.ensure_future(
140+
webhook_request(client, url, endpoint.id, webhook_sig=sig_hex, data=loaded_payload)
141+
)
142+
tasks.append(task)
143+
except json.JSONDecodeError:
144+
app_logger.error('Failed to decode payload for endpoint %s', endpoint.id)
145+
continue
110146

111-
loaded_payload = json.loads(payload)
112-
task = asyncio.ensure_future(
113-
webhook_request(client, url, endpoint.id, webhook_sig=sig_hex, data=loaded_payload)
114-
)
115-
tasks.append(task)
116147
webhook_responses = await asyncio.gather(*tasks, return_exceptions=True)
117148
for response in webhook_responses:
149+
if isinstance(response, Exception):
150+
app_logger.info('Error from endpoint %s: %s', endpoint.id, response)
151+
webhook_logs.append(
152+
WebhookLog(
153+
webhook_endpoint_id=endpoint.id,
154+
request_headers=json.dumps({}),
155+
request_body=payload,
156+
response_headers=json.dumps({}),
157+
response_body=json.dumps({'error': str(response)}),
158+
status='Unexpected response',
159+
status_code=999,
160+
)
161+
)
162+
total_failed += 1
163+
continue
164+
118165
if not isinstance(response, RequestData):
119166
app_logger.info('No response from endpoint %s: %s. %s', endpoint.id, endpoint.webhook_url, response)
120167
continue
121-
elif not response.successful_response:
122-
app_logger.info('No response from endpoint %s: %s', endpoint.id, endpoint.webhook_url)
123168

124-
if response.status_code in {200, 201, 202, 204}:
169+
try:
170+
response_body = json.loads(response.response_body)
171+
response_status = response_body.get('status', '').lower() # Default to empty string if not specified
172+
except (json.JSONDecodeError, AttributeError):
173+
response_status = 'success' # Default to success on parse error
174+
175+
if response.status_code in {200, 201, 202, 204} and response_status == 'success':
125176
status = 'Success'
126177
total_success += 1
127178
else:
@@ -151,41 +202,49 @@ def task_send_webhooks(
151202
"""
152203
Send the webhook to the relevant endpoints
153204
"""
154-
loaded_payload = json.loads(payload)
155-
loaded_payload['_request_time'] = loaded_payload.pop('request_time')
156-
qlength = get_qlength()
157-
158-
if loaded_payload.get('events'):
159-
branch_id = loaded_payload['events'][0]['branch']
160-
else:
161-
branch_id = loaded_payload['branch_id']
162-
163-
if qlength > 100:
164-
app_logger.error('Queue is too long. Check workers and speeds.')
165-
166-
app_logger.info('Starting send webhook task for branch %s. qlength=%s.', branch_id, qlength)
167-
lf_span = 'Sending webhooks for branch: {branch_id=}'
168-
with logfire.span(lf_span, branch_id=branch_id):
169-
with Session(engine) as db:
170-
# Get all the endpoints for the branch
171-
endpoints_query = select(WebhookEndpoint).where(
172-
WebhookEndpoint.branch_id == branch_id, WebhookEndpoint.active
173-
)
174-
endpoints = db.exec(endpoints_query).all()
205+
try:
206+
loaded_payload = json.loads(payload)
207+
if 'request_time' in loaded_payload:
208+
loaded_payload['_request_time'] = loaded_payload.pop('request_time')
209+
qlength = get_qlength()
210+
211+
if loaded_payload.get('events'):
212+
branch_id = loaded_payload['events'][0]['branch']
213+
else:
214+
branch_id = loaded_payload['branch_id']
215+
216+
if qlength > 100:
217+
app_logger.error('Queue is too long. Check workers and speeds.')
218+
219+
app_logger.info('Starting send webhook task for branch %s. qlength=%s.', branch_id, qlength)
220+
lf_span = 'Sending webhooks for branch: {branch_id=}'
221+
with logfire.span(lf_span, branch_id=branch_id):
222+
with Session(engine) as db:
223+
# Get all the endpoints for the branch
224+
endpoints_query = select(WebhookEndpoint).where(
225+
WebhookEndpoint.branch_id == branch_id, WebhookEndpoint.active
226+
)
227+
endpoints = db.exec(endpoints_query).all()
175228

176-
webhook_logs, total_success, total_failed = asyncio.run(
177-
_async_post_webhooks(endpoints, url_extension, payload)
178-
)
179-
for webhook_log in webhook_logs:
180-
db.add(webhook_log)
181-
db.commit()
182-
app_logger.info(
183-
'%s Webhooks sent for branch %s. Total Sent: %s. Total failed: %s',
184-
total_success + total_failed,
185-
branch_id,
186-
total_success,
187-
total_failed,
188-
)
229+
webhook_logs, total_success, total_failed = asyncio.run(
230+
_async_post_webhooks(endpoints, url_extension, payload)
231+
)
232+
for webhook_log in webhook_logs:
233+
db.add(webhook_log)
234+
db.commit()
235+
app_logger.info(
236+
'%s Webhooks sent for branch %s. Total Sent: %s. Total failed: %s',
237+
total_success + total_failed,
238+
branch_id,
239+
total_success,
240+
total_failed,
241+
)
242+
except json.JSONDecodeError as e:
243+
app_logger.error('Failed to decode payload: %s', payload)
244+
raise e
245+
except Exception as e:
246+
app_logger.error('Error sending webhooks: %s', str(e))
247+
raise e
189248

190249

191250
DELETE_JOBS_KEY = 'delete_old_logs_job'

conftest.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ def create_tables(engine):
2323
SQLModel.metadata.drop_all(engine)
2424

2525

26+
27+
@pytest.fixture(scope='session')
28+
def celery_includes():
29+
return ['chronos.worker']
30+
2631
@pytest.fixture
2732
def session(engine, create_tables):
2833
connection = engine.connect()
@@ -48,7 +53,12 @@ def get_session_override():
4853

4954
@pytest.fixture(scope='session')
5055
def celery_config():
51-
return {'broker_url': 'redis://', 'result_backend': 'redis://'}
56+
return {
57+
'broker_url': 'redis://',
58+
'result_backend': 'redis://',
59+
'task_always_eager': True,
60+
'task_eager_propagates': True,
61+
}
5262

5363

5464
@pytest.fixture(scope='session')

tests/test_helpers.py

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import hashlib
2+
import hmac
13
import json
24

5+
import httpx
36
from httpx import Response
4-
from requests import Request
57

68
from chronos.main import app
79
from chronos.sql_models import WebhookEndpoint, WebhookLog
@@ -142,22 +144,28 @@ def create_webhook_log_from_dft_data(**kwargs) -> WebhookLog:
142144

143145

144146
def get_successful_response(payload, headers, **kwargs) -> Response:
145-
response_dict = {'status_code': 200, 'message': 'success'}
147+
response_dict = {'status': 'success', 'message': 'success'}
146148
for k, v in kwargs.items():
147149
response_dict[k] = v
148-
request = Request()
149-
request.headers = headers
150-
request.body = json.dumps(payload).encode()
151-
response = Response(status_code=200, request=request, content=json.dumps(response_dict).encode())
152-
return response
150+
headers = headers.copy()
151+
headers['webhook-signature'] = hmac.new(b'test_key', json.dumps(payload).encode(), hashlib.sha256).hexdigest()
152+
return Response(
153+
status_code=200,
154+
json=response_dict,
155+
request=httpx.Request('POST', 'https://example.com', json=payload, headers=headers),
156+
headers=headers,
157+
)
153158

154159

155160
def get_failed_response(payload, headers, **kwargs) -> Response:
156-
response_dict = {'status_code': 409, 'message': 'Bad request'}
161+
response_dict = {'status': 'error', 'message': 'Bad request'}
157162
for k, v in kwargs.items():
158163
response_dict[k] = v
159-
request = Request()
160-
request.headers = headers
161-
request.body = json.dumps(payload).encode()
162-
response = Response(status_code=409, request=request, content=json.dumps(response_dict).encode())
163-
return response
164+
headers = headers.copy()
165+
headers['webhook-signature'] = hmac.new(b'test_key', json.dumps(payload).encode(), hashlib.sha256).hexdigest()
166+
return Response(
167+
status_code=409,
168+
json=response_dict,
169+
request=httpx.Request('POST', 'https://example.com', json=payload, headers=headers),
170+
headers=headers,
171+
)

0 commit comments

Comments
 (0)