Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions app/c_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
class Connection:
"""Wraps asyncio StreamReader/StreamWriter to provide send/recv interface.

Python 3.12 removed the deprecated asyncio.Transport send/recv methods.
This class provides a compatible interface using the modern
StreamReader/StreamWriter API.
"""

def __init__(self, reader, writer):
self.reader = reader
self.writer = writer

async def recv(self, num_bytes):
"""Read up to num_bytes from the stream."""
return await self.reader.read(num_bytes)

async def send(self, data):
"""Write data to the stream and flush."""
if isinstance(data, str):
data = data.encode()
self.writer.write(data)
await self.writer.drain()

def close(self):
"""Close the underlying writer."""
self.writer.close()
Comment on lines +25 to +26
5 changes: 3 additions & 2 deletions app/h_terminal.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ def __init__(self, tag):
@staticmethod
async def run(socket, path, services):
session_id = path.split('/')[2]
cmd = await socket.recv()
msg = await socket.receive()
cmd = msg.data if hasattr(msg, 'data') else str(msg)
Comment on lines +16 to +17
handler = services.get('term_svc').socket_conn.tcp_handler
paw = next(i.paw for i in handler.sessions if i.id == int(session_id))
services.get('contact_svc').report['websocket'].append(
dict(paw=paw, date=datetime.now(timezone.utc).strftime(BaseWorld.TIME_FORMAT), cmd=cmd)
)
status, pwd, reply, response_time = await handler.send(session_id, cmd)
await socket.send(json.dumps(dict(response=reply.strip(), pwd=pwd, status=status, response_time=response_time)))
await socket.send_str(json.dumps(dict(response=reply.strip(), pwd=pwd, status=status, response_time=response_time)))
Empty file added tests/__init__.py
Empty file.
128 changes: 128 additions & 0 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import asyncio
import unittest


class TestConnection(unittest.TestCase):
"""Tests for the Connection class that wraps asyncio streams."""

def test_connection_init(self):
"""Connection stores reader and writer references."""
from app.c_connection import Connection

mock_reader = object()
mock_writer = object()
conn = Connection(mock_reader, mock_writer)
self.assertIs(conn.reader, mock_reader)
self.assertIs(conn.writer, mock_writer)

def test_recv(self):
"""Connection.recv reads from the underlying StreamReader."""
from app.c_connection import Connection

class MockReader:
def __init__(self):
self.read_called_with = None

async def read(self, n):
self.read_called_with = n
return b"hello"

class MockWriter:
pass

reader = MockReader()
conn = Connection(reader, MockWriter())
result = asyncio.get_event_loop().run_until_complete(conn.recv(1024))
self.assertEqual(result, b"hello")
Comment on lines +33 to +36
self.assertEqual(reader.read_called_with, 1024)

def test_send_bytes(self):
"""Connection.send writes bytes to the underlying StreamWriter."""
from app.c_connection import Connection

class MockReader:
pass

class MockWriter:
def __init__(self):
self.written = None
self.drained = False

def write(self, data):
self.written = data

async def drain(self):
self.drained = True

writer = MockWriter()
conn = Connection(MockReader(), writer)
asyncio.get_event_loop().run_until_complete(conn.send(b"world"))
self.assertEqual(writer.written, b"world")
self.assertTrue(writer.drained)
Comment on lines +57 to +61

def test_send_str_encodes_to_bytes(self):
"""Connection.send encodes str to bytes before writing."""
from app.c_connection import Connection

class MockReader:
pass

class MockWriter:
def __init__(self):
self.written = None

def write(self, data):
self.written = data

async def drain(self):
pass

writer = MockWriter()
conn = Connection(MockReader(), writer)
asyncio.get_event_loop().run_until_complete(conn.send("text"))
self.assertEqual(writer.written, b"text")
Comment on lines +80 to +83

def test_close(self):
"""Connection.close closes the underlying writer."""
from app.c_connection import Connection

class MockReader:
pass

class MockWriter:
def __init__(self):
self.closed = False

def close(self):
self.closed = True

writer = MockWriter()
conn = Connection(MockReader(), writer)
conn.close()
self.assertTrue(writer.closed)


class TestHandleWebSocket(unittest.TestCase):
"""Tests for the updated WebSocket API usage in h_terminal.py Handle."""

def test_handle_uses_receive_and_send_str(self):
"""Verify Handle.run uses socket.receive() and socket.send_str()."""
import ast
with open("app/h_terminal.py") as f:
source = f.read()

Comment on lines +111 to +113
# Ensure deprecated methods are not used
self.assertNotIn("socket.recv()", source,
"Should use socket.receive() instead of deprecated socket.recv()")
self.assertNotIn("await socket.send(", source,
"Should use socket.send_str() instead of deprecated socket.send()")

# Ensure modern methods are used
self.assertIn("socket.receive()", source,
"Should call socket.receive() for reading WebSocket messages")
self.assertIn("socket.send_str(", source,
"Should call socket.send_str() for sending WebSocket messages")
Comment on lines +114 to +124
Comment on lines +114 to +124


if __name__ == "__main__":
unittest.main()
Loading