forked from pablomarin/RAG-Agents-Accelerator
-
Notifications
You must be signed in to change notification settings - Fork 571
Expand file tree
/
Copy pathbot.py
More file actions
186 lines (144 loc) · 8.21 KB
/
bot.py
File metadata and controls
186 lines (144 loc) · 8.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import os
import re
import asyncio
import random
import requests
import json
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Union
from langchain_openai import AzureChatOpenAI
from langchain_community.utilities import BingSearchAPIWrapper
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.runnables import ConfigurableField, ConfigurableFieldSpec
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory, CosmosDBChatMessageHistory
from langchain.agents import ConversationalChatAgent, AgentExecutor, Tool
from langchain.callbacks.base import BaseCallbackHandler
from langchain.callbacks.manager import CallbackManager
from langchain.schema import AgentAction, AgentFinish, LLMResult
from langchain_core.runnables.history import RunnableWithMessageHistory
#custom libraries that we will use later in the app
from common.utils import (
DocSearchAgent,
CSVTabularAgent,
SQLSearchAgent,
ChatGPTTool,
BingSearchAgent
)
from common.prompts import CUSTOM_CHATBOT_PROMPT, WELCOME_MESSAGE
from botbuilder.core import ActivityHandler, TurnContext
from botbuilder.schema import ChannelAccount, Activity, ActivityTypes
# Env variables needed by langchain
os.environ["OPENAI_API_VERSION"] = os.environ.get("AZURE_OPENAI_API_VERSION")
# Callback hanlder used for the bot service to inform the client of the thought process before the final response
class BotServiceCallbackHandler(BaseCallbackHandler):
"""Callback handler to use in Bot Builder Application"""
def __init__(self, turn_context: TurnContext) -> None:
self.tc = turn_context
async def on_llm_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> Any:
await self.tc.send_activity(f"LLM Error: {error}\n")
async def on_tool_start(self, serialized: Dict[str, Any], input_str: str, **kwargs: Any) -> Any:
await self.tc.send_activity(f"Tool: {serialized['name']}")
async def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
await self.tc.send_activity(f"\u2611{action.log} ...")
await self.tc.send_activity(Activity(type=ActivityTypes.typing))
# Bot Class
class MyBot(ActivityHandler):
def __init__(self):
self.model_name = os.environ.get("AZURE_OPENAI_MODEL_NAME")
def get_session_history(self, session_id: str, user_id: str) -> CosmosDBChatMessageHistory:
cosmos = CosmosDBChatMessageHistory(
cosmos_endpoint=os.environ['AZURE_COSMOSDB_ENDPOINT'],
cosmos_database=os.environ['AZURE_COSMOSDB_NAME'],
cosmos_container=os.environ['AZURE_COSMOSDB_CONTAINER_NAME'],
connection_string=os.environ['AZURE_COMOSDB_CONNECTION_STRING'],
session_id=session_id,
user_id=user_id
)
# prepare the cosmosdb instance
cosmos.prepare_cosmos()
return cosmos
# Function to show welcome message to new users
async def on_members_added_activity(self, members_added: ChannelAccount, turn_context: TurnContext):
for member_added in members_added:
if member_added.id != turn_context.activity.recipient.id:
await turn_context.send_activity(WELCOME_MESSAGE)
# See https://aka.ms/about-bot-activity-message to learn more about the message and other activity types.
async def on_message_activity(self, turn_context: TurnContext):
# Extract info from TurnContext - You can change this to whatever , this is just one option
session_id = turn_context.activity.conversation.id
user_id = turn_context.activity.from_property.id + "-" + turn_context.activity.channel_id
input_text_metadata = dict()
# Check if local_timestamp exists and is not None before formatting it
input_text_metadata["local_timestamp"] = turn_context.activity.local_timestamp.strftime("%I:%M:%S %p, %A, %B %d of %Y") if turn_context.activity.local_timestamp else "Not Available"
# Check if local_timezone exists and is not None before assigning it
input_text_metadata["local_timezone"] = turn_context.activity.local_timezone if turn_context.activity.local_timezone else "Not Available"
# Check if locale exists and is not None before assigning it
input_text_metadata["locale"] = turn_context.activity.locale if turn_context.activity.locale else "Not Available"
# Setting the query to send to OpenAI
input_text = turn_context.activity.text + "\n\n metadata:\n" + str(input_text_metadata)
# Set Callback Handler
cb_handler = BotServiceCallbackHandler(turn_context)
cb_manager = CallbackManager(handlers=[cb_handler])
# Set LLM
llm = AzureChatOpenAI(deployment_name=self.model_name, temperature=0,
max_tokens=1500, callback_manager=cb_manager, streaming=True)
# Initialize our Tools/Experts
doc_indexes = ["srch-index-files", "srch-index-csv"]
doc_search = DocSearchAgent(llm=llm, indexes=doc_indexes,
k=6, reranker_th=1,
sas_token=os.environ['BLOB_SAS_TOKEN'],
name="docsearch",
description="useful when the questions includes the term: docsearch",
callback_manager=cb_manager, verbose=False)
book_indexes = ["srch-index-books"]
book_search = DocSearchAgent(llm=llm, indexes=book_indexes,
k=6, reranker_th=1,
sas_token=os.environ['BLOB_SAS_TOKEN'],
name="booksearch",
description="useful when the questions includes the term: booksearch",
callback_manager=cb_manager, verbose=False)
www_search = BingSearchAgent(llm=llm, k=10, callback_manager=cb_manager,
name="bing",
description="useful when the questions includes the term: bing")
sql_search = SQLSearchAgent(llm=llm, k=30, callback_manager=cb_manager,
name="sqlsearch",
description="useful when the questions includes the term: sqlsearch",
verbose=False)
chatgpt_search = ChatGPTTool(llm=llm, callback_manager=cb_manager,
name="chatgpt",
description="useful when the questions includes the term: chatgpt",
verbose=False)
tools = [doc_search, book_search, www_search, sql_search, chatgpt_search]
agent = create_openai_tools_agent(llm, tools, CUSTOM_CHATBOT_PROMPT)
agent_executor = AgentExecutor(agent=agent, tools=tools)
brain_agent_executor = RunnableWithMessageHistory(
agent_executor,
self.get_session_history,
input_messages_key="question",
history_messages_key="history",
history_factory_config=[
ConfigurableFieldSpec(
id="user_id",
annotation=str,
name="User ID",
description="Unique identifier for the user.",
default="",
is_shared=True,
),
ConfigurableFieldSpec(
id="session_id",
annotation=str,
name="Session ID",
description="Unique identifier for the conversation.",
default="",
is_shared=True,
),
],
)
config={"configurable": {"session_id": session_id, "user_id": user_id}}
await turn_context.send_activity(Activity(type=ActivityTypes.typing))
answer = (await brain_agent_executor.ainvoke({"question": input_text}, config=config))["output"]
await turn_context.send_activity(answer)