Skip to content

Commit dcc0e18

Browse files
committed
aio.core: Add StdinStdoutProcessor util
Signed-off-by: Ryan Northey <ryan@synca.io>
1 parent 9681b3e commit dcc0e18

File tree

11 files changed

+741
-4
lines changed

11 files changed

+741
-4
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ pypi: https://pypi.org/project/aio.api.nist
7272

7373
#### [aio.core](aio.core)
7474

75-
version: 0.8.7.dev0
75+
version: 0.8.7
7676

7777
pypi: https://pypi.org/project/aio.core
7878

aio.core/VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.8.7-dev
1+
0.8.7

aio.core/aio/core/BUILD

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ pytooling_library(
3232
"functional/utils.py",
3333
"log/__init__.py",
3434
"log/logging.py",
35+
"pipe/__init__.py",
36+
"pipe/abstract/__init__.py",
37+
"pipe/abstract/pipe.py",
38+
"pipe/interface.py",
39+
"pipe/pipe.py",
3540
"stream/__init__.py",
3641
"stream/base.py",
3742
"stream/_reader.py",
@@ -43,5 +48,6 @@ pytooling_library(
4348
"tasks/__init__.py",
4449
"tasks/exceptions.py",
4550
"tasks/tasks.py",
51+
"utils.py",
4652
],
4753
)

aio.core/aio/core/__init__.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,20 @@
44
directory,
55
event,
66
functional,
7+
pipe,
78
stream,
89
subprocess,
9-
tasks)
10+
tasks,
11+
utils)
1012

1113

1214
__all__ = (
1315
"dev",
1416
"directory",
1517
"event",
1618
"functional",
19+
"pipe",
1720
"stream",
1821
"subprocess",
19-
"tasks")
22+
"tasks",
23+
"utils")

