Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Claude Code Guide for Callosum

## Project Overview

Callosum is an asyncio-based RPC library for Python that supports multiple transport backends (ZeroMQ, Redis, Thrift).

## Development Environment

This project uses **uv** as the package manager.

### Setup

```bash
uv sync --extra zeromq --extra redis --extra thrift
```

### Running Tests

```bash
# Run all tests
uv run pytest

# Run specific test file
uv run pytest tests/test_rpc.py -v

# Run specific test
uv run pytest tests/test_rpc.py::test_external_context_injection -v
```

### Type Checking

```bash
uv run mypy src/callosum
```

### Linting

```bash
uv run ruff check src/
uv run ruff format src/
```

## Project Structure

```
src/callosum/
├── lower/ # Transport layer (zeromq, redis)
│ ├── __init__.py # BaseTransport, AbstractBinder, AbstractConnector
│ ├── zeromq.py # ZeroMQ transport implementation
│ └── redis.py # Redis transport implementation
├── rpc/ # RPC layer
│ ├── channel.py # Peer class (main RPC interface)
│ ├── message.py # RPC message types
│ └── exceptions.py
├── auth.py # Authentication (CURVE for ZeroMQ)
├── ordering.py # Async schedulers
└── serial.py # Serialization utilities
```

## Key Classes

- `Peer` (rpc/channel.py): Main RPC interface for both client and server
- `ZeroMQBaseTransport` (lower/zeromq.py): ZeroMQ transport with context management
- `ZeroMQRPCTransport`: RPC-specific transport using ROUTER/DEALER sockets

## Optional Dependencies

Defined in `pyproject.toml` under `[project.optional-dependencies]`:
- `zeromq`: pyzmq for ZeroMQ transport
- `redis`: redis-py for Redis transport
- `thrift`: thriftpy2 for Thrift serialization
- `snappy`: python-snappy for compression
1 change: 1 addition & 0 deletions changes/43.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
lower.zeromq: Add support for external `zmq.asyncio.Context` injection via `transport_opts["zctx"]` to allow sharing a single context across multiple Peer instances
8 changes: 6 additions & 2 deletions src/callosum/lower/zeromq.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ class ZeroMQBaseTransport(BaseTransport):

__slots__ = BaseTransport.__slots__ + (
"_zctx",
"_external_zctx",
"_zsock_opts",
"_zap_server",
"_zap_task",
Expand All @@ -501,7 +502,9 @@ def __init__(
super().__init__(authenticator, **kwargs)
self._zap_server = None
self._zap_task = None
self._zctx = zmq.asyncio.Context()
# Support external context injection via transport_opts["zctx"]
self._external_zctx = self.transport_opts.get("zctx") is not None
self._zctx = self.transport_opts.get("zctx") or zmq.asyncio.Context()
match self.authenticator:
case AbstractServerAuthenticator() as auth:
self._zap_server = ZAPServer(self._zctx, auth)
Expand All @@ -524,7 +527,8 @@ async def close(self) -> None:
self._zap_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._zap_task
if self._zctx is not None:
# Do not destroy externally injected context
if self._zctx is not None and not self._external_zctx:
self._zctx.destroy(linger=50)


Expand Down
94 changes: 94 additions & 0 deletions tests/test_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,97 @@ async def _do_request(idx: int) -> int:
assert e.args[0] == "ZeroDivisionError"
else:
assert call_results[idx] == idx


@pytest.mark.asyncio
async def test_external_context_injection() -> None:
"""Test that an externally injected zmq context is used and not destroyed on close."""
import zmq.asyncio

# Create an external context
external_zctx = zmq.asyncio.Context()

async def func(request: RPCMessage) -> str:
return "ok"

# Create server with external context
server = Peer(
bind=ZeroMQAddress("tcp://127.0.0.1:5021"),
transport=ZeroMQRPCTransport,
transport_opts={"zctx": external_zctx},
scheduler=ExitOrderedAsyncScheduler(),
serializer=lambda o: json.dumps(o).encode("utf8"),
deserializer=lambda b: json.loads(b),
)
server.handle_function("func", func)

# Create client with external context
client = Peer(
connect=ZeroMQAddress("tcp://localhost:5021"),
transport=ZeroMQRPCTransport,
transport_opts={"zctx": external_zctx},
serializer=lambda o: json.dumps(o).encode("utf8"),
deserializer=lambda b: json.loads(b),
)

# Verify the external context is used
server_transport = cast(ZeroMQRPCTransport, server._transport)
client_transport = cast(ZeroMQRPCTransport, client._transport)
assert server_transport._zctx is external_zctx
assert server_transport._external_zctx is True
assert client_transport._zctx is external_zctx
assert client_transport._external_zctx is True

async with server:
async with client:
result = await client.invoke("func", {})
assert result == "ok"

# Verify context is NOT destroyed after transport close
assert not external_zctx.closed

# Clean up
external_zctx.destroy(linger=0)


@pytest.mark.asyncio
async def test_internal_context_destroyed_on_close() -> None:
"""Test that internally created zmq context is destroyed on close."""

async def func(request: RPCMessage) -> str:
return "ok"

# Create server without external context (uses internal)
server = Peer(
bind=ZeroMQAddress("tcp://127.0.0.1:5022"),
transport=ZeroMQRPCTransport,
scheduler=ExitOrderedAsyncScheduler(),
serializer=lambda o: json.dumps(o).encode("utf8"),
deserializer=lambda b: json.loads(b),
)
server.handle_function("func", func)

# Create client without external context (uses internal)
client = Peer(
connect=ZeroMQAddress("tcp://localhost:5022"),
transport=ZeroMQRPCTransport,
serializer=lambda o: json.dumps(o).encode("utf8"),
deserializer=lambda b: json.loads(b),
)

# Verify internal context is created
server_transport = cast(ZeroMQRPCTransport, server._transport)
client_transport = cast(ZeroMQRPCTransport, client._transport)
assert server_transport._external_zctx is False
assert client_transport._external_zctx is False
server_zctx = server_transport._zctx
client_zctx = client_transport._zctx

async with server:
async with client:
result = await client.invoke("func", {})
assert result == "ok"

# Verify context IS destroyed after transport close
assert server_zctx.closed
assert client_zctx.closed