Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions jupyter_rtc_core/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .handlers import RouteHandler, FileIDIndexHandler
from .websockets import GlobalAwarenessWebsocket, YRoomWebsocket
from .rooms.yroom_manager import YRoomManager
from .outputs import OutputsManager, outputs_handlers

class RtcExtensionApp(ExtensionApp):
name = "jupyter_rtc_core"
Expand All @@ -22,9 +23,10 @@ class RtcExtensionApp(ExtensionApp):
(r"api/collaboration/room/(.*)", YRoomWebsocket),
# # handler that just adds compatibility with Jupyter Collaboration's frontend
# (r"api/collaboration/session/(.*)", YRoomSessionHandler),
(r"api/fileid/index", FileIDIndexHandler)
(r"api/fileid/index", FileIDIndexHandler),
*outputs_handlers
]

yroom_manager_class = Type(
klass=YRoomManager,
help="""YRoom Manager Class.""",
Expand All @@ -35,6 +37,18 @@ class RtcExtensionApp(ExtensionApp):
def yroom_manager(self) -> YRoomManager | None:
return self.settings.get("yroom_manager", None)

outputs_manager_class = Type(
klass=OutputsManager,
help="Outputs manager class.",
default_value=OutputsManager
).tag(config=True)

outputs_manager = Instance(
klass=OutputsManager,
help="An instance of the OutputsManager",
allow_none=True
).tag(config=True)
Comment on lines +46 to +50
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be a trait? It seems like this can be defined as a property:

Suggested change
outputs_manager = Instance(
klass=OutputsManager,
help="An instance of the OutputsManager",
allow_none=True
).tag(config=True)
@property
def outputs_manager(self) -> OutputsManager:
return self.settings["outputs_manager"]

This has the added benefit of clearly distinguishing the two attributes:

  • outputs_manager_class: a configurable class trait
  • outputs_manager: a property on RtcExtensionApp, set by the extension itself

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same goes for yroom_manager above.


def initialize(self):
super().initialize()

Expand All @@ -56,8 +70,11 @@ def get_fileid_manager():
loop=loop,
log=log
)

# Initialize OutputsManager
self.outputs_manager = self.outputs_manager_class(config=self.config)
self.settings["outputs_manager"] = self.outputs_manager


def _link_jupyter_server_extension(self, server_app):
"""Setup custom config needed by this extension."""
c = Config()
Expand Down
55 changes: 33 additions & 22 deletions jupyter_rtc_core/kernels/kernel_client.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
"""
A new Kernel client that is aware of ydocuments.
"""
import anyio
import asyncio
import json
import typing as t
from traitlets import Set
from traitlets import Instance
from traitlets import Any
from .utils import LRUCache

from traitlets import Set, Instance, Any, Type, default
from jupyter_client.asynchronous.client import AsyncKernelClient
import anyio

from .utils import LRUCache
from jupyter_rtc_core.rooms.yroom import YRoom
from jupyter_rtc_core.outputs import OutputProcessor


class DocumentAwareKernelClient(AsyncKernelClient):
Expand All @@ -35,6 +36,21 @@ class DocumentAwareKernelClient(AsyncKernelClient):
# status messages.
_yrooms: t.Set[YRoom] = Set(trait=Instance(YRoom), default_value=set())

output_processor = Instance(
OutputProcessor,
allow_none=True
)

output_process_class = Type(
klass=OutputProcessor,
default_value=OutputProcessor
).tag(config=True)

@default("output_processor")
def _default_output_processor(self) -> OutputProcessor:
    """Build the default output processor for this kernel client.

    Uses the configurable ``output_process_class`` trait rather than
    hard-coding ``OutputProcessor``, so deployments can substitute a
    subclass via traitlets config. The instance is parented to this
    client and receives the same config object.
    """
    self.log.info("Creating output processor")
    # Honor the configurable class trait; previously this ignored
    # ``output_process_class`` and always instantiated OutputProcessor.
    return self.output_process_class(parent=self, config=self.config)

