Skip to content

Commit d2f0709

Browse files
committed
aio.core: Add StdinStdoutProcessor util
Signed-off-by: Ryan Northey <ryan@synca.io>
1 parent 816be6a commit d2f0709

File tree

4 files changed

+116
-0
lines changed

4 files changed

+116
-0
lines changed

aio.core/aio/core/pipe/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
2+
from .abstract import AStdinStdoutProcessor
3+
from .pipe import StdinStdoutProcessor
4+
5+
from . import abstract
6+
7+
8+
__all__ = (
9+
"abstract"
10+
"AStdinStdoutProcessor",
11+
"StdinStdoutProcessor")
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
2+
from .pipe import AStdinStdoutProcessor
3+
4+
5+
__all__ = (
6+
"AStdinStdoutProcessor", )
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
2+
import asyncio
3+
import importlib
4+
import sys
5+
from functools import cached_property
6+
7+
import abstracts
8+
9+
from aio.core.functional import async_property
10+
11+
12+
class AStdinStdoutProcessor(metaclass=abstracts.Abstraction):
13+
14+
def __init__(self, stdin, stdout, processor, log=None):
15+
self.processor = processor
16+
self.stdin = stdin
17+
self.stdout = stdout
18+
self._log = log
19+
20+
@cached_property
21+
def connecting(self):
22+
return asyncio.Lock()
23+
24+
@async_property(cache=True)
25+
async def connection(self):
26+
reader = asyncio.StreamReader()
27+
protocol = asyncio.StreamReaderProtocol(reader)
28+
await self.loop.connect_read_pipe(lambda: protocol, self.stdin)
29+
w_transport, w_protocol = await self.loop.connect_write_pipe(asyncio.streams.FlowControlMixin, self.stdout)
30+
writer = asyncio.StreamWriter(w_transport, w_protocol, reader, self.loop)
31+
return reader, writer
32+
33+
@async_property
34+
async def listener(self):
35+
async with self.connecting:
36+
reader = await self.reader
37+
self.log(f"LISTENING\n")
38+
while True:
39+
line = await reader.readline()
40+
if not line:
41+
self.log(f"CLOSED: {line}\n")
42+
break
43+
if not line.strip():
44+
continue
45+
# self.log(f"RCVD: {line}\n")
46+
await self.in_q.put(line)
47+
48+
@async_property
49+
async def responder(self):
50+
async with self.connecting:
51+
writer = await self.writer
52+
self.log(f"RESPONDING\n")
53+
while True:
54+
outgoing = await self.out_q.get()
55+
self.out_q.task_done()
56+
# self.log(f"SEND: {outgoing}\n")
57+
writer.write(outgoing.encode("utf-8"))
58+
59+
@async_property
60+
async def reader(self):
61+
return (await self.connection)[0]
62+
63+
@async_property
64+
async def writer(self):
65+
return (await self.connection)[1]
66+
67+
@cached_property
68+
def in_q(self):
69+
return asyncio.Queue()
70+
71+
@cached_property
72+
def out_q(self):
73+
return asyncio.Queue()
74+
75+
async def process(self, *args):
76+
await asyncio.gather(
77+
asyncio.create_task(self.processor(self.in_q, self.out_q, *args, log=self.log)),
78+
asyncio.create_task(self.listener),
79+
asyncio.create_task(self.responder))
80+
81+
@cached_property
82+
def loop(self):
83+
return asyncio.get_event_loop()
84+
85+
def __call__(self, *args):
86+
return self.process(*args)
87+
88+
def log(self, message):
89+
if self._log:
90+
self._log(message)

aio.core/aio/core/pipe/pipe.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
2+
import abstracts
3+
4+
from aio.core import pipe
5+
6+
7+
@abstracts.implementer(pipe.AStdinStdoutProcessor)
8+
class StdinStdoutProcessor:
9+
pass

0 commit comments

Comments
 (0)