Skip to content

Commit 4ba1149

Browse files
joamagclaudeCopilot
authored
feat: add backward-compatible Container support for Agent/Protocol arhitecture (#47)
* feat: add backward-compatible Container support for Agent/Protocol architecture Enables the Container to multiplex both old Base-derived and new Agent/Protocol-derived objects on a shared poll, fixing AttributeError when running ConsulProxyServer. Adds _container_loop bridging, event relay, proxy throttle fixes, and comprehensive test coverage for container setup and proxy data flow. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * docs: split compat.md into architecture and compatibility docs Adds architecture.md covering the event loop, Protocol/Transport, Agent, and Container design. Refocuses compat.md on execution modes, the CompatLoop adapter, and native vs compat performance tradeoffs. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * chore: new bi-directional asyncio compatibility design in architecture.md * chore: executed black * fix: for some tests * fix: compatibility issues in unit tests * chore: small doc fixes in architecture.md * chore: new assert in validation * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * chore: remove redundant container loop assignment in load_base() * chore: improved commenting --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent dca1595 commit 4ba1149

File tree

13 files changed

+1571
-18
lines changed

13 files changed

+1571
-18
lines changed

CHANGELOG.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12-
*
12+
* Base-compatible stub methods on `Agent` so that `Container` works with both old (`Base`) and new (`Agent`/`Protocol`) architectures without defensive guards
13+
* `ClientAgent.connect()` method with `_container_loop` support for protocol-based connections to join the container's shared poll
14+
* Event relay system in `ClientAgent._relay_protocol_events()` bridging protocol events to client-level observers
15+
* Container tests (`netius.test.base.container`) covering setup, event bindings, lifecycle, and cleanup
16+
* Data flow tests for `ReverseProxyServer` covering request routing, response relay, error handling, and lifecycle management
1317

1418
### Changed
1519

16-
*
20+
* `Container.apply_base()` now sets `_container_loop` on non-`Base` objects to enable dual architecture multiplexing
21+
* `HTTPClient.method()` uses `_container_loop` as default loop when available for container integration
22+
* Proxy throttle methods use `self.reads()` / `self.writes()` since connections are owned by the proxy server
1723

1824
### Fixed
1925

20-
*
26+
* `AttributeError` when running `ConsulProxyServer` due to `Agent` subclasses missing `load`, `unload`, `ticks`, and other `Base`-expected methods
2127

2228
## [1.21.0] - 2026-02-07
2329

doc/architecture.md

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
# Architecture
2+
3+
Netius is a networking library with its own event loop, a Protocol/Transport layer that is
4+
API-compatible with Python's asyncio, and a Container system for multiplexing multiple services
5+
on a shared poll.
6+
7+
## Bi-directional asyncio Compatibility
8+
9+
A core design principle of Netius is **bi-directional compatibility** with Python's asyncio. Rather
10+
than being a one-way wrapper, compatibility works in both directions across three layers:
11+
12+
**Protocol layer** - Netius protocols (`Protocol`, `StreamProtocol`, `DatagramProtocol`) implement
13+
the same interface as `asyncio.Protocol`: `connection_made()`, `connection_lost()`,
14+
`data_received()`, `pause_writing()`, `resume_writing()`. A protocol written for Netius runs on
15+
asyncio's event loop with zero changes, and a protocol written for asyncio runs on Netius.
16+
17+
**Transport layer** - Netius transports (`Transport`, `TransportStream`, `TransportDatagram`)
18+
implement the `asyncio.Transport` interface: `write()`, `close()`, `abort()`,
19+
`get_write_buffer_size()`, `get_extra_info()`. Internally they wrap Netius `Connection` objects, but
20+
protocols only see the standard interface. When running on asyncio, Python provides its own
21+
transports with the same API - protocols don't know the difference.
22+
23+
**Event loop layer** - `CompatLoop` wraps a Netius `Base` loop and presents the full
24+
`asyncio.AbstractEventLoop` interface (`call_soon`, `create_connection`, `create_server`,
25+
`create_task`, `run_until_complete`, etc.). This means code that expects an asyncio loop - including
26+
third-party libraries - can run on Netius. Conversely, when `ASYNCIO=1` is set, Netius uses
27+
Python's native asyncio loop directly, with Netius protocols plugging in through the standard
28+
protocol/transport interfaces.
29+
30+
This bi-directional design means Netius is not locked into its own ecosystem. Protocols are portable
31+
across loops, the transport API is interchangeable, and the loop itself can be either Netius or
32+
asyncio depending on deployment needs. The native Netius loop is the default for performance (see
33+
[compat.md](compat.md)), but asyncio is always available as a drop-in alternative.
34+
35+
## Event Loop
36+
37+
The event loop lives in `Base` (`netius.base.common`) and is built on OS-level I/O multiplexing:
38+
epoll (Linux), kqueue (macOS/BSD), poll (POSIX), or select (fallback).
39+
40+
The core cycle in `Base.loop()`:
41+
42+
```text
43+
while running:
44+
ticks() # timers, delayed callbacks, housekeeping
45+
reads, writes, errors = poll.poll()
46+
reads(sockets) # dispatch readable sockets
47+
writes(sockets) # dispatch writable sockets
48+
errors(sockets) # dispatch error sockets
49+
```
50+
51+
Each socket is registered via `sub_read(socket, owner=self)` - the `owner` is the `Base` that
52+
created the connection. This ownership is used by the Container for routing (see below).
53+
54+
### Read Path
55+
56+
```text
57+
poll -> on_read(socket) -> connection.recv()
58+
-> on_data_base(connection, data) -> connection.set_data(data)
59+
-> connection triggers "data" event
60+
```
61+
62+
`Connection` is the Netius-native socket abstraction handling buffering, SSL, and flow control.
63+
64+
## Protocol / Transport
65+
66+
Netius protocols implement the same interface as `asyncio.Protocol`, so they run on either event
67+
loop unchanged.
68+
69+
```text
70+
Protocol connection_made(), connection_lost(), pause/resume_writing
71+
+-- StreamProtocol data_received(), send()
72+
+-- DatagramProtocol datagram_received(), send_to()
73+
```
74+
75+
`StreamProtocol` also exposes backward-compat delegation properties (`socket`, `renable`,
76+
`is_throttleable()`) that reach through to the underlying `Connection`.
77+
78+
The `Transport` classes (`netius.base.transport`) wrap a `Connection` and bridge the two worlds:
79+
80+
- Binds to `Connection` `"data"` / `"close"` events
81+
- Forwards to `protocol.data_received()` / `protocol.connection_lost()`
82+
- `transport.write(data)` calls `connection.send(data, delay=False)`
83+
84+
Wiring is set up by `transport._set_compat(protocol)` which binds events and calls
85+
`protocol.connection_made(transport)`.
86+
87+
## Agent
88+
89+
`Agent` (`netius.base.agent`) is the entry point for protocol-based implementations.
90+
91+
```text
92+
Agent Base-compatible stubs (load, unload, ticks, on_start, on_stop)
93+
+-- ClientAgent connect(), event relay, thread-local caching, _container_loop
94+
+-- ServerAgent
95+
```
96+
97+
`ClientAgent.connect()` creates a protocol, calls `connect_stream(loop=self._container_loop)`,
98+
and relays protocol events (`"open"` -> `"connect"`, `"close"` -> `"close"`) so observers on the
99+
client receive events from all managed protocols.
100+
101+
Agent provides Base-compatible stubs so it can participate in a Container without defensive guards.
102+
103+
## Container
104+
105+
The `Container` (`netius.base.container`) multiplexes multiple `Base` and `Agent` instances on a
106+
shared poll. Used by composite servers like `ProxyServer` (front-end server + back-end clients).
107+
108+
```python
109+
def loop(self):
110+
while self._running:
111+
self.ticks()
112+
result = self.poll.poll_owner() # events grouped by owning Base
113+
for base, (reads, writes, errors) in result.items():
114+
base.reads(reads)
115+
base.writes(writes)
116+
base.errors(errors)
117+
```
118+
119+
### Socket Ownership
120+
121+
`Base.sub_read(socket)` registers `owner=self` in the poll. `poll_owner()` groups ready sockets
122+
by owner and returns a dict, so the Container routes each group to the correct Base.
123+
124+
### _container_loop
125+
126+
Agent-based objects need `_container_loop` (set to `Container.owner` by `apply_base()`) so that
127+
`connect_stream()` calls `owner.connect()` instead of `Container.connect()`. This ensures:
128+
129+
1. Connections are owned by the right Base (eg ProxyServer)
130+
2. `poll_owner()` routes their events to that Base's `on_read()` -> `on_data_base()`
131+
3. Data reaches the transport -> protocol layer
132+
133+
Without it, `connect_stream(loop=None)` falls back to `common.get_loop()` which returns the
134+
Container (as `Base._MAIN`). Connections owned by the Container would not be routed through the
135+
correct `on_data_base()` bridging path.

doc/compat.md

Lines changed: 87 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,105 @@
11
# Compatibility with asyncio
22

3-
As part of the effort to make Netius more compatible with asyncio, the following
4-
changes have been made:
3+
The `netius.base.compat` module allows Netius protocols to run on different event loops. Each
4+
bootstrap function (`connect_stream`, `serve_stream`, `build_datagram`) dispatches to a **native**
5+
or **compat** variant based on runtime configuration.
56

6-
- The `netius` module provides a `COMPAT` mode that allows it to be used to allows Netius protocols
7-
to be used with asyncio. This mode is enabled by setting the `COMPAT` environment variable to `True`.
7+
## Execution Modes
88

9-
## Testing
9+
### Native (Default)
1010

11-
To run the echo server Protocol implementation using netius run:
11+
Uses the Netius event loop directly. Fastest path - pure callbacks, no asyncio overhead.
1212

1313
```bash
14-
PYTHONPATH=. python3 netius/servers/echo.py
14+
PYTHONPATH=. python3 netius/servers/echo.py
1515
```
1616

17-
To use the compat version meaning that an asyncio-like interface will be used underneath the hoods use:
17+
### Compat
18+
19+
Wraps the Netius event loop in `CompatLoop` to present an `asyncio.AbstractEventLoop` interface.
20+
Allows third-party asyncio protocols to run on Netius event loop.
1821

1922
```bash
2023
COMPAT=1 PYTHONPATH=. python3 netius/servers/echo.py
2124
```
2225

23-
To use the compat version and make use of the native asyncio event loop use the following:
26+
### Asyncio
27+
28+
Replaces the Netius event loop with Python's native asyncio loop. Netius protocols work because
29+
they implement the standard asyncio Protocol interface. Implies compat mode.
2430

2531
```bash
2632
COMPAT=1 ASYNCIO=1 PYTHONPATH=. python3 netius/servers/echo.py
2733
```
34+
35+
## Dispatch
36+
37+
`is_compat()` returns `True` when `COMPAT=1` or `ASYNCIO=1` is set:
38+
39+
```python
40+
def connect_stream(*args, **kwargs):
41+
if is_compat():
42+
return _connect_stream_compat(...)
43+
else:
44+
return _connect_stream_native(...)
45+
```
46+
47+
The same pattern applies to `serve_stream()` and `build_datagram()`.
48+
49+
## Native Path
50+
51+
Calls `Base.connect()` / `Base.serve()` directly with callbacks:
52+
53+
```python
54+
def _connect_stream_native(..., loop=None):
55+
loop = loop or common.get_loop()
56+
protocol = protocol_factory()
57+
loop.connect(host, port, ssl=ssl, callback=on_complete)
58+
# on_complete creates TransportStream, wires protocol
59+
```
60+
61+
No futures, no coroutines, no translation layer. SSL parameters go straight to `Base.connect()`
62+
which wraps the socket at the OS level.
63+
64+
## Compat Path
65+
66+
Routes through the asyncio API exposed by `CompatLoop`:
67+
68+
```python
69+
def _connect_stream_compat(..., loop=None):
70+
loop = loop or common.get_loop()
71+
protocol = protocol_factory()
72+
connect = loop.create_connection(build_protocol, host=host, port=port, ssl=ssl)
73+
future = loop.create_task(connect)
74+
future.add_done_callback(on_connect)
75+
```
76+
77+
`CompatLoop.create_connection()` internally calls `Base.connect()` but wraps the result in a
78+
Future and wires the protocol inside a generator-based coroutine.
79+
80+
## CompatLoop
81+
82+
Wraps a Netius `Base` loop and translates asyncio API calls:
83+
84+
| asyncio API | Netius equivalent |
85+
| ---------------------------- | ------------------------------------------ |
86+
| `call_soon(cb)` | `Base.delay(cb, immediately=True)` |
87+
| `call_later(delay, cb)` | `Base.delay(cb, timeout=delay)` |
88+
| `create_future()` | `Base.build_future()` |
89+
| `create_task(coroutine)` | `Base.ensure(coroutine)` wrapped in `Task` |
90+
| `create_connection(...)` | `Base.connect(...)` wrapped in a Future |
91+
| `create_server(...)` | `Base.serve(...)` wrapped in a Future |
92+
| `run_until_complete(future)` | `Base.run_coroutine(future)` |
93+
| `run_forever()` | `Base.run_forever()` |
94+
| `stop()` | `Base.pause()` |
95+
96+
Attributes not explicitly implemented fall through to the underlying `Base` via `__getattr__`.
97+
98+
## Why Native is Faster
99+
100+
1. **No Future/Task overhead.** Native uses direct callbacks. Compat allocates a Future, creates a Task, and dispatches through done-callbacks for every operation.
101+
2. **No translation layer.** Native calls `Base.connect()` directly. Compat routes every call through `CompatLoop` which translates asyncio API into Netius equivalents.
102+
3. **No coroutine frames.** Compat's `_create_connection()` and `_create_server()` are generator coroutines (`yield future`). Native is purely callback-driven.
103+
4. **Direct SSL.** Native passes SSL parameters to `Base.connect()` for OS-level wrapping. Compat constructs an `ssl.SSLContext` and routes through asyncio's SSL layer.
104+
105+
The difference accumulates under high connection rates where per-connection overhead matters.

src/netius/base/agent.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import threading
3232

33+
from . import compat
3334
from . import legacy
3435
from . import observer
3536

@@ -49,8 +50,26 @@ class Agent(observer.Observable):
4950
5051
For complex protocols instantiation may be useful to provided extra
5152
flexibility and context for abstract operations.
53+
54+
Provides Base-compatible stub methods so that Agents can be added
55+
to a Container alongside Base-derived objects without requiring
56+
the Container to use defensive guards. The new code (Agent) adapts
57+
to the old code (Container), keeping retro-compatibility.
5258
"""
5359

60+
_container_loop = None
61+
""" Reference to the container's owner (a Base instance) set by
62+
`Container.apply_base()` when this agent is added to a container,
63+
enables protocol connections to join the container's shared poll """
64+
65+
@property
66+
def name(self):
67+
return self.__class__.__name__
68+
69+
@property
70+
def connections(self):
71+
return []
72+
5473
@classmethod
5574
def cleanup_s(cls):
5675
pass
@@ -62,6 +81,27 @@ def cleanup(self, destroy=True):
6281
def destroy(self):
6382
observer.Observable.destroy(self)
6483

84+
def load(self):
85+
pass
86+
87+
def unload(self):
88+
pass
89+
90+
def ticks(self):
91+
pass
92+
93+
def on_start(self):
94+
pass
95+
96+
def on_stop(self):
97+
pass
98+
99+
def connections_dict(self, full=False, parent=False):
100+
return dict()
101+
102+
def info_dict(self, full=False):
103+
return dict(name=self.name)
104+
65105

66106
class ClientAgent(Agent):
67107

@@ -89,6 +129,57 @@ def get_client_s(cls, *args, **kwargs):
89129
cls._clients[tid] = client
90130
return client
91131

132+
def connect(self, host, port, ssl=False, *args, **kwargs):
133+
"""
134+
Creates a new protocol based connection to the provided
135+
host and port, using the container's shared event loop when
136+
available (dual architecture support).
137+
138+
:type host: String
139+
:param host: The hostname or IP address of the remote
140+
host to which the connection should be made.
141+
:type port: int
142+
:param port: The port number of the remote host to
143+
which the connection should be made.
144+
:type ssl: bool
145+
:param ssl: If the connection should be established using
146+
a secure SSL/TLS channel.
147+
:rtype: Protocol
148+
:return: The protocol instance that represents the newly
149+
created connection.
150+
"""
151+
152+
cls = self.__class__
153+
protocol = cls.protocol()
154+
compat.connect_stream(
155+
lambda: protocol,
156+
host=host,
157+
port=port,
158+
ssl=ssl,
159+
loop=self._container_loop,
160+
*args,
161+
**kwargs
162+
)
163+
self._relay_protocol_events(protocol)
164+
return protocol
165+
166+
def _relay_protocol_events(self, protocol):
167+
"""
168+
Relays protocol events through this client agent so that
169+
observers that bind on the client (eg proxy servers) receive
170+
events from all managed protocols.
171+
172+
Subclasses should override this method and call the parent
173+
implementation to add protocol-specific event relays.
174+
175+
:type protocol: Protocol
176+
:param protocol: The protocol instance whose events should
177+
be relayed through this client agent.
178+
"""
179+
180+
protocol.bind("open", lambda protocol: self.trigger("connect", self, protocol))
181+
protocol.bind("close", lambda protocol: self.trigger("close", self, protocol))
182+
92183

93184
class ServerAgent(Agent):
94185
pass

0 commit comments

Comments
 (0)