Skip to content

Commit 5e7cf25

Browse files
author
robertlestak
committed
make memory server more resillient
1 parent 594e496 commit 5e7cf25

File tree

1 file changed

+157
-73
lines changed

1 file changed

+157
-73
lines changed

src/memory/index.ts

Lines changed: 157 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,30 @@ import { promises as fs } from 'fs';
88
import path from 'path';
99
import { fileURLToPath } from 'url';
1010

11+
// Simple file lock implementation
12+
class FileLock {
13+
private locks = new Map<string, Promise<void>>();
14+
15+
async acquire<T>(key: string, fn: () => Promise<T>): Promise<T> {
16+
while (this.locks.has(key)) {
17+
await this.locks.get(key);
18+
}
19+
20+
let release: () => void;
21+
const lock = new Promise<void>(resolve => { release = resolve; });
22+
this.locks.set(key, lock);
23+
24+
try {
25+
return await fn();
26+
} finally {
27+
this.locks.delete(key);
28+
release!();
29+
}
30+
}
31+
}
32+
33+
const fileLock = new FileLock();
34+
1135
// Define memory file path using environment variable with fallback
1236
export const defaultMemoryPath = path.join(path.dirname(fileURLToPath(import.meta.url)), 'memory.jsonl');
1337

@@ -70,39 +94,47 @@ export class KnowledgeGraphManager {
7094
constructor(private memoryFilePath: string) {}
7195

7296
private async loadGraph(): Promise<KnowledgeGraph> {
73-
try {
74-
const data = await fs.readFile(this.memoryFilePath, "utf-8");
75-
const lines = data.split("\n").filter(line => line.trim() !== "");
76-
return lines.reduce((graph: KnowledgeGraph, line) => {
77-
const item = JSON.parse(line);
78-
if (item.type === "entity") graph.entities.push(item as Entity);
79-
if (item.type === "relation") graph.relations.push(item as Relation);
80-
return graph;
81-
}, { entities: [], relations: [] });
82-
} catch (error) {
83-
if (error instanceof Error && 'code' in error && (error as any).code === "ENOENT") {
84-
return { entities: [], relations: [] };
97+
return fileLock.acquire(this.memoryFilePath, async () => {
98+
try {
99+
const data = await fs.readFile(this.memoryFilePath, "utf-8");
100+
const lines = data.split("\n").filter(line => line.trim() !== "");
101+
return lines.reduce((graph: KnowledgeGraph, line) => {
102+
try {
103+
const item = JSON.parse(line);
104+
if (item.type === "entity") graph.entities.push(item as Entity);
105+
if (item.type === "relation") graph.relations.push(item as Relation);
106+
} catch (parseError) {
107+
console.error(`Skipping malformed line: ${line}`, parseError);
108+
}
109+
return graph;
110+
}, { entities: [], relations: [] });
111+
} catch (error) {
112+
if (error instanceof Error && 'code' in error && (error as any).code === "ENOENT") {
113+
return { entities: [], relations: [] };
114+
}
115+
throw error;
85116
}
86-
throw error;
87-
}
117+
});
88118
}
89119

90120
private async saveGraph(graph: KnowledgeGraph): Promise<void> {
91-
const lines = [
92-
...graph.entities.map(e => JSON.stringify({
93-
type: "entity",
94-
name: e.name,
95-
entityType: e.entityType,
96-
observations: e.observations
97-
})),
98-
...graph.relations.map(r => JSON.stringify({
99-
type: "relation",
100-
from: r.from,
101-
to: r.to,
102-
relationType: r.relationType
103-
})),
104-
];
105-
await fs.writeFile(this.memoryFilePath, lines.join("\n"));
121+
return fileLock.acquire(this.memoryFilePath, async () => {
122+
const lines = [
123+
...graph.entities.map(e => JSON.stringify({
124+
type: "entity",
125+
name: e.name,
126+
entityType: e.entityType,
127+
observations: e.observations
128+
})),
129+
...graph.relations.map(r => JSON.stringify({
130+
type: "relation",
131+
from: r.from,
132+
to: r.to,
133+
relationType: r.relationType
134+
})),
135+
];
136+
await fs.writeFile(this.memoryFilePath, lines.join("\n") + "\n");
137+
});
106138
}
107139

108140
async createEntities(entities: Entity[]): Promise<Entity[]> {
@@ -465,57 +497,109 @@ async function main() {
465497
if (process.env.MCP_TRANSPORT === 'http') {
466498
const { createServer } = await import('http');
467499
const transports: Record<string, StreamableHTTPServerTransport> = {};
500+
const SESSION_TIMEOUT = 30 * 60 * 1000; // 30 minutes
501+
const MAX_REQUEST_SIZE = 10 * 1024 * 1024; // 10MB
502+
const sessionTimers: Record<string, NodeJS.Timeout> = {};
503+
504+
const cleanupSession = (sessionId: string) => {
505+
if (sessionTimers[sessionId]) {
506+
clearTimeout(sessionTimers[sessionId]);
507+
delete sessionTimers[sessionId];
508+
}
509+
delete transports[sessionId];
510+
console.error('Session cleaned up:', sessionId);
511+
};
512+
513+
const resetSessionTimer = (sessionId: string) => {
514+
if (sessionTimers[sessionId]) {
515+
clearTimeout(sessionTimers[sessionId]);
516+
}
517+
sessionTimers[sessionId] = setTimeout(() => cleanupSession(sessionId), SESSION_TIMEOUT);
518+
};
468519

469520
const httpServer = createServer(async (req, res) => {
470-
const sessionId = req.headers['mcp-session-id'] as string | undefined;
471-
472-
if (req.method === 'POST') {
473-
let body = '';
474-
req.on('data', chunk => body += chunk);
475-
req.on('end', async () => {
476-
const parsedBody = body.trim() ? JSON.parse(body) : undefined;
477-
478-
let transport: StreamableHTTPServerTransport;
479-
if (sessionId && transports[sessionId]) {
480-
transport = transports[sessionId];
481-
} else if (!sessionId) {
482-
transport = new StreamableHTTPServerTransport({
483-
sessionIdGenerator: () => crypto.randomUUID(),
484-
onsessioninitialized: (sid) => {
485-
transports[sid] = transport;
486-
console.error('Session initialized:', sid);
487-
},
488-
onsessionclosed: (sid) => {
489-
delete transports[sid];
490-
console.error('Session closed:', sid);
521+
try {
522+
const sessionId = req.headers['mcp-session-id'] as string | undefined;
523+
524+
if (req.method === 'POST') {
525+
let body = '';
526+
let size = 0;
527+
528+
req.on('data', chunk => {
529+
size += chunk.length;
530+
if (size > MAX_REQUEST_SIZE) {
531+
req.destroy();
532+
res.writeHead(413);
533+
res.end('Request too large');
534+
return;
535+
}
536+
body += chunk;
537+
});
538+
539+
req.on('end', async () => {
540+
try {
541+
const parsedBody = body.trim() ? JSON.parse(body) : undefined;
542+
543+
let transport: StreamableHTTPServerTransport;
544+
if (sessionId && transports[sessionId]) {
545+
transport = transports[sessionId];
546+
resetSessionTimer(sessionId);
547+
} else if (!sessionId) {
548+
transport = new StreamableHTTPServerTransport({
549+
sessionIdGenerator: () => crypto.randomUUID(),
550+
onsessioninitialized: (sid) => {
551+
transports[sid] = transport;
552+
resetSessionTimer(sid);
553+
console.error('Session initialized:', sid);
554+
},
555+
onsessionclosed: (sid) => {
556+
cleanupSession(sid);
557+
}
558+
});
559+
await server.connect(transport);
560+
} else {
561+
res.writeHead(400);
562+
res.end('Invalid session ID');
563+
return;
491564
}
492-
});
493-
await server.connect(transport);
494-
} else {
565+
566+
await transport.handleRequest(req, res, parsedBody);
567+
} catch (error) {
568+
console.error('Error handling POST request:', error);
569+
res.writeHead(500);
570+
res.end('Internal server error');
571+
}
572+
});
573+
574+
req.on('error', (error) => {
575+
console.error('Request error:', error);
576+
res.writeHead(400);
577+
res.end('Bad request');
578+
});
579+
} else if (req.method === 'GET') {
580+
if (!sessionId || !transports[sessionId]) {
495581
res.writeHead(400);
496-
res.end('Invalid session ID');
582+
res.end('Invalid or missing session ID');
497583
return;
498584
}
499-
500-
await transport.handleRequest(req, res, parsedBody);
501-
});
502-
} else if (req.method === 'GET') {
503-
if (!sessionId || !transports[sessionId]) {
504-
res.writeHead(400);
505-
res.end('Invalid or missing session ID');
506-
return;
507-
}
508-
await transports[sessionId].handleRequest(req, res);
509-
} else if (req.method === 'DELETE') {
510-
if (!sessionId || !transports[sessionId]) {
511-
res.writeHead(400);
512-
res.end('Invalid or missing session ID');
513-
return;
585+
resetSessionTimer(sessionId);
586+
await transports[sessionId].handleRequest(req, res);
587+
} else if (req.method === 'DELETE') {
588+
if (!sessionId || !transports[sessionId]) {
589+
res.writeHead(400);
590+
res.end('Invalid or missing session ID');
591+
return;
592+
}
593+
await transports[sessionId].handleRequest(req, res);
594+
cleanupSession(sessionId);
595+
} else {
596+
res.writeHead(405);
597+
res.end('Method not allowed');
514598
}
515-
await transports[sessionId].handleRequest(req, res);
516-
} else {
517-
res.writeHead(405);
518-
res.end('Method not allowed');
599+
} catch (error) {
600+
console.error('Unhandled error in HTTP handler:', error);
601+
res.writeHead(500);
602+
res.end('Internal server error');
519603
}
520604
});
521605

0 commit comments

Comments
 (0)