Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ jobs:
test-small:
runs-on: ubuntu-18.04
container:
image: python:3.9.1-slim-buster
image: python:3.11.2-buster
steps:
- name: Checkout source code
uses: actions/checkout@v2
Expand Down
31 changes: 31 additions & 0 deletions example/main_stdio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import asyncio
import msgflow
from msgflow.channel.stdio_channel import StdioChannel


class App:
def __init__(self):
self._channel = StdioChannel(user_name="You")

@property
def input_channel(self):
return self._channel

@msgflow.handler(
pattern=r'^Hello$',
pattern_attr=lambda msg: msg.req,
)
async def handle_input(self, msg, match):
await msg.respond(f"Hello!, match={match.group()}")


def main():
app = App()
bot = msgflow.Pipeline(app=app)
try:
asyncio.run(bot.run())
except KeyboardInterrupt:
print("Exit by KeyboardInterrupt")


main()
3 changes: 3 additions & 0 deletions msgflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
from .bot import Messenger

from .channel.channel import Channel, Message
from .pipeline import Pipeline, PipelineChannelMixin, handler
Empty file added msgflow/channel/__init__.py
Empty file.
22 changes: 22 additions & 0 deletions msgflow/channel/channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import asyncio
from typing import Any


class Channel:
async def flow(self, queue: asyncio.Queue) -> None:
"""flow metiod is supposed to be called by asyncio.create_task.
Then it puts messaegs in a queue given as an argument.
"""
raise NotImplementedError()

async def post(self, obj: Any) -> None:
raise NotImplementedError()


class Message:
@property
def channel(self) -> Channel:
raise NotImplementedError()

async def respond(self, obj: Any) -> None:
raise NotImplementedError
57 changes: 57 additions & 0 deletions msgflow/channel/stdio_channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import asyncio
from msgflow import Channel, Message, PipelineChannelMixin
import sys
import aioconsole


class StdioChannel(Channel, PipelineChannelMixin):
def __init__(self, user_name: str, in_fd=sys.stdin, out_fd=sys.stdout):
self._user_name = user_name
self._in_fd = in_fd
self._out_fd = out_fd

async def flow(self, queue: asyncio.Queue) -> None:
while True:
try:
text = await aioconsole.ainput(f"{self._user_name}> ")
except (KeyboardInterrupt, EOFError):
print("Bye!", file=self._out_fd)
break

msg = StdioMessage(channel=self, req=text)
await queue.put(msg)
await msg.wait_response()

async def post(self, obj: str) -> None:
print(obj, file=self._out_fd)

@property
def out_fd(self):
return self._out_fd


class StdioMessage(Message):
def __init__(self, channel, req=None, res=None):
self._req = req
self._res = res
self._channel = channel
self._event = asyncio.Event()

@property
def channel(self) -> Channel:
return self._channel

async def respond(self, obj: str) -> None:
print(f">>> {obj}", file=self._channel.out_fd)
self._event.set()

@property
def req(self):
return self._req

@property
def res(self):
return self._res

async def wait_response(self):
await self._event.wait()
67 changes: 67 additions & 0 deletions msgflow/channel/webapi_channel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import asyncio
from msgflow import Channel, Message, PipelineChannelMixin
from typing import Any
from aiohttp import web


class WebapiChannel(Channel, PipelineChannelMixin):
def __init__(self, host, port, routes):
self._host = host
self._port = port
self._routes = routes

async def flow(self, queue: asyncio.Queue) -> None:
async def handler(request):
msg = Message(req=request, res=None, service=self)
await queue.put(msg)
print(f"Queue updated: size={queue.qsize()}, object={queue}")
await msg.wait_response()
return msg.res

# https://docs.aiohttp.org/en/stable/web_advanced.html#application-runners
app = web.Application()
for method, path in self._routes:
app.add_routes([getattr(web, method)(path, handler)])

runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, self._host, self._port)
await site.start()
try:
while True:
await asyncio.sleep(3600)
except asyncio.CancelledError:
print("Clean up")
# wait for finish signal
await runner.cleanup()

async def post(self, obj: Any) -> None:
raise NotImplementedError()


class WebapiMessage(Message):
def __init__(self, channel, req, res):
""""""
self._req = req
self._res = res
self._channel = channel
self._event = asyncio.Event()