async def start_listening(self):
"""Start listening to messages coming from the kernel.

Expand Down Expand Up @@ -77,7 +93,8 @@ def handle_incoming_message(self, channel_name: str, msg: list[bytes]):
# Cache the message ID and its socket name so that
# any response message can be mapped back to the
# source channel.
header = header = json.loads(msg[0])
self.output_processor.process_incoming_message(channel=channel_name, msg=msg)
header = json.loads(msg[0]) # TODO: use session.unpack
msg_id = header["msg_id"]
self.message_source_cache[msg_id] = channel_name
channel = getattr(self, f"{channel_name}_channel")
Expand Down Expand Up @@ -158,10 +175,9 @@ async def handle_outgoing_message(self, channel_name: str, msg: list[bytes]):
# Intercept messages that are IOPub focused.
if channel_name == "iopub":
message_returned = await self.handle_iopub_message(msg)
# TODO: If the message is not returned by the iopub handler, then
# If the message is not returned by the iopub handler, then
# return here and do not forward to listeners.
if not message_returned:
self.log.warn(f"If message is handled donot forward after adding output manager")
return

# Update the last activity.
Expand All @@ -181,29 +197,25 @@ async def handle_iopub_message(self, msg: list[bytes]) -> t.Optional[list[bytes]
Returns
-------
Returns the message if it should be forwarded to listeners. Otherwise,
returns `None` and keeps (i.e. intercepts) the message from going
returns `None` and prevents (i.e. intercepts) the message from going
to listeners.
"""
# NOTE: Here's where we will inject the kernel state
# into the awareness of a document.

try:
dmsg = self.session.deserialize(msg, content=False)
except Exception as e:
self.log.error(f"Error deserializing message: {e}")
raise

# NOTE: Inject display data into ydoc.
if dmsg["msg_type"] == "display_data":
# Forward to all yrooms.
for yroom in self._yrooms:
update_document_message = b""
# yroom.add_message(update_document_message)
if self.output_processor is not None and dmsg["msg_type"] in ("stream", "display_data", "execute_result", "error"):
dmsg = self.output_processor.process_outgoing_message(dmsg)

# TODO: returning message temporarily to not break UI
# If the message isn't handled above, return it and it will
# be forwarded to all listeners
return msg
# If process_outgoing_message returns None, return None so the message isn't
# sent to clients, otherwise return the original serialized message.
if dmsg is None:
return None
else:
return msg

def send_kernel_awareness(self, kernel_status: dict):
"""
Expand All @@ -218,7 +230,6 @@ def send_kernel_awareness(self, kernel_status: dict):
awareness.set_local_state_field("kernel", kernel_status)
self.log.debug(f"current state: {awareness.get_local_state()} room_id: {yroom.room_id}")


async def add_yroom(self, yroom: YRoom):
"""
Register a YRoom with this kernel client. YRooms will
Expand Down
66 changes: 46 additions & 20 deletions jupyter_rtc_core/outputs/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,10 @@

from traitlets.config import LoggingConfigurable
from traitlets import (
Any,
Bool,
Dict,
Instance,
List,
TraitError,
Type,
Unicode,
Int,
default,
validate,
default
)

from jupyter_core.paths import jupyter_runtime_dir
Expand All @@ -36,7 +29,6 @@ def _default_outputs_path(self):

def _ensure_path(self, file_id, cell_id):
nested_dir = self.outputs_path / file_id / cell_id
self.log.info(f"Creating directory: {nested_dir}")
nested_dir.mkdir(parents=True, exist_ok=True)

def _build_path(self, file_id, cell_id=None, output_index=None):
Expand All @@ -49,7 +41,6 @@ def _build_path(self, file_id, cell_id=None, output_index=None):

def get_output(self, file_id, cell_id, output_index):
"""Get an outputs by file_id, cell_id, and output_index."""
self.log.info(f"OutputsManager.get: {file_id} {cell_id} {output_index}")
path = self._build_path(file_id, cell_id, output_index)
if not os.path.isfile(path):
raise FileNotFoundError(f"The output file doesn't exist: {path}")
Expand All @@ -67,12 +58,15 @@ def get_stream(self, file_id, cell_id):
return output

