11import { Server } from 'socket.io' ;
22import { createClient } from 'redis' ;
33import { createServer } from 'http' ;
4+ import { v4 as uuidv4 } from 'uuid' ;
45
56// Redis client configuration
67const redisClient = createClient ( {
78 url : 'redis://localhost:6379/0'
89} ) ;
910
10- // Allowed channels for Redis pub/sub
11- const allowedChannels = [ 'Scene:Jack' , 'Scene:Jane' , 'Human:Jack' , 'Jack:Human' , 'Agent:Runtime' , 'Runtime:Agent' ] ;
11+ // // Allowed channels for Redis pub/sub
12+ // const allowedChannels = ['Scene:Jack', 'Scene:Jane', 'Human:Jack', 'Jack:Human', 'Agent:Runtime', 'Runtime:Agent'];
1213
1314// Connect Redis client
1415redisClient . on ( 'error' , ( err ) => {
@@ -37,17 +38,75 @@ const init = async () => {
3738 const subscriber = redisClient . duplicate ( ) ;
3839 await subscriber . connect ( ) ;
3940
40- await subscriber . subscribe ( allowedChannels , ( message , channel ) => {
41- console . log ( `Received message from ${ channel } : ${ message } ` ) ;
42- io . emit ( 'new_message' , { channel, message } ) ;
43- } ) ;
41+ // await subscriber.subscribe(allowedChannels, (message, channel) => {
42+ // console.log(`Received message from ${channel}: ${message}`);
43+ // io.emit('new_message', { channel, message });
44+ // });
45+
46+ // Store active sessions and their Redis channels
47+ const activeSessions = { } ;
48+
49+ const getAllowedChannels = ( sessionId , sessionType ) => {
50+ if ( sessionType === 'Human/AI' ) {
51+ return [
52+ `Scene:Jack:${ sessionId } ` ,
53+ `Scene:Jane:${ sessionId } ` ,
54+ `Human:Jack:${ sessionId } ` ,
55+ `Jack:Human:${ sessionId } ` ,
56+ `Agent:Runtime:${ sessionId } ` ,
57+ `Runtime:Agent:${ sessionId } ` ,
58+ ] ;
59+ }
60+ return [
61+ `Human:Jack:${ sessionId } ` ,
62+ `Jack:Human:${ sessionId } ` ,
63+ `Agent:Runtime:${ sessionId } ` ,
64+ `Runtime:Agent:${ sessionId } ` ,
65+ ] ;
66+ } ;
4467
4568 // Socket.IO connection handling
4669 io . on ( 'connection' , ( socket ) => {
70+
4771 console . log ( 'A user connected' ) ;
72+ // const socketState = {
73+ // currentSessionId: null
74+ // };
75+
76+ socket . on ( 'create_session' , async ( { sessionType} , callback ) => {
77+ const sessionId = uuidv4 ( ) ;
78+ const channels = getAllowedChannels ( sessionId , sessionType )
79+ activeSessions [ sessionId ] = { channels, sessionType}
80+
81+ console . log ( `New session created: ${ sessionId } , Type: ${ sessionType } ` ) ;
82+
83+ await subscriber . subscribe ( channels , ( message , channels ) => {
84+ console . log ( `Received message from ${ channels } : ${ message } ` ) ;
85+ io . to ( sessionId ) . emit ( 'new_message' , { channels, message } ) ;
86+ } )
4887
49- socket . on ( 'chat_message' , async ( message ) => {
50- console . log ( 'Received chat message:' , message ) ;
88+ callback ( { sessionId } ) ;
89+
90+ socket . join ( sessionId ) ;
91+ } ) ;
92+
93+ // Join an existing session
94+ socket . on ( 'join_session' , async ( { sessionId } , callback ) => {
95+ if ( ! activeSessions [ sessionId ] ) {
96+ callback ( { success : false , error : 'Session does not exist' } ) ;
97+ return ;
98+ }
99+
100+ console . log ( `User joined session: ${ sessionId } ` ) ;
101+ socket . join ( sessionId ) ;
102+ // socketState.currentSessionId = sessionId;
103+ callback ( { success : true } ) ;
104+ } ) ;
105+
106+ socket . on ( 'chat_message' , async ( { sessionId, message} ) => {
107+ if ( ! sessionId ) return ;
108+
109+ console . log ( `Chat message in session ${ sessionId } :` , message ) ;
51110 try {
52111 const agentAction = {
53112 data : {
@@ -58,14 +117,16 @@ const init = async () => {
58117 data_type : "agent_action"
59118 }
60119 } ;
61- await redisClient . publish ( ' Human:Jack' , JSON . stringify ( agentAction ) ) ;
120+ await redisClient . publish ( ` Human:Jack: ${ sessionId } ` , JSON . stringify ( agentAction ) ) ;
62121 } catch ( err ) {
63122 console . error ( 'Error publishing chat message:' , err ) ;
64123 }
65124 } ) ;
66125
67- socket . on ( 'save_file' , async ( { path, content } ) => {
68- console . log ( 'Saving file:' , path ) ;
126+ socket . on ( 'save_file' , async ( { sessionId, path, content } ) => {
127+ if ( ! sessionId ) return ;
128+
129+ console . log ( `Saving file in session ${ sessionId } :` , path ) ;
69130 try {
70131 const saveMessage = {
71132 data : {
@@ -76,14 +137,16 @@ const init = async () => {
76137 data_type : "agent_action"
77138 }
78139 } ;
79- await redisClient . publish ( ' Agent:Runtime' , JSON . stringify ( saveMessage ) ) ;
140+ await redisClient . publish ( ` Agent:Runtime: ${ sessionId } ` , JSON . stringify ( saveMessage ) ) ;
80141 } catch ( err ) {
81142 console . error ( 'Error publishing save file message:' , err ) ;
82143 }
83144 } ) ;
84145
85- socket . on ( 'terminal_command' , async ( command ) => {
86- console . log ( 'Received terminal command:' , command ) ;
146+ socket . on ( 'terminal_command' , async ( { sessionId, command } ) => {
147+ if ( ! sessionId ) return ;
148+ console . log ( `Terminal command in session ${ sessionId } :` , command ) ;
149+
87150 try {
88151 const messageEnvelope = {
89152 data : {
@@ -94,11 +157,11 @@ const init = async () => {
94157 data_type : "agent_action"
95158 }
96159 } ;
97- await redisClient . publish ( ' Agent:Runtime' , JSON . stringify ( messageEnvelope ) ) ;
160+ await redisClient . publish ( ` Agent:Runtime: ${ sessionId } ` , JSON . stringify ( messageEnvelope ) ) ;
98161 } catch ( err ) {
99162 console . error ( 'Error publishing command:' , err ) ;
100163 socket . emit ( 'new_message' , {
101- channel : ' Runtime:Agent' ,
164+ channel : ` Runtime:Agent: ${ sessionId } ` ,
102165 message : JSON . stringify ( {
103166 data : {
104167 data_type : "text" ,
@@ -110,14 +173,16 @@ const init = async () => {
110173 } ) ;
111174
112175 // Handle process initialization
113- socket . on ( 'init_process' , async ( ) => {
114- console . log ( 'Received init_process request' ) ;
176+ socket . on ( 'init_process' , async ( sessionId ) => {
177+ if ( ! sessionId ) return ;
178+
179+ console . log ( `Initializing process in session ${ sessionId } ` ) ;
115180 try {
116181 const initParams = {
117182 node_name : "openhands_node" ,
118- input_channels : [ " Agent:Runtime" ] ,
119- output_channels : [ " Runtime:Agent" ] ,
120- modal_session_id : "arpan"
183+ input_channels : [ ` Agent:Runtime: ${ sessionId } ` ] ,
184+ output_channels : [ ` Runtime:Agent: ${ sessionId } ` ] ,
185+ modal_session_id : sessionId
121186 } ;
122187
123188 const response = await fetch ( 'http://localhost:5000/initialize' , {
@@ -134,7 +199,8 @@ const init = async () => {
134199 const result = await response . json ( ) ;
135200
136201 if ( result . status === 'initialized' ) {
137- socket . emit ( 'init_process_result' , { success : true } ) ;
202+ socket . emit ( 'init_process_result' , { success : true , sessionId : sessionId } ) ;
203+ // callback({ success: true });
138204 console . log ( 'OpenHands initialized successfully' ) ;
139205 } else {
140206 throw new Error ( `Unexpected initialization status: ${ result . status } ` ) ;
@@ -145,9 +211,27 @@ const init = async () => {
145211 success : false ,
146212 error : err . message
147213 } ) ;
214+ // callback({ success: false, error: err.message });
148215 }
149216 } ) ;
150217
218+ // Stop/Kill a session
219+ socket . on ( 'kill_session' , async ( { sessionId } , callback ) => {
220+ if ( ! activeSessions [ sessionId ] ) {
221+ callback ( { success : false , error : 'Session does not exist' } ) ;
222+ return ;
223+ }
224+
225+ console . log ( `Killing session: ${ sessionId } ` ) ;
226+ const { channels } = activeSessions [ sessionId ] ;
227+ await subscriber . unsubscribe ( channels ) ;
228+
229+ io . to ( sessionId ) . emit ( 'session_terminated' ) ;
230+ delete activeSessions [ sessionId ] ;
231+
232+ callback ( { success : true } ) ;
233+ } ) ;
234+
151235 socket . on ( 'disconnect' , ( ) => {
152236 console . log ( 'A user disconnected' ) ;
153237 } ) ;
0 commit comments