Skip to content

Commit 12737ff

Browse files
committed
updated the library
1 parent 85e415a commit 12737ff

22 files changed

+712
-1132
lines changed

src/core/messageManager.ts

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import WebSocket from 'ws';
2+
import { EventEmitter } from 'events';
3+
4+
export interface PendingRequest {
5+
resolve: (value: any) => void;
6+
reject: (reason?: any) => void;
7+
messageTypes: string[];
8+
requestId?: string;
9+
}
10+
11+
/**
12+
* Centralized message manager for handling WebSocket communications
13+
*/
14+
export class MessageManager extends EventEmitter {
15+
public pendingRequests: Map<string, PendingRequest> = new Map();
16+
public websocket: WebSocket | null = null;
17+
public requestCounter = 0;
18+
19+
/**
20+
* Initialize the message manager with a WebSocket instance
21+
*/
22+
public initialize(websocket: WebSocket) {
23+
this.websocket = websocket;
24+
this.setupMessageListener();
25+
}
26+
27+
/**
28+
* Setup the centralized message listener
29+
*/
30+
public setupMessageListener() {
31+
if (!this.websocket) return;
32+
33+
this.websocket.on('message', (data: WebSocket.Data) => {
34+
try {
35+
const response = JSON.parse(data.toString());
36+
this.handleMessage(response);
37+
} catch (error) {
38+
console.error('Error parsing WebSocket message:', error);
39+
}
40+
});
41+
}
42+
43+
/**
44+
* Handle incoming messages and resolve pending requests
45+
*/
46+
public handleMessage(response: any) {
47+
const { type, requestId } = response;
48+
49+
// Handle requests with specific requestId
50+
if (requestId && this.pendingRequests.has(requestId)) {
51+
const request = this.pendingRequests.get(requestId)!;
52+
this.pendingRequests.delete(requestId);
53+
request.resolve(response);
54+
return;
55+
}
56+
57+
// Handle requests by message type
58+
for (const [id, request] of this.pendingRequests.entries()) {
59+
if (request.messageTypes.includes(type)) {
60+
this.pendingRequests.delete(id);
61+
request.resolve(response);
62+
return;
63+
}
64+
}
65+
66+
// Emit the message for any other listeners (like onUserMessage)
67+
this.emit('message', response);
68+
}
69+
70+
/**
71+
* Send a message and wait for a specific response type
72+
*/
73+
sendAndWaitForResponse<T = any>(
74+
message: any,
75+
expectedResponseType: string,
76+
timeout: number = 30000
77+
): Promise<T> {
78+
return new Promise((resolve, reject) => {
79+
if (!this.websocket) {
80+
reject(new Error('WebSocket is not initialized'));
81+
return;
82+
}
83+
84+
const requestId = `req_${++this.requestCounter}_${Date.now()}`;
85+
86+
// Add requestId to the message if it doesn't have one
87+
const messageWithId = { ...message, requestId };
88+
89+
// Parse multiple message types separated by pipe
90+
const messageTypes = expectedResponseType.split('|').map(type => type.trim());
91+
92+
// Store the pending request
93+
this.pendingRequests.set(requestId, {
94+
resolve,
95+
reject,
96+
messageTypes,
97+
requestId
98+
});
99+
100+
// Set timeout
101+
const timeoutId = setTimeout(() => {
102+
if (this.pendingRequests.has(requestId)) {
103+
this.pendingRequests.delete(requestId);
104+
reject(new Error(`Request timeout after ${timeout}ms for message types: ${expectedResponseType}`));
105+
}
106+
}, timeout);
107+
108+
// Override resolve to clear timeout
109+
const originalResolve = resolve;
110+
const wrappedResolve = (value: T) => {
111+
clearTimeout(timeoutId);
112+
originalResolve(value);
113+
};
114+
115+
// Update the stored request with wrapped resolve
116+
const request = this.pendingRequests.get(requestId)!;
117+
request.resolve = wrappedResolve;
118+
119+
// Send the message
120+
this.websocket.send(JSON.stringify(messageWithId));
121+
});
122+
}
123+
124+
/**
125+
* Send a message without waiting for response
126+
*/
127+
send(message: any): void {
128+
if (!this.websocket) {
129+
throw new Error('WebSocket is not initialized');
130+
}
131+
this.websocket.send(JSON.stringify(message));
132+
}
133+
134+
/**
135+
* Get the WebSocket instance
136+
*/
137+
getWebSocket(): WebSocket | null {
138+
return this.websocket;
139+
}
140+
141+
/**
142+
* Clean up all pending requests
143+
*/
144+
cleanup(): void {
145+
for (const [id, request] of this.pendingRequests.entries()) {
146+
request.reject(new Error('WebSocket connection closed'));
147+
}
148+
this.pendingRequests.clear();
149+
}
150+
}
151+
152+
export default new MessageManager();
Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import WebSocket from 'ws';
22
import fs from 'fs';
33
import yaml from 'js-yaml';
4+
import messageManager, { MessageManager } from './messageManager';
45

