Skip to content

Commit 7599967

Browse files
adding sqlite3 queue
1 parent 49cd2af commit 7599967

File tree

3 files changed

+179
-4
lines changed

3 files changed

+179
-4
lines changed

src/mcp_utils/queue.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
"""
44

55
import logging
6+
import sqlite3
7+
import time
68
from typing import Protocol
79

810
import msgspec
@@ -108,3 +110,105 @@ def clear_session(self, session_id: str) -> None:
108110
queue_key = self._get_queue_key(session_id)
109111
self.redis.delete(queue_key)
110112
logger.debug(f"Redis: Clearing session: {session_id}")
113+
114+
115+
class SQLiteResponseQueue(ResponseQueueProtocol):
116+
"""
117+
A SQLite-backed queue implementation for MCP responses.
118+
Each session is a logical queue keyed by `session_id`.
119+
120+
Uses a simple table and transactional pop to ensure single-delivery.
121+
"""
122+
123+
def __init__(self, db_path: str = ":memory:"):
124+
"""Initialize and create tables if needed.
125+
126+
Args:
127+
db_path: Path to the SQLite database file. Defaults to in-memory.
128+
"""
129+
# isolation_level=None -> autocommit mode so we can manage BEGIN/COMMIT explicitly
130+
self.conn = sqlite3.connect(
131+
db_path, timeout=30, check_same_thread=False, isolation_level=None
132+
)
133+
self.conn.execute(
134+
"""
135+
CREATE TABLE IF NOT EXISTS response_queue (
136+
id INTEGER PRIMARY KEY AUTOINCREMENT,
137+
session_id TEXT NOT NULL,
138+
payload TEXT NOT NULL,
139+
created_at REAL DEFAULT (strftime('%s','now'))
140+
)
141+
"""
142+
)
143+
self.conn.execute(
144+
"""
145+
CREATE INDEX IF NOT EXISTS idx_response_queue_session_id
146+
ON response_queue(session_id, id)
147+
"""
148+
)
149+
150+
def push_response(self, session_id: str, response: MCPResponse) -> None:
151+
"""Insert a response payload for a session."""
152+
payload = msgspec.json.encode(response).decode()
153+
logger.debug(f"SQLite: Saving response for session: {session_id}: {payload}")
154+
self.conn.execute(
155+
"INSERT INTO response_queue (session_id, payload) VALUES (?, ?)",
156+
(session_id, payload),
157+
)
158+
159+
def _pop_one(self, session_id: str) -> str | None:
160+
"""Atomically pop the oldest payload for the session, if any."""
161+
try:
162+
cur = self.conn.cursor()
163+
cur.execute("BEGIN IMMEDIATE")
164+
row = cur.execute(
165+
"SELECT id, payload FROM response_queue WHERE session_id = ? ORDER BY id LIMIT 1",
166+
(session_id,),
167+
).fetchone()
168+
if not row:
169+
cur.execute("ROLLBACK")
170+
return None
171+
row_id, payload = row
172+
cur.execute("DELETE FROM response_queue WHERE id = ?", (row_id,))
173+
cur.execute("COMMIT")
174+
return payload
175+
except sqlite3.Error as e:
176+
logger.error(f"SQLite pop error: {e}")
177+
try:
178+
self.conn.execute("ROLLBACK")
179+
except Exception:
180+
pass
181+
return None
182+
183+
def wait_for_response(self, session_id: str, timeout: float | None = None) -> str | None:
184+
"""
185+
Wait for and pop the next response for a session.
186+
187+
If timeout is None, waits indefinitely using polling.
188+
If timeout is 0, returns immediately if none available.
189+
"""
190+
# Immediate, non-blocking attempt
191+
payload = self._pop_one(session_id)
192+
if payload is not None:
193+
return payload
194+
195+
if timeout == 0:
196+
return None
197+
198+
# Poll until available or timeout
199+
start = time.time()
200+
while True:
201+
payload = self._pop_one(session_id)
202+
if payload is not None:
203+
return payload
204+
if timeout is not None and (time.time() - start) >= timeout:
205+
return None
206+
time.sleep(0.1)
207+
208+
def clear_session(self, session_id: str) -> None:
209+
"""Remove all queued items for a session."""
210+
logger.debug(f"SQLite: Clearing session: {session_id}")
211+
self.conn.execute(
212+
"DELETE FROM response_queue WHERE session_id = ?",
213+
(session_id,),
214+
)

tests/test_core.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
)
2727

2828

29-
class TestResponseQueue(ResponseQueueProtocol):
29+
class DemoResponseQueue(ResponseQueueProtocol):
3030
"""Mock response queue for testing."""
3131

3232
def __init__(self) -> None:
@@ -58,13 +58,13 @@ def clear_session(self, session_id: str) -> None:
5858

5959

6060
@pytest.fixture
61-
def response_queue() -> TestResponseQueue:
61+
def response_queue() -> DemoResponseQueue:
6262
"""Create a test response queue."""
63-
return TestResponseQueue()
63+
return DemoResponseQueue()
6464

6565

6666
@pytest.fixture
67-
def server(response_queue: TestResponseQueue) -> MCPServer:
67+
def server(response_queue: DemoResponseQueue) -> MCPServer:
6868
"""Create a test MCP server."""
6969
return MCPServer(
7070
name="Test Server",

tests/test_queue_sqlite.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
"""Tests for SQLiteResponseQueue implementation."""
2+
3+
import json
4+
from typing import Any
5+
6+
import pytest
7+
8+
from mcp_utils.queue import SQLiteResponseQueue
9+
from mcp_utils.schema import MCPResponse
10+
11+
12+
@pytest.fixture()
13+
def q(tmp_path) -> SQLiteResponseQueue:
14+
return SQLiteResponseQueue(db_path=str(tmp_path / "queue.db"))
15+
16+
17+
def build_response(resp_id: str | int, result: dict[str, Any] | None = None) -> MCPResponse:
18+
return MCPResponse(jsonrpc="2.0", id=resp_id, result=result or {})
19+
20+
21+
def test_push_and_pop_single(q: SQLiteResponseQueue) -> None:
22+
session = "s1"
23+
resp = build_response("1", {"x": 1})
24+
q.push_response(session, resp)
25+
26+
data = q.wait_for_response(session, timeout=0)
27+
assert data is not None
28+
payload = json.loads(data)
29+
assert payload["jsonrpc"] == "2.0"
30+
assert payload["id"] == "1"
31+
assert payload["result"] == {"x": 1}
32+
33+
34+
def test_timeout_behavior(q: SQLiteResponseQueue) -> None:
35+
session = "s-timeout"
36+
assert q.wait_for_response(session, timeout=0) is None
37+
assert q.wait_for_response(session, timeout=0.1) is None
38+
39+
40+
def test_clear_session(q: SQLiteResponseQueue) -> None:
41+
session = "s-clear"
42+
q.push_response(session, build_response("1"))
43+
q.push_response(session, build_response("2"))
44+
q.clear_session(session)
45+
assert q.wait_for_response(session, timeout=0) is None
46+
47+
48+
def test_fifo_ordering(q: SQLiteResponseQueue) -> None:
49+
session = "s-fifo"
50+
q.push_response(session, build_response("1", {"n": 1}))
51+
q.push_response(session, build_response("2", {"n": 2}))
52+
53+
d1 = json.loads(q.wait_for_response(session, timeout=0))
54+
d2 = json.loads(q.wait_for_response(session, timeout=0))
55+
assert d1["id"] == "1"
56+
assert d2["id"] == "2"
57+
58+
59+
def test_isolation_between_sessions(q: SQLiteResponseQueue) -> None:
60+
s1, s2 = "s-a", "s-b"
61+
q.push_response(s1, build_response("a1"))
62+
q.push_response(s2, build_response("b1"))
63+
64+
d1 = json.loads(q.wait_for_response(s1, timeout=0))
65+
assert d1["id"] == "a1"
66+
# s1 should be empty now
67+
assert q.wait_for_response(s1, timeout=0) is None
68+
69+
d2 = json.loads(q.wait_for_response(s2, timeout=0))
70+
assert d2["id"] == "b1"
71+

0 commit comments

Comments
 (0)