Skip to content

Commit 98700d1

Browse files
3coinsafshin
andauthored
Rest API endpoints for Events (#851)
* Added rest api handler for events * Added unit tests, timestamp support * Updated the response status code * jupyter.com => jupyter.org * Added e2e test for events * Re-organized the test data * Added microseconds and utc offset to timestamp * Updated payload validation * Removed redundant prop for event_bus * Fixed lint failures * Refactored model validation, timestamp retieval, and tests * Fixed ci build errors Co-authored-by: Afshin T. Darian <[email protected]>
1 parent 292162e commit 98700d1

File tree

2 files changed

+193
-9
lines changed

2 files changed

+193
-9
lines changed

jupyter_server/services/events/handlers.py

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,18 @@
33
.. versionadded:: 2.0
44
"""
55
import logging
6+
from datetime import datetime
7+
from typing import Any, Dict, Optional
68

79
from jupyter_telemetry.eventlog import _skip_message
810
from pythonjsonlogger import jsonlogger
911
from tornado import web, websocket
1012

13+
from jupyter_server.auth import authorized
1114
from jupyter_server.base.handlers import JupyterHandler
1215

16+
from ...base.handlers import APIHandler
17+
1318
AUTH_RESOURCE = "events"
1419

1520

@@ -53,11 +58,6 @@ async def get(self, *args, **kwargs):
5358
res = super().get(*args, **kwargs)
5459
await res
5560

56-
@property
57-
def event_bus(self):
58-
"""Jupyter Server's event bus that emits structured event data."""
59-
return self.settings["event_bus"]
60-
6161
def open(self):
6262
"""Routes events that are emitted by Jupyter Server's
6363
EventBus to a WebSocket client in the browser.
@@ -75,6 +75,59 @@ def on_close(self):
7575
self.event_bus.handlers.remove(self.logging_handler)
7676

7777

78+
def validate_model(data: Dict[str, Any]) -> None:
79+
"""Validates for required fields in the JSON request body"""
80+
required_keys = {"schema_name", "version", "event"}
81+
for key in required_keys:
82+
if key not in data:
83+
raise web.HTTPError(400, f"Missing `{key}` in the JSON request body.")
84+
85+
86+
def get_timestamp(data: Dict[str, Any]) -> Optional[datetime]:
87+
"""Parses timestamp from the JSON request body"""
88+
try:
89+
if "timestamp" in data:
90+
timestamp = datetime.strptime(data["timestamp"], "%Y-%m-%dT%H:%M:%S%zZ")
91+
else:
92+
timestamp = None
93+
except Exception:
94+
raise web.HTTPError(
95+
400,
96+
"""Failed to parse timestamp from JSON request body,
97+
an ISO format datetime string with UTC offset is expected,
98+
for example, 2022-05-26T13:50:00+05:00Z""",
99+
)
100+
101+
return timestamp
102+
103+
104+
class EventHandler(APIHandler):
105+
"""REST api handler for events"""
106+
107+
auth_resource = AUTH_RESOURCE
108+
109+
@web.authenticated
110+
@authorized
111+
async def post(self):
112+
payload = self.get_json_body()
113+
if payload is None:
114+
raise web.HTTPError(400, "No JSON data provided")
115+
116+
try:
117+
validate_model(payload)
118+
self.event_bus.record_event(
119+
schema_name=payload.get("schema_name"),
120+
version=payload.get("version"),
121+
event=payload.get("event"),
122+
timestamp_override=get_timestamp(payload),
123+
)
124+
self.set_status(204)
125+
self.finish()
126+
except Exception as e:
127+
raise web.HTTPError(500, str(e)) from e
128+
129+
78130
default_handlers = [
131+
(r"/api/events", EventHandler),
79132
(r"/api/events/subscribe", SubscribeWebsocket),
80133
]

tests/services/events/test_api.py

Lines changed: 135 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,41 @@
1+
import io
12
import json
3+
import logging
24
import pathlib
35

46
import pytest
7+
import tornado
8+
from jupyter_telemetry.eventlog import _skip_message
9+
from pythonjsonlogger import jsonlogger
10+
11+
from tests.utils import expected_http_error
512

613

714
@pytest.fixture
8-
def event_bus(jp_serverapp):
15+
def eventbus_sink(jp_serverapp):
916
event_bus = jp_serverapp.event_bus
1017
# Register the event schema defined in this directory.
1118
schema_file = pathlib.Path(__file__).parent / "mock_event.yaml"
1219
event_bus.register_schema_file(schema_file)
13-
#
1420
event_bus.allowed_schemas = ["event.mock.jupyter.org/message"]
21+
22+
sink = io.StringIO()
23+
formatter = jsonlogger.JsonFormatter(json_serializer=_skip_message)
24+
handler = logging.StreamHandler(sink)
25+
handler.setFormatter(formatter)
26+
event_bus.handlers = [handler]
27+
event_bus.log.addHandler(handler)
28+
29+
return event_bus, sink
30+
31+
32+
@pytest.fixture
33+
def event_bus(eventbus_sink):
34+
event_bus, sink = eventbus_sink
1535
return event_bus
1636

1737

1838
async def test_subscribe_websocket(jp_ws_fetch, event_bus):
19-
# Open a websocket connection.
2039
ws = await jp_ws_fetch("/api/events/subscribe")
2140

2241
event_bus.record_event(
@@ -26,7 +45,119 @@ async def test_subscribe_websocket(jp_ws_fetch, event_bus):
2645
)
2746
message = await ws.read_message()
2847
event_data = json.loads(message)
29-
# Close websocket
3048
ws.close()
3149

3250
assert event_data.get("event_message") == "Hello, world!"
51+
52+
53+
payload_1 = """\
54+
{
55+
"schema_name": "event.mock.jupyter.org/message",
56+
"version": 1,
57+
"event": {
58+
"event_message": "Hello, world!"
59+
},
60+
"timestamp": "2022-05-26T12:50:00+06:00Z"
61+
}
62+
"""
63+
64+
payload_2 = """\
65+
{
66+
"schema_name": "event.mock.jupyter.org/message",
67+
"version": 1,
68+
"event": {
69+
"event_message": "Hello, world!"
70+
}
71+
}
72+
"""
73+
74+
75+
@pytest.mark.parametrize("payload", [payload_1, payload_2])
76+
async def test_post_event(jp_fetch, eventbus_sink, payload):
77+
event_bus, sink = eventbus_sink
78+
79+
r = await jp_fetch("api", "events", method="POST", body=payload)
80+
assert r.code == 204
81+
82+
output = sink.getvalue()
83+
assert output
84+
input = json.loads(payload)
85+
data = json.loads(output)
86+
assert input["event"]["event_message"] == data["event_message"]
87+
assert data["__timestamp__"]
88+
if "timestamp" in input:
89+
assert input["timestamp"] == data["__timestamp__"]
90+
91+
92+
payload_3 = """\
93+
{
94+
"schema_name": "event.mock.jupyter.org/message",
95+
"event": {
96+
"event_message": "Hello, world!"
97+
}
98+
}
99+
"""
100+
101+
payload_4 = """\
102+
{
103+
"version": 1,
104+
"event": {
105+
"event_message": "Hello, world!"
106+
}
107+
}
108+
"""
109+
110+
payload_5 = """\
111+
{
112+
"schema_name": "event.mock.jupyter.org/message",
113+
"version": 1
114+
}
115+
"""
116+
117+
payload_6 = """\
118+
{
119+
"schema_name": "event.mock.jupyter.org/message",
120+
"version": 1,
121+
"event": {
122+
"event_message": "Hello, world!"
123+
},
124+
"timestamp": "2022-05-26 12:50:00"
125+
}
126+
"""
127+
128+
129+
@pytest.mark.parametrize("payload", [payload_3, payload_4, payload_5, payload_6])
130+
async def test_post_event_400(jp_fetch, event_bus, payload):
131+
with pytest.raises(tornado.httpclient.HTTPClientError) as e:
132+
await jp_fetch("api", "events", method="POST", body=payload)
133+
134+
expected_http_error(e, 400)
135+
136+
137+
payload_7 = """\
138+
{
139+
"schema_name": "event.mock.jupyter.org/message",
140+
"version": 1,
141+
"event": {
142+
"message": "Hello, world!"
143+
}
144+
}
145+
"""
146+
147+
payload_8 = """\
148+
{
149+
"schema_name": "event.mock.jupyter.org/message",
150+
"version": 2,
151+
"event": {
152+
"message": "Hello, world!"
153+
}
154+
}
155+
"""
156+
157+
158+
@pytest.mark.parametrize("payload", [payload_7, payload_8])
159+
async def test_post_event_500(jp_fetch, event_bus, payload):
160+
with pytest.raises(tornado.httpclient.HTTPClientError) as e:
161+
await jp_fetch("api", "events", method="POST", body=payload)
162+
163+
expected_http_error(e, 500)

0 commit comments

Comments
 (0)