-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathapp.py
More file actions
219 lines (202 loc) · 7 KB
/
app.py
File metadata and controls
219 lines (202 loc) · 7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
from navconfig.logging import logging
from navigator.handlers.types import AppHandler
# Tasker:
from navigator.background import BackgroundQueue
from navigator_auth import AuthHandler
from querysource.services import QuerySource
from parrot.scheduler import AgentSchedulerManager
from parrot.manager import BotManager
from parrot.conf import STATIC_DIR
from parrot.handlers.bots import (
FeedbackTypeHandler,
ChatbotFeedbackHandler,
PromptLibraryManagement,
ChatbotUsageHandler,
ChatbotSharingQuestion,
ToolList
)
from parrot.handlers.chat import (
BotManagement
)
# from parrot.handlers.o365_auth import (
# O365InteractiveAuthSessions,
# O365InteractiveAuthSessionDetail,
# )
# from parrot.services.mcp import ParrotMCPServer
# from parrot.tools.workday import WorkdayToolkit
# from parrot.services.o365_remote_auth import RemoteAuthManager
from parrot.handlers.jobs.worker import configure_redis_queue, configure_job_manager
from parrot.handlers.user import UserSocketManager
from parrot.handlers.llm import LLMClient
from parrot.handlers.google_generation import GoogleGeneration
from parrot.handlers.programs import ProgramsUserHandler
## New Handlers:
from parrot.handlers.video_reel import VideoReelHandler
from parrot.handlers.lyria_music import LyriaMusicHandler
class Main(AppHandler):
"""
Main App Handler for Parrot Application.
"""
app_name: str = 'Parrot'
enable_static: bool = True
enable_pgpool: bool = True
staticdir: str = STATIC_DIR
def _configure_logging(self):
"""Configuración explícita de logging para aiohttp server."""
# Obtener el logger raíz
root_logger = logging.getLogger()
# Limpiar handlers existentes si los hay
root_logger.handlers.clear()
# Configurar el nivel del logger raíz
root_logger.setLevel(logging.DEBUG)
def configure(self):
super(Main, self).configure()
# Tasker: Background Task Manager:
tasker = BackgroundQueue(
app=self.app,
max_workers=5,
queue_size=5
)
# Loading QUerySource
qry = QuerySource(
lazy=False,
loop=self.event_loop()
)
qry.setup(self.app)
# Chatbot System
self.bot_manager = BotManager()
self.bot_manager.setup(self.app)
# Scheduler Manager (after bot manager):
self._scheduler = AgentSchedulerManager(bot_manager=self.bot_manager)
self._scheduler.setup(app=self.app)
# Configure Redis RQ Queue for jobs
configure_redis_queue(self.app)
# Configure Job Manager
configure_job_manager(self.app)
# API of feedback types:
self.app.router.add_view(
'/api/v1/feedback_types/{feedback_type}',
FeedbackTypeHandler
)
ChatbotFeedbackHandler.configure(self.app, '/api/v1/bot_feedback')
# Prompt Library:
PromptLibraryManagement.configure(self.app, '/api/v1/chatbots/prompt_library')
# Questions (Usage handler, for sharing)
ChatbotUsageHandler.configure(self.app, '/api/v1/chatbots_usage')
self.app.router.add_view(
'/api/v1/chatbots/questions/{sid}',
ChatbotSharingQuestion
)
# Install Bot Management
BotManagement.setup(self.app, r'/api/v1/bot_management{slash:/?}{bot:[^/]*}')
# Tools List
self.app.router.add_view(
'/api/v1/agent_tools',
ToolList,
name='tools_list'
)
# # Office 365 delegated authentication endpoints
# self.app['o365_auth_manager'] = RemoteAuthManager()
# self.app.router.add_view(
# '/api/v1/o365/auth/sessions',
# O365InteractiveAuthSessions,
# name='o365_auth_sessions'
# )
# self.app.router.add_view(
# '/api/v1/o365/auth/sessions/{session_id}',
# O365InteractiveAuthSessionDetail,
# name='o365_auth_session_detail'
# )
# # Example Async View for Queue:
# self.app.router.add_view(
# '/api/v1/example_async',
# ExampleAsyncView,
# name='example_async'
# )
# ## NextStop
# nextstop = NextStopAgent(app=self.app)
# nextstop.setup(self.app, '/api/v1/agents/nextstop')
# # MCP server lifecycle management
# mcp_server = ParrotMCPServer(
# transports=["sse", "http", "websocket"],
# tools=WorkdayToolkit(redis_url="redis://localhost:6379/4")
# )
# mcp_server.setup(self.app)
# LLM Client Routes
self.app.router.add_view(
'/api/v1/ai/client',
LLMClient,
name='llm_client'
)
self.app.router.add_view(
'/api/v1/ai/client/{client_name}',
LLMClient,
name='llm_client_detail'
)
self.app.router.add_view(
'/api/v1/ai/clients',
LLMClient,
name='llm_clients_list'
)
self.app.router.add_view(
'/api/v1/ai/clients/models',
LLMClient,
name='llm_clients_models'
)
self.app.router.add_view(
'/api/v1/ai/google/generation',
GoogleGeneration,
name='google_generation'
)
ws = UserSocketManager(
self.app,
route_prefix="/ws/userinfo",
redis_url="redis://localhost:6379/4",
default_channels=["information", "following"]
)
self.app['user_socket_manager'] = ws
# Programs API
self.app.router.add_view(
'/api/v1/programs_user',
ProgramsUserHandler,
name='programs_user'
)
## implement Video Reel Handler:
VideoReelHandler.setup(self.app)
# Lyria:
self.app.router.add_view(
"/api/v1/google/generation/music", LyriaMusicHandler
)
### Auth System
# create a new instance of Auth System
auth = AuthHandler()
auth.setup(self.app) # configure this Auth system into App.
async def on_prepare(self, request, response):
"""
on_prepare.
description: Signal for customize the response while is prepared.
"""
async def pre_cleanup(self, app):
"""
pre_cleanup.
description: Signal for running tasks before on_cleanup/shutdown App.
"""
async def on_cleanup(self, app):
"""
on_cleanup.
description: Signal for customize the response when server is closing
"""
async def on_startup(self, app):
"""
on_startup.
description: Signal for customize the response when server is started
"""
app['websockets'] = []
async def on_shutdown(self, app):
"""
on_shutdown.
description: Signal for customize the response when server is shutting down
"""
manager = app.get('o365_auth_manager')
if manager:
await manager.shutdown()