@property
def channel(self) -> Channel:
return self._channel

async def respond(self, obj: str) -> None:
self._res = obj
self._event.set()

@property
def req(self):
return self._req

@property
def res(self):
return self._res

async def wait_response(self):
return self._event.wait()
139 changes: 139 additions & 0 deletions msgflow/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import asyncio
from pydantic import BaseModel
from msgflow import Channel
from typing import Any, Optional
import re
import inspect


class PipelineChannelMixin:
@property
def name(self) -> str:
return self._pipeline_provider_name

def set_name(self, name):
self._pipeline_provider_name = name


class Pipeline:
def __init__(self, app):
# List up all methods which has "config" attr set by `handler` decorator
handlers = []
for method_name in dir(app):
method = getattr(app, method_name)
if hasattr(method, "config"):
handlers.append(method)

print(handlers)
self._handlers = handlers

# Create a list including input channels
input_channels = []
if isinstance(app.input_channel, dict):
# Sort may be needed
for svc_name, svc in app.input_channel.items():
svc.set_name(svc_name)
input_channels.append(svc)
elif isinstance(app.input_channel, list):
input_channels = app.input_channel
else:
input_channels = [app.input_channel]

self._input_channels = input_channels

async def _handle(self, queue):
while True:
msg = await queue.get()
for handler in self._handlers:
# Logging
print(f"handler: {handler.__name__}, config: {handler.config}")

if not handler.config.enabled:
continue
if (handler.config.channel_name is not None) and (msg.channel.name != handler.config.channel_name):
continue
if (handler.config.cond is not None) and (not handler.config.cond(msg)):
continue

match = None
if handler.config.pattern is not None:
match = re.match(handler.config.pattern, handler.config.pattern_attr(msg))
if match is None:
continue

# Get handlers argument names
handler_arg_names = inspect.getfullargspec(handler).args[1:] # use [1:] to remove first "self"

# Set up argument to pass to handler
possible_arg_map = {"msg": msg, "match": match}
args = dict()
for arg in handler_arg_names:
if arg in possible_arg_map:
args[arg] = possible_arg_map[arg]
else:
raise Exception(f"{arg} should be one of {set(possible_arg_map.keys())}")
await handler(**args)

# If one of handlers matches, other handlers will be ignored
#break

async def run(self):
queue = asyncio.Queue()

# Create tasks and **schedule to run soon concurrently**
service_tasks = []
for service in self._input_channels:
svc_task = asyncio.create_task(service.flow(queue=queue))
#svc_task.set_name(f"{svc_task.get_name()}-{service.name}")
service_tasks.append(svc_task)
handler_task = asyncio.create_task(self._handle(queue=queue))

# Check one of service tasks completes
# Use wait to check one of service/task is completed or raise error
done, pending = await asyncio.wait(
service_tasks + [handler_task],
return_when=asyncio.FIRST_COMPLETED
)


def _default_pattern_attr(msg):
raise NotImplementedError("Specify which attribute to use for pattern match")


class HandlerConfig(BaseModel):
enabled: bool
channel_name: Optional[str]
cond: Optional[Any]
pattern: Optional[str]
pattern_attr: Any = _default_pattern_attr


def handler(
_func=None, *,
channel=None,
cond=None,
pattern=None,
pattern_attr=_default_pattern_attr,
):
"""Tag function as a handler of service
"""

def decorator(func):
# Set "config" attribute
func.config = HandlerConfig(
enabled=True,
channel_name=channel,
cond=cond,
pattern=pattern,
pattern_attr=pattern_attr,
)
return func

if _func is None:
# when handler is called with arguments,,
# _func is set to None
return decorator
else:
# when handler is called without arguments,
# _func is set to the function to be decorated
return decorator(_func)
9 changes: 9 additions & 0 deletions msgflow/regex.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import regex_spm


def match(*args, **argv):
return regex_spm.match_in(*args, **argv)


def search(*args, **argv):
return regex_spm.search_in(*args, **argv)
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
"envyaml==0.2060",
"pydantic>=1.0,<2",
"requests>=2,<3",
"aiohttp>=3.8,<3.9",
"aioconsole>=0.5.1,<0.6.0",
"regex_spm>=1.0.0,<2",
],
extras_require={
"test": ["pytest>=5", "black==20.8b1"],
Expand Down