From 0ec736cab4a5ea8060cc019bad6832f02d19c115 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Mon, 28 Nov 2022 18:47:48 +0000 Subject: [PATCH 1/2] squash doc history in separate task --- tests/test_ystore.py | 32 ++++++++++----------- ypy_websocket/ystore.py | 61 +++++++++++++++++++++++------------------ 2 files changed, 50 insertions(+), 43 deletions(-) diff --git a/tests/test_ystore.py b/tests/test_ystore.py index 901f39d..a2a5dc2 100644 --- a/tests/test_ystore.py +++ b/tests/test_ystore.py @@ -1,9 +1,7 @@ import asyncio import os import tempfile -import time from pathlib import Path -from unittest.mock import patch import aiosqlite import pytest @@ -31,7 +29,7 @@ class MyTempFileYStore(TempFileYStore): class MySQLiteYStore(SQLiteYStore): db_path = MY_SQLITE_YSTORE_DB_PATH - document_ttl = 1000 + document_ttl = 1 def __init__(self, *args, delete_db=False, **kwargs): if delete_db: @@ -61,29 +59,29 @@ async def test_ystore(YStore): assert i == len(data) +async def count_yupdates(db): + """Returns number of yupdates in a SQLite DB given a connection.""" + return (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] + + @pytest.mark.asyncio async def test_document_ttl_sqlite_ystore(test_ydoc): store_name = "my_store" ystore = MySQLiteYStore(store_name, delete_db=True) - now = time.time() for i in range(3): # assert that adding a record before document TTL doesn't delete document history - with patch("time.time") as mock_time: - mock_time.return_value = now - await ystore.write(test_ydoc.update()) - async with aiosqlite.connect(ystore.db_path) as db: - assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[ - 0 - ] == i + 1 - - # assert that adding a record after document TTL deletes previous document history - with patch("time.time") as mock_time: - mock_time.return_value = now + ystore.document_ttl + 1 await ystore.write(test_ydoc.update()) async with aiosqlite.connect(ystore.db_path) as db: - # two updates in DB: one squashed update and the new update - assert (await (await db.execute("SELECT count(*) FROM yupdates")).fetchone())[0] == 2 + assert (await count_yupdates(db)) == i + 1 + + await asyncio.sleep(ystore.document_ttl + 0.1) + + # assert that adding a record after document TTL squashes previous document history + await ystore.write(test_ydoc.update()) + async with aiosqlite.connect(ystore.db_path) as db: + # two updates in DB: one squashed update and the new update + assert (await count_yupdates(db)) == 2 @pytest.mark.asyncio diff --git a/ypy_websocket/ystore.py b/ypy_websocket/ystore.py index 251d23b..2f4bf2c 100644 --- a/ypy_websocket/ystore.py +++ b/ypy_websocket/ystore.py @@ -177,6 +177,7 @@ def __init__(self, path: str, metadata_callback: Optional[Callable] = None, log= self.metadata_callback = metadata_callback self.log = log or logging.getLogger(__name__) self.db_initialized = asyncio.create_task(self.init_db()) + self._squash_task: Optional[asyncio.Task] = None async def init_db(self): create_db = False @@ -231,36 +232,44 @@ async def read(self) -> AsyncIterator[Tuple[bytes, bytes, float]]: # type: igno async def write(self, data: bytes) -> None: await self.db_initialized async with aiosqlite.connect(self.db_path) as db: - # first, determine time elapsed since last update - cursor = await db.execute( - "SELECT timestamp FROM yupdates WHERE path = ? ORDER BY timestamp DESC LIMIT 1", - (self.path,), + # write this update to the DB + metadata = await self.get_metadata() + await db.execute( + "INSERT INTO yupdates VALUES (?, ?, ?, ?)", + (self.path, data, metadata, time.time()), ) - row = await cursor.fetchone() - diff = (time.time() - row[0]) if row else 0 - - if self.document_ttl is not None and diff > self.document_ttl: - # squash updates - ydoc = Y.YDoc() - async with db.execute( - "SELECT yupdate FROM yupdates WHERE path = ?", (self.path,) - ) as cursor: - async for update, in cursor: - Y.apply_update(ydoc, update) - # delete history - await db.execute("DELETE FROM yupdates WHERE path = ?", (self.path,)) - # insert squashed updates - squashed_update = Y.encode_state_as_update(ydoc) - metadata = await self.get_metadata() - await db.execute( - "INSERT INTO yupdates VALUES (?, ?, ?, ?)", - (self.path, squashed_update, metadata, time.time()), - ) + await db.commit() + # create task that squashes document history after document_ttl + self._create_squash_task() - # finally, write this update to the DB + async def _squash_coroutine(self): + await asyncio.sleep(self.document_ttl) + async with aiosqlite.connect(self.db_path) as db: + # squash updates + ydoc = Y.YDoc() + async with db.execute( + "SELECT yupdate FROM yupdates WHERE path = ?", (self.path,) + ) as cursor: + async for update, in cursor: + Y.apply_update(ydoc, update) + # delete history + await db.execute("DELETE FROM yupdates WHERE path = ?", (self.path,)) + # insert squashed updates + squashed_update = Y.encode_state_as_update(ydoc) metadata = await self.get_metadata() await db.execute( "INSERT INTO yupdates VALUES (?, ?, ?, ?)", - (self.path, data, metadata, time.time()), + (self.path, squashed_update, metadata, time.time()), ) await db.commit() + + def _create_squash_task(self) -> None: + """Creates a task that squashes document history after self.document_ttl + and binds it to the _squash_task attribute. If a task already exists, + this cancels the existing task.""" + if self.document_ttl is None: + return + if self._squash_task is not None: + self._squash_task.cancel() + + self._squash_task = asyncio.create_task(self._squash_coroutine()) From e39439dfc18dff7aa94ec777d653280668c7f3e9 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Thu, 1 Dec 2022 20:03:27 +0000 Subject: [PATCH 2/2] squash on init, add test for simultaneous write at document ttl --- tests/test_ystore.py | 51 +++++++++++++++++++++++++++++++++++++---- ypy_websocket/ystore.py | 21 ++++++++++++++--- 2 files changed, 65 insertions(+), 7 deletions(-) diff --git a/tests/test_ystore.py b/tests/test_ystore.py index a2a5dc2..bdde180 100644 --- a/tests/test_ystore.py +++ b/tests/test_ystore.py @@ -1,7 +1,9 @@ import asyncio import os import tempfile +import time from pathlib import Path +from unittest.mock import patch import aiosqlite import pytest @@ -66,6 +68,7 @@ async def count_yupdates(db): @pytest.mark.asyncio async def test_document_ttl_sqlite_ystore(test_ydoc): + """Assert that document history is squashed after the document TTL.""" store_name = "my_store" ystore = MySQLiteYStore(store_name, delete_db=True) @@ -75,13 +78,53 @@ async def test_document_ttl_sqlite_ystore(test_ydoc): async with aiosqlite.connect(ystore.db_path) as db: assert (await count_yupdates(db)) == i + 1 - await asyncio.sleep(ystore.document_ttl + 0.1) + await ystore._squash_task - # assert that adding a record after document TTL squashes previous document history + async with aiosqlite.connect(ystore.db_path) as db: + assert (await count_yupdates(db)) == 1 + + +@pytest.mark.asyncio +async def test_document_ttl_simultaneous_write_sqlite_ystore(test_ydoc): + """Assert that document history is squashed after the document TTL, and a + write that happens at the same time is also squashed.""" + store_name = "my_store" + ystore = MySQLiteYStore(store_name, delete_db=True) + + for i in range(3): + await ystore.write(test_ydoc.update()) + async with aiosqlite.connect(ystore.db_path) as db: + assert (await count_yupdates(db)) == i + 1 + + await asyncio.sleep(ystore.document_ttl) await ystore.write(test_ydoc.update()) + await ystore._squash_task + + async with aiosqlite.connect(ystore.db_path) as db: + assert (await count_yupdates(db)) == 1 + + +@pytest.mark.asyncio +async def test_document_ttl_init_sqlite_ystore(test_ydoc): + """Assert that document history is squashed on init if the document TTL has + already elapsed since last update.""" + store_name = "my_store" + ystore = MySQLiteYStore(store_name, delete_db=True) + now = time.time() + + with patch("time.time") as mock_time: + mock_time.return_value = now - ystore.document_ttl - 1 + for i in range(3): + await ystore.write(test_ydoc.update()) + async with aiosqlite.connect(ystore.db_path) as db: + assert (await count_yupdates(db)) == i + 1 + + del ystore + ystore = MySQLiteYStore(store_name) + await ystore.db_initialized + async with aiosqlite.connect(ystore.db_path) as db: - # two updates in DB: one squashed update and the new update - assert (await count_yupdates(db)) == 2 + assert (await count_yupdates(db)) == 1 @pytest.mark.asyncio diff --git a/ypy_websocket/ystore.py b/ypy_websocket/ystore.py index 2f4bf2c..0a9f993 100644 --- a/ypy_websocket/ystore.py +++ b/ypy_websocket/ystore.py @@ -213,6 +213,17 @@ async def init_db(self): await db.execute(f"PRAGMA user_version = {self.version}") await db.commit() + # squash updates if document TTL already elapsed + async with aiosqlite.connect(self.db_path) as db: + cursor = await db.execute( + "SELECT timestamp FROM yupdates WHERE path = ? ORDER BY timestamp DESC LIMIT 1", + (self.path,), + ) + row = await cursor.fetchone() + diff = (time.time() - row[0]) if row else 0 + if self.document_ttl is not None and diff > self.document_ttl: + await self._squash() + async def read(self) -> AsyncIterator[Tuple[bytes, bytes, float]]: # type: ignore await self.db_initialized try: @@ -242,8 +253,8 @@ async def write(self, data: bytes) -> None: # create task that squashes document history after document_ttl self._create_squash_task() - async def _squash_coroutine(self): - await asyncio.sleep(self.document_ttl) + async def _squash(self): + """Squashes document history into a single Y update.""" async with aiosqlite.connect(self.db_path) as db: # squash updates ydoc = Y.YDoc() @@ -263,6 +274,10 @@ async def _squash_coroutine(self): ) await db.commit() + async def _squash_later(self): + await asyncio.sleep(self.document_ttl) + await self._squash() + def _create_squash_task(self) -> None: """Creates a task that squashes document history after self.document_ttl and binds it to the _squash_task attribute. If a task already exists, @@ -272,4 +287,4 @@ def _create_squash_task(self) -> None: if self._squash_task is not None: self._squash_task.cancel() - self._squash_task = asyncio.create_task(self._squash_coroutine()) + self._squash_task = asyncio.create_task(self._squash_later())