Skip to content

Commit 5c49253

Browse files
rajmusukuRaj MusukuZsailerkevin-bates
authored
Emit events from the kernels service and gateway client (#1252)
* Emit events from the kernels service and gateway client * clean-up after code review * Make GatewayKernelManager a compatible subclass to ServerKernelManager * Add events to gateway lifecycle test * Access event_logger trait from ServerApp --------- Co-authored-by: Raj Musuku <[email protected]> Co-authored-by: Zach Sailer <[email protected]> Co-authored-by: Kevin Bates <[email protected]>
1 parent 8144a0d commit 5c49253

File tree

9 files changed

+459
-13
lines changed

9 files changed

+459
-13
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
"$id": https://events.jupyter.org/jupyter_server/gateway_client/v1
2+
version: 1
3+
title: Gateway Client activities.
4+
personal-data: true
5+
description: |
6+
Record events of a gateway client.
7+
type: object
8+
required:
9+
- status
10+
- msg
11+
properties:
12+
status:
13+
enum:
14+
- error
15+
- success
16+
description: |
17+
Status received by Gateway client based on the rest api operation to gateway kernel.
18+
19+
This is a required field.
20+
21+
Possible values:
22+
23+
1. error
24+
Error response from a rest api operation to gateway kernel.
25+
26+
2. success
27+
Success response from a rest api operation to gateway kernel.
28+
status_code:
29+
type: number
30+
description: |
31+
Http response codes from a rest api operation to gateway kernel.
32+
Examples: 200, 400, 502, 503, 599 etc.
33+
msg:
34+
type: string
35+
description: |
36+
Description of the event being emitted.
37+
gateway_url:
38+
type: string
39+
description: |
40+
Gateway url where the remote server exist.
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
"$id": https://events.jupyter.org/jupyter_server/kernel_actions/v1
2+
version: 1
3+
title: Kernel Manager activities
4+
personal-data: true
5+
description: |
6+
Record events of a kernel manager.
7+
type: object
8+
required:
9+
- action
10+
- kernel_id
11+
- msg
12+
properties:
13+
action:
14+
enum:
15+
- start
16+
- interrupt
17+
- shutdown
18+
- restart
19+
description: |
20+
Action performed by the Kernel Manager.
21+
22+
This is a required field.
23+
24+
Possible values:
25+
26+
1. start
27+
A kernel has been started with the given kernel id.
28+
29+
2. interrupt
30+
A kernel has been interrupted for the given kernel id.
31+
32+
3. shutdown
33+
A kernel has been shut down for the given kernel id.
34+
35+
4. restart
36+
A kernel has been restarted for the given kernel id.
37+
kernel_id:
38+
type: string
39+
description: |
40+
Kernel id.
41+
42+
This is a required field.
43+
kernel_name:
44+
type: string
45+
description: |
46+
Name of the kernel.
47+
status:
48+
enum:
49+
- error
50+
- success
51+
description: |
52+
Status received from a rest api operation to kernel server.
53+
54+
This is a required field.
55+
56+
Possible values:
57+
58+
1. error
59+
Error response from a rest api operation to kernel server.
60+
61+
2. success
62+
Success response from a rest api operation to kernel server.
63+
status_code:
64+
type: number
65+
description: |
66+
Http response codes from a rest api operation to kernel server.
67+
Examples: 200, 400, 502, 503, 599 etc
68+
msg:
69+
type: string
70+
description: |
71+
Description of the event specified in action.

jupyter_server/gateway/gateway_client.py

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,31 @@
1212
from http.cookies import SimpleCookie
1313
from socket import gaierror
1414

15+
from jupyter_events import EventLogger
1516
from tornado import web
1617
from tornado.httpclient import AsyncHTTPClient, HTTPClientError, HTTPResponse
17-
from traitlets import Bool, Float, Int, TraitError, Type, Unicode, default, observe, validate
18+
from traitlets import (
19+
Bool,
20+
Float,
21+
Instance,
22+
Int,
23+
TraitError,
24+
Type,
25+
Unicode,
26+
default,
27+
observe,
28+
validate,
29+
)
1830
from traitlets.config import LoggingConfigurable, SingletonConfigurable
1931

32+
from jupyter_server import DEFAULT_EVENTS_SCHEMA_PATH, JUPYTER_SERVER_EVENTS_URI
33+
34+
ERROR_STATUS = "error"
35+
SUCCESS_STATUS = "success"
36+
STATUS_KEY = "status"
37+
STATUS_CODE_KEY = "status_code"
38+
MESSAGE_KEY = "msg"
39+
2040
if ty.TYPE_CHECKING:
2141
from http.cookies import Morsel
2242

@@ -71,10 +91,30 @@ def get_token(
7191
class GatewayClient(SingletonConfigurable):
7292
"""This class manages the configuration. It's its own singleton class so
7393
that we can share these values across all objects. It also contains some
74-
helper methods to build request arguments out of the various config
7594
options.
95+
helper methods to build request arguments out of the various config
7696
"""
7797

98+
event_schema_id = JUPYTER_SERVER_EVENTS_URI + "/gateway_client/v1"
99+
event_logger = Instance(EventLogger).tag(config=True)
100+
101+
@default("event_logger")
102+
def _default_event_logger(self):
103+
if self.parent and hasattr(self.parent, "event_logger"):
104+
# Event logger is attached from serverapp.
105+
return self.parent.event_logger
106+
else:
107+
# If parent does not have an event logger, create one.
108+
logger = EventLogger()
109+
schema_path = DEFAULT_EVENTS_SCHEMA_PATH / "gateway_client" / "v1.yaml"
110+
logger.register_event_schema(schema_path)
111+
self.log.info("Event is registered in GatewayClient.")
112+
return logger
113+
114+
def emit(self, data):
115+
"""Emit event using the core event schema from Jupyter Server's Gateway Client."""
116+
self.event_logger.emit(schema_id=self.event_schema_id, data=data)
117+
78118
url = Unicode(
79119
default_value=None,
80120
allow_none=True,
@@ -97,7 +137,9 @@ def _url_validate(self, proposal):
97137
value = proposal["value"]
98138
# Ensure value, if present, starts with 'http'
99139
if value is not None and len(value) > 0 and not str(value).lower().startswith("http"):
100-
raise TraitError("GatewayClient url must start with 'http': '%r'" % value)
140+
message = "GatewayClient url must start with 'http': '%r'" % value
141+
self.emit(data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 400, MESSAGE_KEY: message})
142+
raise TraitError(message)
101143
return value
102144

103145
ws_url = Unicode(
@@ -123,7 +165,9 @@ def _ws_url_validate(self, proposal):
123165
value = proposal["value"]
124166
# Ensure value, if present, starts with 'ws'
125167
if value is not None and len(value) > 0 and not str(value).lower().startswith("ws"):
126-
raise TraitError("GatewayClient ws_url must start with 'ws': '%r'" % value)
168+
message = "GatewayClient ws_url must start with 'ws': '%r'" % value
169+
self.emit(data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 400, MESSAGE_KEY: message})
170+
raise TraitError(message)
127171
return value
128172

129173
kernels_endpoint_default_value = "/api/kernels"
@@ -728,6 +772,9 @@ async def gateway_request(endpoint: str, **kwargs: ty.Any) -> HTTPResponse:
728772
# NOTE: We do this here since this handler is called during the server's startup and subsequent refreshes
729773
# of the tree view.
730774
except HTTPClientError as e:
775+
GatewayClient.instance().emit(
776+
data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: e.code, MESSAGE_KEY: str(e.message)}
777+
)
731778
error_reason = f"Exception while attempting to connect to Gateway server url '{GatewayClient.instance().url}'"
732779
error_message = e.message
733780
if e.response:
@@ -744,12 +791,18 @@ async def gateway_request(endpoint: str, **kwargs: ty.Any) -> HTTPResponse:
744791
"Ensure gateway url is valid and the Gateway instance is running.",
745792
) from e
746793
except ConnectionError as e:
794+
GatewayClient.instance().emit(
795+
data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 503, MESSAGE_KEY: str(e)}
796+
)
747797
raise web.HTTPError(
748798
503,
749799
f"ConnectionError was received from Gateway server url '{GatewayClient.instance().url}'. "
750800
"Check to be sure the Gateway instance is running.",
751801
) from e
752802
except gaierror as e:
803+
GatewayClient.instance().emit(
804+
data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 404, MESSAGE_KEY: str(e)}
805+
)
753806
raise web.HTTPError(
754807
404,
755808
f"The Gateway server specified in the gateway_url '{GatewayClient.instance().url}' doesn't "

jupyter_server/gateway/managers.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,18 @@
1515
from jupyter_client.asynchronous.client import AsyncKernelClient
1616
from jupyter_client.clientabc import KernelClientABC
1717
from jupyter_client.kernelspec import KernelSpecManager
18-
from jupyter_client.manager import AsyncKernelManager
1918
from jupyter_client.managerabc import KernelManagerABC
2019
from jupyter_core.utils import ensure_async
2120
from tornado import web
2221
from tornado.escape import json_decode, json_encode, url_escape, utf8
2322
from traitlets import DottedObjectName, Instance, Type, default
2423

25-
from .._tz import UTC
26-
from ..services.kernels.kernelmanager import AsyncMappingKernelManager
24+
from .._tz import UTC, utcnow
25+
from ..services.kernels.kernelmanager import (
26+
AsyncMappingKernelManager,
27+
ServerKernelManager,
28+
emit_kernel_action_event,
29+
)
2730
from ..services.sessions.sessionmanager import SessionManager
2831
from ..utils import url_path_join
2932
from .gateway_client import GatewayClient, gateway_request
@@ -79,7 +82,6 @@ async def start_kernel(self, *, kernel_id=None, path=None, **kwargs):
7982
await km.start_kernel(kernel_id=kernel_id, **kwargs)
8083
kernel_id = km.kernel_id
8184
self._kernels[kernel_id] = km
82-
8385
# Initialize culling if not already
8486
if not self._initialized_culler:
8587
self.initialize_culler()
@@ -366,7 +368,7 @@ async def kernel_culled(self, kernel_id: str) -> bool: # typing: ignore
366368
return km is None
367369

368370

369-
class GatewayKernelManager(AsyncKernelManager):
371+
class GatewayKernelManager(ServerKernelManager):
370372
"""Manages a single kernel remotely via a Gateway Server."""
371373

372374
kernel_id: Optional[str] = None # type:ignore[assignment]
@@ -385,7 +387,8 @@ def __init__(self, **kwargs):
385387
self.kernel_url: str
386388
self.kernel = self.kernel_id = None
387389
# simulate busy/activity markers:
388-
self.execution_state = self.last_activity = None
390+
self.execution_state = "starting"
391+
self.last_activity = utcnow()
389392

390393
@property
391394
def has_kernel(self):
@@ -458,6 +461,9 @@ async def refresh_model(self, model=None):
458461
# Kernel management
459462
# --------------------------------------------------------------------------
460463

464+
@emit_kernel_action_event(
465+
success_msg="Kernel {kernel_id} was started.",
466+
)
461467
async def start_kernel(self, **kwargs):
462468
"""Starts a kernel via HTTP in an asynchronous manner.
463469
@@ -509,6 +515,9 @@ async def start_kernel(self, **kwargs):
509515
self.kernel = await self.refresh_model()
510516
self.log.info(f"GatewayKernelManager using existing kernel: {self.kernel_id}")
511517

518+
@emit_kernel_action_event(
519+
success_msg="Kernel {kernel_id} was shutdown.",
520+
)
512521
async def shutdown_kernel(self, now=False, restart=False):
513522
"""Attempts to stop the kernel process cleanly via HTTP."""
514523

@@ -523,6 +532,9 @@ async def shutdown_kernel(self, now=False, restart=False):
523532
else:
524533
raise
525534

535+
@emit_kernel_action_event(
536+
success_msg="Kernel {kernel_id} was restarted.",
537+
)
526538
async def restart_kernel(self, **kw):
527539
"""Restarts a kernel via HTTP."""
528540
if self.has_kernel:
@@ -537,6 +549,9 @@ async def restart_kernel(self, **kw):
537549
)
538550
self.log.debug("Restart kernel response: %d %s", response.code, response.reason)
539551

552+
@emit_kernel_action_event(
553+
success_msg="Kernel {kernel_id} was interrupted.",
554+
)
540555
async def interrupt_kernel(self):
541556
"""Interrupts the kernel via an HTTP request."""
542557
if self.has_kernel:
@@ -556,8 +571,10 @@ async def is_alive(self):
556571
if self.has_kernel:
557572
# Go ahead and issue a request to get the kernel
558573
self.kernel = await self.refresh_model()
574+
self.log.debug(f"The kernel: {self.kernel} is alive.")
559575
return True
560576
else: # we don't have a kernel
577+
self.log.debug(f"The kernel: {self.kernel} no longer exists.")
561578
return False
562579

563580
def cleanup_resources(self, restart=False):

jupyter_server/serverapp.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1964,6 +1964,8 @@ def init_event_logger(self):
19641964
# events URI, `JUPYTER_SERVER_EVENTS_URI`.
19651965
schema_ids = [
19661966
"https://events.jupyter.org/jupyter_server/contents_service/v1",
1967+
"https://events.jupyter.org/jupyter_server/gateway_client/v1",
1968+
"https://events.jupyter.org/jupyter_server/kernel_actions/v1",
19671969
]
19681970
for schema_id in schema_ids:
19691971
# Get the schema path from the schema ID.

0 commit comments

Comments
 (0)