def write(self, file_id, cell_id, output):
"""Write a new output for file_id and cell_id."""
self.log.info(f"OutputsManager.write: {file_id} {cell_id} {output}")
result = self.write_output(file_id, cell_id, output)
"""Write a new output for file_id and cell_id.

Returns a placeholder output (pycrdt.Map) or None if no placeholder
output should be written to the ydoc.
"""
placeholder = self.write_output(file_id, cell_id, output)
if output["output_type"] == "stream" and self.stream_limit is not None:
result = self.write_stream(file_id, cell_id, output)
return result
placeholder = self.write_stream(file_id, cell_id, output, placeholder)
return placeholder

def write_output(self, file_id, cell_id, output):
self._ensure_path(file_id, cell_id)
Expand All @@ -84,9 +78,10 @@ def write_output(self, file_id, cell_id, output):
with open(path, "w", encoding="utf-8") as f:
f.write(data)
url = f"/api/outputs/{file_id}/{cell_id}/{index}.output"
return Map({})
self.log.info(f"Wrote output: {url}")
return create_placeholder_output(output["output_type"], url)

def write_stream(self, file_id, cell_id, output) -> Map:
def write_stream(self, file_id, cell_id, output, placeholder) -> Map:
# How many stream outputs have been written for this cell previously
count = self._stream_count.get(cell_id, 0)

Expand All @@ -98,15 +93,16 @@ def write_stream(self, file_id, cell_id, output) -> Map:
with open(path, "a", encoding="utf-8") as f:
f.write(text)
url = f"/api/outputs/{file_id}/{cell_id}/stream"
self.log.info(f"Wrote stream: {url}")

# Increment the count
count = count + 1
self._stream_count[cell_id] = count

# Now create the placeholder output
if count < self.stream_limit:
# Return the original if we haven't reached the limit
placeholder = Map(output)
# Return the original placeholder if we haven't reached the limit
placeholder = placeholder
elif count == self.stream_limit:
# Return a link to the full stream output
placeholder = Map({
Expand All @@ -116,6 +112,7 @@ def write_stream(self, file_id, cell_id, output) -> Map:
}
})
elif count > self.stream_limit:
# Return None to indicate that no placeholder should be written to the ydoc
placeholder = None
return placeholder

Expand All @@ -130,4 +127,33 @@ def clear(self, file_id, cell_id=None):
except KeyError:
pass
path = self._build_path(file_id, cell_id)
shutil.rmtree(path)
try:
shutil.rmtree(path)
except FileNotFoundError:
pass


def create_placeholder_output(output_type: str, url: str):
    """Create a placeholder output (``pycrdt.Map``) for an out-of-band output.

    The real output content is stored by ``OutputsManager`` on disk and
    served over HTTP; the placeholder written into the ydoc carries only
    the ``output_type`` and a ``metadata.url`` pointing at the full output.

    Parameters
    ----------
    output_type : str
        One of ``"stream"``, ``"display_data"``, ``"execute_result"``,
        or ``"error"``.
    url : str
        URL at which the full output can be retrieved.

    Raises
    ------
    ValueError
        If ``output_type`` is not one of the recognized types. (The
        previous implementation left ``output`` unbound and raised a
        confusing ``UnboundLocalError`` instead.)
    """
    known_types = ("stream", "display_data", "execute_result", "error")
    if output_type not in known_types:
        raise ValueError(
            f"Unknown output_type: {output_type!r}; expected one of {known_types}"
        )
    fields = {
        "output_type": output_type,
        "metadata": dict(url=url),
    }
    if output_type == "stream":
        # Stream placeholders start with empty text; the frontend fetches
        # the accumulated stream content from the URL in metadata.
        fields["text"] = ""
    return Map(fields)
Loading
Loading