Skip to content
This repository was archived by the owner on Aug 25, 2024. It is now read-only.

Commit dabfb17

Browse files
authored
docs: tutorial: dataflow: chatbot: Gitter bot
1 parent 81c8e39 commit dabfb17

File tree

8 files changed

+554
-1
lines changed

8 files changed

+554
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
6666
HTTP 307 response
6767
- Support for immediate response in HTTP service
6868
- Daal4py example usage.
69+
- Gitter chatbot tutorial.
6970
### Changed
7071
- Renamed `-seed` to `-inputs` in `dataflow create` command
7172
- Renamed configloader/png to configloader/image and added support for loading JPEG and TIFF file formats

dffml/df/base.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,9 @@ async def run(
460460
# We can't pass self to functions running in threads
461461
# Its not thread safe!
462462
bound = func.__get__(self, self.__class__)
463-
result = await bound(**inputs)
463+
result = bound(**inputs)
464+
if inspect.isawaitable(result):
465+
result = await result
464466
elif inspect.iscoroutinefunction(func):
465467
result = await func(**inputs)
466468
else:
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
Gitter ML Inference Chatbot
2+
===========================
3+
4+
This tutorial shows how to use configs in DFFML operations. We'll be implementing
5+
a Gitter chatbot. Let's take a look at the final result before moving forward.
6+
7+
.. image:: ./data/gitter.gif
8+
9+
Okay, Let's start!!
10+
We'll be using the Gitter's Streamping API to collect chats, for this we need an
11+
authorization token from Gitter. Go to https://developer.gitter.im/apps
12+
and get the personal access token for your chatbot (If you are redirected to the Gitter docs
13+
from this URL, sign in and try again).
14+
15+
Our dataflow will take a Gitter room URI as input (For https://gitter.im/dffml/community
16+
``dffml/community`` is the URI), listens to chats in the room and replies to
17+
messages which are directed to our bot.
18+
19+
.. note::
20+
21+
All the code for this example is located under the
22+
`examples/dataflow/chatbot <https://github.com/intel/dffml/blob/master/examples/examples/dataflow/chatbot>`_
23+
directory of the DFFML source code.
24+
25+
We'll write the operations for this dataflow in operations.py
26+
27+
.. literalinclude:: /../examples/dataflow/chatbot/operations.py
28+
:lines: 24-51
29+
30+
All requests to Gitter's API requires the room id for our room.
31+
``get_room_id`` gets the ``room id`` from room name (The input to
32+
our dataflow).
33+
34+
.. literalinclude:: /../examples/dataflow/chatbot/operations.py
35+
:lines: 52-87
36+
37+
We listen to new messages directed to our bot.
38+
39+
.. literalinclude:: /../examples/dataflow/chatbot/operations.py
40+
:lines: 90-122
41+
42+
We'll use this op to send replies back to the chatroom
43+
44+
.. literalinclude:: /../examples/dataflow/chatbot/operations.py
45+
:lines: 125-220
46+
47+
This is the operation where all the logic for interpreting the messages
48+
go. If you have a Natural Language Understanding module It'd go here, so
49+
that you can parse unstructered data.
50+
51+
Our operations are ``get_room_id, stream_chat, send_message and interpret_message``.
52+
All of them use at least one config. The common config being INISecretConfig which
53+
loads secret token and bot name from the ini config file.
54+
55+
.. literalinclude:: /../examples/dataflow/chatbot/configs.ini
56+
57+
Detour: What are imp_enter and ctx_enter?
58+
-----------------------------------------
59+
60+
.. code-block:: python
61+
62+
config_cls=GitterChannelConfig,
63+
imp_enter={"secret": lambda self: self.config.secret},
64+
ctx_enter={"sctx": lambda self: self.parent.secret()},
65+
66+
This piece of code in the op decorator tells that the operation will be using
67+
``GitterChannelConfig``. ``imp_enter`` and ``ctx_enter`` are basically shortcuts for
68+
the double context entry followed in dffml.
69+
70+
``"secret": lambda self: self.config.secret``: sets the ``secret`` attribute of parent
71+
to what is returned by the function; in this case it returns BaseSecret.
72+
73+
``"sctx": lambda self: self.parent.secret()``: calls the function and assigns the
74+
return value to ``sctx`` attribute.
75+
76+
So in the operation instead of
77+
78+
.. code-block:: python
79+
80+
with self.config.secret() as secret:
81+
with sctx as secret():
82+
sctx.call_a_method()
83+
84+
we can do
85+
86+
.. code-block:: python
87+
88+
self.sctx.call_a_method()
89+
90+
Running the dataflow
91+
--------------------
92+
93+
.. literalinclude:: /../examples/dataflow/chatbot/run.py
94+
95+
set the room name, config file name and run the dataflow
96+
97+
.. code-block:: console
98+
99+
python run.py
100+
101+
Or using the command line to, create the dataflow
102+
103+
.. code-block:: console
104+
105+
dffml dataflow create \
106+
operations:get_room_id \
107+
operations:stream_chat \
108+
operations:send_message \
109+
operations:interpret_message \
110+
-config \
111+
ini=operations:get_room_id.secret.plugin \
112+
configs.ini=operations:get_room_id.secret.config.filename \
113+
ini=operations:stream_chat.secret.plugin \
114+
configs.ini=operations:stream_chat.secret.config.filename \
115+
ini=operations:send_message.secret.plugin \
116+
configs.ini=operations:send_message.secret.config.filename \
117+
ini=operations:interpret_message.secret.plugin \
118+
configs.ini=operations:interpret_message.secret.config.filename \
119+
> chatbot_df.json
120+
121+
And run it by providing the ``room_name`` as the input
122+
123+
.. code-block:: console
124+
125+
dffml dataflow run records all \
126+
-dataflow ./chatbot_df.json \
127+
-inputs test_community1/community=room_name \
128+
-sources m=memory \
129+
-source-records temp

docs/tutorials/dataflows/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ Here we have some examples to better understand the DFFML DataFlows.
99

1010
locking
1111
io
12+
chatbot
1213
nlp
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
[secrets]
2+
access_token = EnterAccessToken
3+
botname = UserNameOfBot
4+
api_url = https://api.gitter.im/v1
5+
stream_url = https://stream.gitter.im/v1
6+
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
import io
2+
import re
3+
import sys
4+
import json
5+
import tempfile
6+
import contextlib
7+
from aiohttp import ClientSession, ClientTimeout
8+
9+
from dffml.cli.cli import CLI
10+
from dffml import op, config, Definition, BaseSecret
11+
12+
ACCESSTOKEN = Definition(name="access_tok3n", primitive="str")
13+
ROOMNAME = Definition(name="room_name", primitive="str")
14+
ROOMID = Definition(name="room_id", primitive="str")
15+
MESSAGE = Definition(name="message", primitive="str")
16+
TOSEND = Definition(name="to_send", primitive="str")
17+
18+
19+
@config
20+
class GitterChannelConfig:
21+
secret: BaseSecret
22+
23+
24+
@op(
25+
inputs={"room_uri": ROOMNAME},
26+
outputs={"room_id": ROOMID},
27+
config_cls=GitterChannelConfig,
28+
imp_enter={
29+
"secret": lambda self: self.config.secret,
30+
"session": lambda self: ClientSession(trust_env=True),
31+
},
32+
ctx_enter={"sctx": lambda self: self.parent.secret()},
33+
)
34+
async def get_room_id(self, room_uri):
35+
# Get unique roomid from room uri
36+
access_token = await self.sctx.get("access_token")
37+
headers = {
38+
"Content-Type": "application/json",
39+
"Accept": "application/json",
40+
"Authorization": f"Bearer {access_token}",
41+
}
42+
43+
api_url = await self.sctx.get("api_url")
44+
url = f"{api_url}/rooms"
45+
async with self.parent.session.post(
46+
url, json={"uri": room_uri}, headers=headers
47+
) as resp:
48+
response = await resp.json()
49+
return {"room_id": response["id"]}
50+
51+
52+
@op(
53+
inputs={"room_id": ROOMID},
54+
outputs={"message": MESSAGE},
55+
config_cls=GitterChannelConfig,
56+
imp_enter={
57+
"secret": lambda self: self.config.secret,
58+
"session": lambda self: ClientSession(
59+
trust_env=True, timeout=ClientTimeout(total=None)
60+
),
61+
},
62+
ctx_enter={"sctx": lambda self: self.parent.secret()},
63+
)
64+
async def stream_chat(self, room_id):
65+
# Listen to messages in room
66+
access_token = await self.sctx.get("access_token")
67+
headers = {
68+
"Accept": "application/json",
69+
"Authorization": f"Bearer {access_token}",
70+
}
71+
stream_url = await self.sctx.get("stream_url")
72+
73+
url = f"{stream_url}/rooms/{room_id}/chatMessages"
74+
botname = await self.sctx.get("botname")
75+
76+
async with self.parent.session.get(url, headers=headers) as resp:
77+
async for data in resp.content:
78+
# Gitter sends " \n" at some intervals
79+
if data == " \n".encode():
80+
continue
81+
print(f"\n\n Got data {data} \n\n")
82+
data = json.loads(data.strip())
83+
message = data["text"]
84+
# Only listen to messages directed to bot
85+
if f"@{botname}" not in message:
86+
continue
87+
yield {"message": message}
88+
89+
90+
@op(
91+
inputs={"message": TOSEND, "room_id": ROOMID},
92+
config_cls=GitterChannelConfig,
93+
imp_enter={
94+
"secret": lambda self: self.config.secret,
95+
"session": lambda self: ClientSession(trust_env=True),
96+
},
97+
ctx_enter={"sctx": lambda self: self.parent.secret()},
98+
)
99+
async def send_message(self, message, room_id):
100+
access_token = await self.sctx.get("access_token")
101+
headers = {
102+
"Content-Type": "application/json",
103+
"Accept": "application/json",
104+
"Authorization": f"Bearer {access_token}",
105+
}
106+
try:
107+
message = json.loads(message)
108+
message = json.dumps(message, indent=4, sort_keys=True)
109+
except:
110+
pass
111+
112+
# For new line we need \\n,else Gitter api
113+
# responds with 'Bad Request'
114+
message = message.replace("\n", "\\n")
115+
api_url = await self.sctx.get("api_url")
116+
url = f"{api_url}/rooms/{room_id}/chatMessages"
117+
118+
async with self.parent.session.post(
119+
url, headers=headers, json={"text": message}
120+
) as resp:
121+
response = await resp.json()
122+
return
123+
124+
125+
@op(
126+
inputs={"message": MESSAGE,},
127+
outputs={"message": TOSEND},
128+
config_cls=GitterChannelConfig,
129+
imp_enter={"secret": lambda self: self.config.secret},
130+
ctx_enter={"sctx": lambda self: self.parent.secret()},
131+
)
132+
async def interpret_message(self, message):
133+
greet = ["hey", "hello", "hi"]
134+
for x in greet:
135+
if x in message.lower():
136+
return {"message": "Hey Hooman ฅ^•ﻌ•^ฅ"}
137+
138+
def extract_data(raw_data):
139+
raw_data = raw_data.split("data:")
140+
data = {"model-data": raw_data[1]}
141+
raw_data = raw_data[0].split("\n")
142+
for x in raw_data:
143+
k, *v = x.split(":")
144+
if isinstance(v, list): # for features
145+
v = ":".join(v)
146+
k = k.strip()
147+
v = v.strip()
148+
if k: # avoid blank
149+
data[k] = v
150+
return data
151+
152+
# Removing username from message
153+
# The regex matches @ followed by anything that
154+
# is not a whitespace in the first group and
155+
# the rest of the string in the second group.
156+
# We replace the string by the second group.
157+
message = re.sub(r"(@[^\s]+)(.*)", r"\2", message).strip()
158+
159+
if message.lower().startswith("train model"):
160+
return {"message": "Gimme more details!!"}
161+
162+
elif message.lower().startswith("predict:"):
163+
# Only replace first occurence of predict
164+
# because the feature to predict will be labeled predict
165+
raw_data = message.replace("predict:", "", 1).strip()
166+
cmds = ["predict", "all"]
167+
168+
elif message.lower().startswith("details:"):
169+
raw_data = message.replace("details:", "",).strip()
170+
cmds = ["train"]
171+
172+
else:
173+
return {"message": " Oops ,I didnt get that ᕙ(⇀‸↼‶)ᕗ "}
174+
175+
# If predict or train, extract data
176+
data = extract_data(raw_data)
177+
if "model-type" in data:
178+
model_type = data["model-type"]
179+
if "model-name" in data:
180+
model_name = data["model-name"]
181+
else:
182+
model_name = "mymodel"
183+
184+
features = data["features"].split(" ")
185+
predict = data["predict"]
186+
model_data = data["model-data"]
187+
188+
with tempfile.NamedTemporaryFile(suffix=".csv") as fileobj:
189+
fileobj.write(model_data.lstrip().encode())
190+
fileobj.seek(0)
191+
192+
stdout = io.StringIO()
193+
with contextlib.redirect_stdout(stdout):
194+
preds = await CLI.cli(
195+
*cmds,
196+
"-model",
197+
model_type,
198+
"-model-directory",
199+
model_name,
200+
"-model-features",
201+
*features,
202+
"-model-predict",
203+
predict,
204+
"-sources",
205+
"f=csv",
206+
"-source-filename",
207+
fileobj.name,
208+
)
209+
sys.stdout.flush()
210+
211+
if "train" in cmds:
212+
return {"message": "Done!!"}
213+
else:
214+
m = {}
215+
for pred in preds:
216+
pred = pred.predictions()
217+
m.update({p: pred[p]["value"] for p in pred})
218+
message = [f"{k}: {v}" for k, v in m.items()]
219+
message = "\n".join(message)
220+
return {"message": message}

0 commit comments

Comments
 (0)