Skip to content
193 changes: 104 additions & 89 deletions jupyter_collaboration/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,24 @@ def __init__(
self._logger = logger
self._save_delay = save_delay

self._update_lock = asyncio.Lock()
self._initialization_lock = asyncio.Lock()
self._cleaner: asyncio.Task | None = None
self._saving_document: asyncio.Task | None = None
self._messages: dict[str, asyncio.Lock] = {}

# the task currently saving to disk via FileLoader, if any.
self._save_task: asyncio.Task | None = None

# flag that indicates whether a previous call to
# `self._maybe_save_document()` is waiting to save.
#
# if `self._maybe_save_document()` is called while this flag is `True`,
# then the call does nothing, as a previous task will save the YDoc
# within `self._save_delay` seconds.
self._waiting_to_save = False

# flag that indicates whether a previous call to
# `self._maybe_save_document()` should call itself again after
# `self._save_task` finishes. this is set to `True` when a document
# update occurs while `self._save_task` is running.
self._should_resave = False

# Listen for document changes
self._document.observe(self._on_document_change)
Expand Down Expand Up @@ -78,7 +91,8 @@ async def initialize(self) -> None:
"""
Initializes the room.

This method is thread safe so only one client can initialize the room.
This method is not coroutine-safe, i.e. consumers must await this method
before calling any other methods.

To initialize the room, we check if the content was already in the store
as Y updates and if it is up to date with the content on disk. In this
Expand All @@ -89,64 +103,59 @@ async def initialize(self) -> None:
It is important to set the ready property in the parent class (`self.ready = True`),
this setter will subscribe for updates on the shared document.
"""
async with self._initialization_lock:
if self.ready: # type: ignore[has-type]
return
if self.ready: # type: ignore[has-type]
return

self.log.info("Initializing room %s", self._room_id)
self.log.info("Initializing room %s", self._room_id)
model = await self._file.load_content(self._file_format, self._file_type)

model = await self._file.load_content(self._file_format, self._file_type)
# try to apply Y updates from the YStore for this document
read_from_source = True
if self.ystore is not None:
try:
await self.ystore.apply_updates(self.ydoc)
self._emit(
LogLevel.INFO,
"load",
"Content loaded from the store {}".format(
self.ystore.__class__.__qualname__
),
)
self.log.info(
"Content in room %s loaded from the ystore %s",
self._room_id,
self.ystore.__class__.__name__,
)
read_from_source = False
except YDocNotFound:
# YDoc not found in the YStore, create the document from the source file (no change history)
pass

if not read_from_source and self._document.source != model["content"]:
# TODO: Delete document from the store.
self._emit(
LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore."
)
self.log.info(
"Content in file %s is out-of-sync with the ystore %s",
self._file.path,
self.ystore.__class__.__name__,
)
read_from_source = True

# if YStore updates and source file are out-of-sync, resync updates with source
if read_from_source:
self._emit(LogLevel.INFO, "load", "Content loaded from disk.")
self.log.info(
"Content in room %s loaded from file %s", self._room_id, self._file.path
)
self._document.source = model["content"]

if self.ystore:
await self.ystore.encode_state_as_update(self.ydoc)

async with self._update_lock:
# try to apply Y updates from the YStore for this document
read_from_source = True
if self.ystore is not None:
try:
await self.ystore.apply_updates(self.ydoc)
self._emit(
LogLevel.INFO,
"load",
"Content loaded from the store {}".format(
self.ystore.__class__.__qualname__
),
)
self.log.info(
"Content in room %s loaded from the ystore %s",
self._room_id,
self.ystore.__class__.__name__,
)
read_from_source = False
except YDocNotFound:
# YDoc not found in the YStore, create the document from the source file (no change history)
pass

if not read_from_source:
# if YStore updates and source file are out-of-sync, resync updates with source
if self._document.source != model["content"]:
# TODO: Delete document from the store.
self._emit(
LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore."
)
self.log.info(
"Content in file %s is out-of-sync with the ystore %s",
self._file.path,
self.ystore.__class__.__name__,
)
read_from_source = True

if read_from_source:
self._emit(LogLevel.INFO, "load", "Content loaded from disk.")
self.log.info(
"Content in room %s loaded from file %s", self._room_id, self._file.path
)
self._document.source = model["content"]

if self.ystore:
await self.ystore.encode_state_as_update(self.ydoc)

self._document.dirty = False
self.ready = True
self._emit(LogLevel.INFO, "initialize", "Room initialized")
self.ready = True
self._emit(LogLevel.INFO, "initialize", "Room initialized")

def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None:
data = {"level": level.value, "room": self._room_id, "path": self._file.path}
Expand All @@ -165,8 +174,8 @@ def stop(self) -> None:
"""
super().stop()
# TODO: Should we cancel or wait ?
if self._saving_document:
self._saving_document.cancel()
if self._save_task:
self._save_task.cancel()

self._document.unobserve()
self._file.unobserve(self.room_id)
Expand All @@ -193,9 +202,7 @@ async def _on_outofband_change(self) -> None:
self._emit(LogLevel.ERROR, None, msg)
return

async with self._update_lock:
self._document.source = model["content"]
self._document.dirty = False
self._document.source = model["content"]

def _on_document_change(self, target: str, event: Any) -> None:
"""
Expand All @@ -212,49 +219,60 @@ def _on_document_change(self, target: str, event: Any) -> None:
document. These tasks are debounced (60 seconds by default) so we
need to cancel previous tasks before creating a new one.
"""
if self._update_lock.locked():
return

self._saving_document = asyncio.create_task(
self._maybe_save_document(self._saving_document)
)
asyncio.create_task(self._maybe_save_document())

async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> None:
async def _maybe_save_document(self) -> None:
"""
Saves the content of the document to disk.

### Note:
There is a save delay to debounce the save since we could receive a high
There is a save delay to throttle the save since we could receive a large
number of changes in a short period of time. This way we can cancel the
previous save.
"""
if self._save_delay is None:
# TODO: fix this; if _save_delay is unset, then this never writes to disk
return

if saving_document is not None and not saving_document.done():
# the document is being saved, cancel that
saving_document.cancel()
if self._waiting_to_save:
# if a previously spawned `self._maybe_save_document()` task is
# waiting to save, then that task will save the Ydoc within
# `self._save_delay` seconds. therefore we can return early.
return

if self._save_task and not self._save_task.done():
# if we are currently saving, then set the `_should_resave`
# flag. this indicates to the currently running `self._save_task`
# that it should re-call this method after it completes.
self._should_resave = True
return

# save after `self._save_delay` seconds of inactivity
self._waiting_to_save = True
await asyncio.sleep(self._save_delay)
self._waiting_to_save = False

# all async code (i.e. await statements) must be part of this try/except block
# because this coroutine is run in a cancellable task and cancellation is handled here

try:
# save after X seconds of inactivity
await asyncio.sleep(self._save_delay)

# do not write to `self._document` in this `try` block, as that will
# trigger the observer and result in a save loop.
self.log.info("Saving the content from room %s", self._room_id)
await self._file.maybe_save_content(
loop = asyncio.get_running_loop()
self._save_task = loop.create_task(self._file.maybe_save_content(
{
"format": self._file_format,
"type": self._file_type,
"content": self._document.source,
}
)
async with self._update_lock:
self._document.dirty = False

))
await self._save_task
self._emit(LogLevel.INFO, "save", "Content saved.")

if self._should_resave:
self._should_resave = False
asyncio.create_task(self._maybe_save_document())

except asyncio.CancelledError:
return

Expand All @@ -268,10 +286,7 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No
self._emit(LogLevel.ERROR, None, msg)
return None

async with self._update_lock:
self._document.source = model["content"]
self._document.dirty = False

self._document.source = model["content"]
self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes while saving.")

except Exception as e:
Expand Down