2
2
#
3
3
# BSD 3-Clause License
4
4
5
- # Copyright (c) 2021-2024 Datalayer, Inc.
6
- #
7
- # Datalayer License
8
-
9
5
from __future__ import annotations
10
6
11
7
import json
12
8
import logging
13
9
14
- from pydantic import BaseModel
15
- from typing import Optional
16
-
17
10
from concurrent import futures
18
11
from concurrent .futures import as_completed
19
12
20
13
from anyio import create_task_group , sleep
21
14
from anyio .from_thread import start_blocking_portal
22
15
23
- from jupyter_kernel_client import KernelClient
24
-
25
16
from jupyter_server .utils import url_path_join
26
17
from jupyter_server .base .handlers import APIHandler
27
18
28
- from jupyter_ai_agents import __version__
19
+ from jupyter_kernel_client import KernelClient
20
+
21
+ from jupyter_ai_agents .handlers .agents .manager import AIAgentsManager
29
22
from jupyter_ai_agents .agents .prompt import PromptAgent
30
- from jupyter_ai_agents .handlers . agents . agents import AIAgentsManager
23
+ from jupyter_ai_agents .models import AgentRequestModel
31
24
from jupyter_ai_agents .utils import http_to_ws
25
+ from jupyter_ai_agents import __version__
32
26
33
27
34
28
logger = logging .getLogger (__name__ )
35
29
30
+
36
31
EXECUTOR = futures .ThreadPoolExecutor (8 )
37
32
38
- MANAGER = None
33
+ AI_AGENTS_MANAGER : AIAgentsManager | None = None
39
34
40
- ROOMS = {}
35
+ # COLLABORATION_ROOMS = {}
41
36
42
37
43
38
def prompt_ai_agent (room_id , jupyter_ingress , jupyter_token , kernel_id ):
44
39
async def long_running_prompt ():
45
- global MANAGER
40
+ global AI_AGENTS_MANAGER
46
41
room_ws_url = http_to_ws (url_path_join (jupyter_ingress , "/api/collaboration/room" , room_id ))
47
42
logger .info ("AI Agent will connect to room [%s]…" , room_ws_url )
48
43
has_runtime = jupyter_ingress and jupyter_token and kernel_id
@@ -56,55 +51,46 @@ async def long_running_prompt():
56
51
log = logger ,
57
52
)
58
53
logger .info ("Starting AI Agent for room [%s]…" , room_id )
59
- async def sometask () -> None :
60
- print ( 'Task running ' )
54
+ async def prompt_task () -> None :
55
+ logger . info ( 'Starting Prompt Agent. ' )
61
56
await prompt_agent .start ()
62
57
if prompt_agent .runtime_client is not None :
63
58
prompt_agent .runtime_client .start ()
64
- print ( 'Task finished ' )
59
+ logger . info ( 'Prompt Agent is started. ' )
65
60
async with create_task_group () as tg :
66
- tg .start_soon (sometask )
67
- await sleep (20 )
68
- # await MANAGER.track_agent(room_id, prompt_agent)
69
- print ('Task running...' )
70
- return 'Task return value'
61
+ tg .start_soon (prompt_task )
62
+ AI_AGENTS_MANAGER .register_ai_agent (room_id , prompt_agent )
63
+ # Sleep forever to keep the ai agent alive.
64
+ # TODO Replace with AI_AGENTS_MANAGER
65
+ while True :
66
+ await sleep (10 )
67
+ # await AI_AGENTS_MANAGER.track_agent(room_id, prompt_agent)
68
+ return 'Prompt task is finished.'
71
69
with start_blocking_portal () as portal :
72
70
futures = [portal .start_task_soon (long_running_prompt )]
73
71
for future in as_completed (futures ):
74
- print (future .result ())
75
-
76
-
77
- class RuntimeModel (BaseModel ):
78
- ingress : Optional [str ] = None
79
- token : Optional [str ] = None
80
- kernel_id : Optional [str ] = None
81
- jupyter_pod_name : Optional [str ] = None
82
-
83
-
84
- class AgentRequestModel (BaseModel ):
85
- room_id : Optional [str ] = None
86
- runtime : Optional [RuntimeModel ] = None
72
+ logger .info ("Future is completed with result [%s]" , future .result ())
87
73
88
74
89
- class AIAgentHandler (APIHandler ):
75
+ class AIAgentsInstanceHandler (APIHandler ):
90
76
91
77
# @web.authenticated
92
78
async def get (self , matched_part = None , * args , ** kwargs ):
93
- global MANAGER
94
- if MANAGER is None :
95
- MANAGER = AIAgentsManager ()
79
+ global AI_AGENTS_MANAGER
80
+ if AI_AGENTS_MANAGER is None :
81
+ AI_AGENTS_MANAGER = AIAgentsManager ()
96
82
self .write ({
97
83
"success" : True ,
98
84
"matched_part" : matched_part ,
99
85
})
100
86
101
87
# @web.authenticated
102
88
async def post (self , matched_part = None , * args , ** kwargs ):
103
- global MANAGER
104
- if MANAGER is None :
105
- MANAGER = AIAgentsManager ()
89
+ global AI_AGENTS_MANAGER
90
+ if AI_AGENTS_MANAGER is None :
91
+ AI_AGENTS_MANAGER = AIAgentsManager ()
106
92
body_data = json .loads (self .request .body )
107
- print ( body_data )
93
+ logger . info ( "Body data" , body_data )
108
94
self .write ({
109
95
"success" : True ,
110
96
"matched_part" : matched_part ,
@@ -115,26 +101,26 @@ class AIAgentsHandler(APIHandler):
115
101
116
102
# @web.authenticated
117
103
async def get (self , * args , ** kwargs ):
118
- global MANAGER
119
- if MANAGER is None :
120
- MANAGER = AIAgentsManager ()
104
+ global AI_AGENTS_MANAGER
105
+ if AI_AGENTS_MANAGER is None :
106
+ AI_AGENTS_MANAGER = AIAgentsManager ()
121
107
self .write ({
122
108
"success" : True ,
123
109
})
124
110
125
111
# @web.authenticated
126
112
async def post (self , * args , ** kwargs ):
127
113
"""Endpoint creating an AI Agent for a given room."""
128
- global MANAGER
129
- if MANAGER is None :
130
- MANAGER = AIAgentsManager ()
114
+ global AI_AGENTS_MANAGER
115
+ if AI_AGENTS_MANAGER is None :
116
+ AI_AGENTS_MANAGER = AIAgentsManager ()
131
117
request_body = json .loads (self .request .body )
132
118
agent_request = AgentRequestModel (** request_body )
133
- self .log .info ("Create AI Agents is requested [%s]" , agent_request .model_dump ())
119
+ self .log .info ("AI Agents create handler requested with [%s]" , agent_request .model_dump ())
134
120
room_id = agent_request .room_id
135
- if room_id in MANAGER :
121
+ if room_id in AI_AGENTS_MANAGER :
136
122
self .log .info ("AI Agent for room [%s] already exists." , room_id )
137
- # TODO check agent
123
+ # TODO check the ai agent.
138
124
return {
139
125
"success" : True ,
140
126
"message" : "AI Agent already exists" ,
@@ -145,11 +131,11 @@ async def post(self, *args, **kwargs):
145
131
jupyter_ingress = runtime .ingress
146
132
jupyter_token = runtime .token
147
133
kernel_id = runtime .kernel_id
148
- # Start AI Agent
134
+ # Start AI Agent in a ThreadPoolExecutor.
149
135
EXECUTOR .submit (prompt_ai_agent , room_id , jupyter_ingress , jupyter_token , kernel_id )
150
136
res = json .dumps ({
151
137
"success" : True ,
152
- "message" : f"AI Agent started for room '{ room_id } '." ,
138
+ "message" : f"AI Agent is started for room '{ room_id } '." ,
153
139
})
154
- print ( res )
140
+ logger . info ( "AI Agent create request exiting with reponse [%s]" , res )
155
141
self .finish (res )
0 commit comments