56
/**
67
* Class representing a WebSocket connection.
@@ -65,17 +66,14 @@ class cbws {
6566
});
6667

6768
this.websocket.on('open', () => {
68-
// if (this.websocket) {
69-
// this.websocket.send(JSON.stringify({
70-
// "type": "sendMessage",
71-
// "message": initialMessage
72-
// }));
73-
// resolve(this.websocket);
74-
// }
69+
// Initialize the message manager with this websocket
70+
messageManager.initialize(this.websocket);
71+
resolve(this.websocket);
7572
});
7673

77-
this.websocket.on('message', (data: WebSocket.Data) => {
78-
// Handle incoming WebSocket messages here.
74+
this.websocket.on('close', () => {
75+
// Clean up pending requests when connection closes
76+
messageManager.cleanup();
7977
});
8078
});
8179
}
@@ -92,6 +90,13 @@ class cbws {
9290
return this.websocket;
9391
}
9492
}
93+
94+
/**
95+
* Get the message manager instance
96+
*/
97+
get messageManager(): MessageManager {
98+
return messageManager;
99+
}
95100
}
96101

97102
export default new cbws();

src/index.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import cbws from './modules/websocket';
1+
import cbws from './core/websocket';
22
import cbfs from './modules/fs';
33
import cbllm from './modules/llm';
44
import cbterminal from './modules/terminal';
@@ -107,9 +107,8 @@ class Codebolt {
107107
*/
108108
onUserMessage(handler: (userMessage: any) => void | Promise<void> | any | Promise<any>) {
109109
const waitForConnection = () => {
110-
if (cbws.getWebsocket) {
111-
cbws.getWebsocket.on('message', async (data: string) => {
112-
const response = JSON.parse(data);
110+
if (cbws.messageManager) {
111+
const handleUserMessage = async (response: any) => {
113112
if (response.type === "messageResponse") {
114113
try {
115114
const result = await handler(response);
@@ -123,17 +122,19 @@ class Codebolt {
123122
message.message = result;
124123
}
125124

126-
cbws.getWebsocket.send(JSON.stringify(message));
125+
cbws.messageManager.send(message);
127126
} catch (error) {
128127
console.error('Error in user message handler:', error);
129128
// Send processStoped even if there's an error
130-
cbws.getWebsocket.send(JSON.stringify({
129+
cbws.messageManager.send({
131130
"type": "processStoped",
132131
"error": error instanceof Error ? error.message : "Unknown error occurred"
133-
}));
132+
});
134133
}
135134
}
136-
});
135+
};
136+
137+
cbws.messageManager.on('message', handleUserMessage);
137138
} else {
138139
setTimeout(waitForConnection, 100);
139140
}

src/modules/agent.ts

Lines changed: 21 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { GetAgentStateResponse } from '@codebolt/types';
2-
import cbws from './websocket';
2+
import cbws from '../core/websocket';
33

44

55
export enum AgentLocation {
@@ -30,23 +30,18 @@ const codeboltAgent = {
3030
* @returns {Promise<AgentResponse>} A promise that resolves with the agent details.
3131
*/
3232
findAgent: (task: string, maxResult = 1, agents = [], agentLocaltion: AgentLocation = AgentLocation.ALL, getFrom: FilterUsing.USE_VECTOR_DB): Promise<any> => {
33-
return new Promise((resolve, reject) => {
34-
cbws.getWebsocket.send(JSON.stringify({
33+
return cbws.messageManager.sendAndWaitForResponse(
34+
{
3535
"type": "agentEvent",
3636
"action": "findAgent",
3737
"task": task,
3838
"agents": agents,// for filter in vector db
3939
"maxResult": maxResult,
4040
"location": agentLocaltion,
4141
"getFrom": getFrom
42-
}));
43-
cbws.getWebsocket.on('message', (data: string) => {
44-
const response = JSON.parse(data);
45-
if (response.type === "findAgentByTaskResponse") {
46-
resolve(response); // Resolve the Promise with the agent details
47-
}
48-
});
49-
});
42+
},
43+
"findAgentByTaskResponse"
44+
);
5045
},
5146

5247
/**
@@ -55,60 +50,45 @@ const codeboltAgent = {
5550
* @returns {Promise<void>} A promise that resolves when the agent has been successfully started.
5651
*/
5752
startAgent: (agentId: string, task: string): Promise<any> => {
58-
return new Promise((resolve, reject) => {
59-
cbws.getWebsocket.send(JSON.stringify({
53+
return cbws.messageManager.sendAndWaitForResponse(
54+
{
6055
"type": "agentEvent",
6156
"action": "startAgent",
6257
"agentId": agentId,
6358
"task": task
64-
}));
65-
cbws.getWebsocket.on('message', (data: string) => {
66-
const response = JSON.parse(data);
67-
if (response.type === "taskCompletionResponse" && response.agentId === agentId) {
68-
resolve(response); // Resolve the Promise when the agent has been successfully started
69-
}
70-
});
71-
});
59+
},
60+
"taskCompletionResponse"
61+
);
7262
},
7363

7464
/**
7565
* Lists all available agents.
7666
* @returns {Promise<any>} A promise that resolves with the list of agents.
7767
*/
7868
getAgentsList: (type: Agents = Agents.DOWNLOADED): Promise<any> => {
79-
return new Promise((resolve, reject) => {
80-
cbws.getWebsocket.send(JSON.stringify({
69+
return cbws.messageManager.sendAndWaitForResponse(
70+
{
8171
"type": "agentEvent",
8272
"action": "listAgents",
8373
"agentType": type,
8474

85-
}));
86-
cbws.getWebsocket.on('message', (data: string) => {
87-
const response = JSON.parse(data);
88-
if (response.type === "listAgentsResponse") {
89-
resolve(response); // Resolve the Promise with the list of agents
90-
}
91-
});
92-
});
75+
},
76+
"listAgentsResponse"
77+
);
9378
},
9479
/**
9580
* Lists all available agents.
9681
* @returns {Promise<any>} A promise that resolves with the list of agents.
9782
*/
9883
getAgentsDetail: (agentList = []): Promise<any> => {
99-
return new Promise((resolve, reject) => {
100-
cbws.getWebsocket.send(JSON.stringify({
84+
return cbws.messageManager.sendAndWaitForResponse(
85+
{
10186
"type": "agentEvent",
10287
"action": "agentsDetail",
10388
"agentList": agentList
104-
}));
105-
cbws.getWebsocket.on('message', (data: string) => {
106-
const response = JSON.parse(data);
107-
if (response.type === "listAgentsResponse") {
108-
resolve(response); // Resolve the Promise with the list of agents
109-
}
110-
});
111-
});
89+
},
90+
"listAgentsResponse"
91+
);
11292
}
11393
}
11494

0 commit comments

Comments
 (0)