aio.core/aio/core/pipe/__init__.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
2+
from .abstract import AProcessProtocol, AStdinStdoutProcessor
3+
from .interface import IProcessProtocol, IStdinStdoutProcessor
4+
from .pipe import StdinStdoutProcessor
5+
6+
from . import abstract, interface, pipe
7+
8+
9+
__all__ = (
10+
"abstract",
11+
"AProcessProtocol",
12+
"AStdinStdoutProcessor",
13+
"interface",
14+
"IProcessProtocol",
15+
"IStdinStdoutProcessor",
16+
"pipe",
17+
"StdinStdoutProcessor")
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
2+
from .pipe import AProcessProtocol, AStdinStdoutProcessor
3+
4+
5+
__all__ = (
6+
"AProcessProtocol",
7+
"AStdinStdoutProcessor")
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
2+
import argparse
3+
import asyncio
4+
import io
5+
import sys
6+
from functools import cached_property
7+
from typing import Any, Callable, TextIO, Tuple
8+
9+
import abstracts
10+
11+
from aio.core.functional import async_property
12+
from aio.core.pipe import interface
13+
14+
15+
@abstracts.implementer(interface.IProcessProtocol)
16+
class AProcessProtocol(metaclass=abstracts.Abstraction):
17+
18+
def __init__(
19+
self,
20+
processor: interface.IProcessor,
21+
args: argparse.Namespace) -> None:
22+
self.processor = processor
23+
self.args = args
24+
25+
async def __call__(self, request: Any) -> Any:
26+
return await self.process(request)
27+
28+
@abstracts.interfacemethod
29+
async def process(self, request: Any) -> Any:
30+
raise NotImplementedError
31+
32+
33+
@abstracts.implementer(interface.IStdinStdoutProcessor)
34+
class AStdinStdoutProcessor(metaclass=abstracts.Abstraction):
35+
36+
def __init__(
37+
self,
38+
protocol: interface.IProcessProtocol,
39+
stdin: TextIO = sys.stdin,
40+
stdout: TextIO = sys.stdout,
41+
log: Callable[[str], None] = None) -> None:
42+
self._protocol = protocol
43+
self.stdin = stdin
44+
self.stdout = stdout
45+
self._log = log
46+
47+
async def __call__(self) -> None:
48+
await self.start()
49+
50+
@cached_property
51+
def connecting(self) -> asyncio.Lock:
52+
return asyncio.Lock()
53+
54+
@async_property(cache=True)
55+
async def connection(
56+
self) -> Tuple[
57+
asyncio.StreamReader,
58+
asyncio.StreamWriter]:
59+
await self.loop.connect_read_pipe(
60+
lambda: self.stream_protocol,
61+
self.stdin)
62+
return self.stream_reader, await self.stream_writer
63+
64+
@cached_property
65+
def in_q(self) -> asyncio.Queue:
66+
return asyncio.Queue()
67+
68+
@async_property
69+
async def listener(self) -> None:
70+
async with self.connecting:
71+
reader = await self.reader
72+
self.log(f"START LISTENING {reader}")
73+
while True:
74+
line = await reader.readline()
75+
if not line:
76+
self.log(f"CLOSING")
77+
break
78+
if not line.strip():
79+
continue
80+
await self.in_q.put(line.decode())
81+
self.log("STOP LISTENING")
82+
await self.in_q.put("")
83+
84+
@cached_property
85+
def loop(self) -> asyncio.AbstractEventLoop:
86+
return asyncio.get_event_loop()
87+
88+
@cached_property
89+
def out_q(self) -> asyncio.Queue:
90+
return asyncio.Queue()
91+
92+
@async_property
93+
async def processor(self) -> None:
94+
async with self.connecting:
95+
await self.protocol
96+
self.log("START PROCESSING")
97+
while True:
98+
recv = await self.recv()
99+
if not recv:
100+
break
101+
await self.send(await self.process(recv))
102+
self.complete()
103+
self.log("STOP PROCESSING")
104+
await self.send("")
105+
106+
@async_property(cache=True)
107+
async def protocol(self) -> interface.IProcessProtocol:
108+
return await self._protocol(self)
109+
110+
@async_property
111+
async def reader(self) -> asyncio.StreamReader:
112+
return (await self.connection)[0]
113+
114+
@async_property
115+
async def sender(self) -> None:
116+
async with self.connecting:
117+
writer = await self.writer
118+
self.log(f"START SEND {writer}")
119+
while True:
120+
outgoing = await self.out_q.get()
121+
if not outgoing:
122+
break
123+
self.out_q.task_done()
124+
writer.write(outgoing.encode("utf-8"))
125+
self.log("STOP SEND")
126+
127+
@cached_property
128+
def stream_protocol(self) -> asyncio.StreamReaderProtocol:
129+
return asyncio.StreamReaderProtocol(self.stream_reader)
130+
131+
@cached_property
132+
def stream_reader(self) -> asyncio.StreamReader:
133+
return asyncio.StreamReader()
134+
135+
@async_property(cache=True)
136+
async def stream_transport(
137+
self) -> Tuple[
138+
asyncio.WriteTransport,
139+
asyncio.streams.FlowControlMixin]:
140+
return await self.loop.connect_write_pipe(
141+
asyncio.streams.FlowControlMixin,
142+
self.stdout)
143+
144+
@async_property(cache=True)
145+
async def stream_writer(self) -> asyncio.StreamWriter:
146+
transport, flow = await self.stream_transport
147+
return asyncio.StreamWriter(
148+
transport,
149+
flow,
150+
self.stream_reader,
151+
self.loop)
152+
153+
@async_property
154+
async def writer(self) -> asyncio.StreamWriter:
155+
return (await self.connection)[1]
156+
157+
def complete(self) -> None:
158+
self.in_q.task_done()
159+
160+
def log(self, message: str) -> None:
161+
if self._log:
162+
self._log(f"{message}\n")
163+
164+
async def process(self, data: Any) -> Any:
165+
protocol = await self.protocol
166+
self.log(f"PROCESS: {protocol} {data}")
167+
return await protocol(data)
168+
169+
async def recv(self) -> Any:
170+
recv = await self.in_q.get()
171+
self.log(f"RECV: {recv}")
172+
return recv
173+
174+
async def send(self, msg: Any) -> Any:
175+
self.log(f"SEND: {msg}")
176+
return await self.out_q.put(msg)
177+
178+
async def start(self) -> None:
179+
self.log("PROCESSOR START")
180+
await asyncio.gather(
181+
asyncio.create_task(self.listener),
182+
asyncio.create_task(self.sender),
183+
asyncio.create_task(self.processor))
184+
self.log("PROCESSOR SHUTDOWN")
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
2+
import abstracts
3+
4+
5+
class IProcessor(metaclass=abstracts.Interface):
6+
7+
@abstracts.interfacemethod
8+
def __call__(self, *args):
9+
raise NotImplementedError
10+
11+
12+
class IStdinStdoutProcessor(IProcessor, metaclass=abstracts.Interface):
13+
14+
@abstracts.interfacemethod
15+
def __init__(self, processor, stdin=None, stdout=None, log=None):
16+
raise NotImplementedError
17+
18+
19+
class IProcessProtocol(metaclass=abstracts.Interface):
20+
21+
@abstracts.interfacemethod
22+
def __init__(self, *args, **kwargs):
23+
raise NotImplementedError
24+
25+
@abstracts.interfacemethod
26+
async def __call__(self, incoming):
27+
raise NotImplementedError

aio.core/aio/core/pipe/pipe.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
2+
import abstracts
3+
4+
from aio.core import pipe
5+
6+
7+
@abstracts.implementer(pipe.IStdinStdoutProcessor)
8+
class StdinStdoutProcessor(pipe.AStdinStdoutProcessor):
9+
pass

aio.core/aio/core/utils.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
2+
3+
def dottedname_resolve(name, module=None):
4+
"""Resolve ``name`` to a Python object via imports / attribute lookups.
5+
6+
Lifted from `zope.dottedname.resolve`.
7+
8+
If ``module`` is None, ``name`` must be "absolute" (no leading dots).
9+
10+
If ``module`` is not None, and ``name`` is "relative" (has leading dots),
11+
the object will be found by navigating relative to ``module``.
12+
13+
Returns the object, if found. If not, propagates the error.
14+
"""
15+
name = name.split('.')
16+
if not name[0]:
17+
if module is None:
18+
raise ValueError("relative name without base module")
19+
module = module.split('.')
20+
name.pop(0)
21+
while not name[0]:
22+
module.pop()
23+
name.pop(0)
24+
name = module + name
25+
26+
used = name.pop(0)
27+
found = __import__(used)
28+
for n in name:
29+
used += '.' + n
30+
try:
31+
found = getattr(found, n)
32+
except AttributeError:
33+
__import__(used)
34+
found = getattr(found, n)
35+
36+
return found

0 commit comments

Comments
 (0)