Skip to content

Commit 44af50e

Browse files
author
Lasim
committed
feat(gateway): add logs streaming endpoint and centralized logging system
1 parent f5d7661 commit 44af50e

File tree

5 files changed

+516
-5
lines changed

5 files changed

+516
-5
lines changed

services/gateway/src/commands/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ export { registerStopCommand } from './stop';
99
export { registerStatusCommand } from './status';
1010
export { registerConfigCommand } from './config';
1111
export { registerVersionCommand } from './version';
12+
export { registerLogsCommand } from './logs';
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
import { Command } from 'commander';
2+
import chalk from 'chalk';
3+
import fetch from 'node-fetch';
4+
import { LogLevel, LogEntry } from '../utils/logger';
5+
6+
interface LogsOptions {
7+
follow: boolean;
8+
lines: string;
9+
level?: LogLevel;
10+
component?: string;
11+
json: boolean;
12+
noColor: boolean;
13+
port: string;
14+
host: string;
15+
}
16+
17+
export function registerLogsCommand(program: Command) {
18+
program
19+
.command('logs')
20+
.description('Stream real-time logs from the gateway')
21+
.option('-f, --follow', 'Follow log output (default: true)', true)
22+
.option('-n, --lines <number>', 'Number of lines to show initially', '50')
23+
.option('--level <level>', 'Filter by log level (debug, info, warn, error)')
24+
.option('--component <component>', 'Filter by component name')
25+
.option('--json', 'Output raw JSON format', false)
26+
.option('--no-color', 'Disable colored output', false)
27+
.option('-p, --port <port>', 'Gateway port', '9095')
28+
.option('-h, --host <host>', 'Gateway host', 'localhost')
29+
.action(async (options: LogsOptions) => {
30+
try {
31+
await streamLogs(options);
32+
} catch (error) {
33+
console.error(chalk.red('❌ Failed to stream logs:'), error instanceof Error ? error.message : String(error));
34+
process.exit(1);
35+
}
36+
});
37+
}
38+
39+
async function streamLogs(options: LogsOptions): Promise<void> {
40+
const { host, port, lines, level, component, json, noColor } = options;
41+
const gatewayUrl = `http://${host}:${port}`;
42+
43+
// Check if gateway is running
44+
try {
45+
const healthResponse = await fetch(`${gatewayUrl}/health`);
46+
if (!healthResponse.ok) {
47+
throw new Error(`Gateway health check failed: ${healthResponse.status}`);
48+
}
49+
} catch {
50+
console.error(chalk.red('❌ Gateway is not running or not accessible'));
51+
console.error(chalk.gray(' Make sure the gateway is started with "deploystack start"'));
52+
console.error(chalk.gray(` Expected gateway at: ${gatewayUrl}`));
53+
process.exit(1);
54+
}
55+
56+
// Build query parameters
57+
const params = new URLSearchParams();
58+
params.set('lines', lines);
59+
if (level) params.set('level', level);
60+
if (component) params.set('component', component);
61+
62+
const streamUrl = `${gatewayUrl}/logs/stream?${params.toString()}`;
63+
64+
console.log(chalk.blue('📡 Connecting to gateway logs...'));
65+
console.log(chalk.gray(` Gateway: ${gatewayUrl}`));
66+
if (level) console.log(chalk.gray(` Level filter: ${level}`));
67+
if (component) console.log(chalk.gray(` Component filter: ${component}`));
68+
console.log(chalk.gray(` Initial lines: ${lines}`));
69+
console.log('');
70+
71+
// Create SSE connection using fetch
72+
try {
73+
const response = await fetch(streamUrl, {
74+
headers: {
75+
'Accept': 'text/event-stream',
76+
'Cache-Control': 'no-cache'
77+
}
78+
});
79+
80+
if (!response.ok) {
81+
throw new Error(`Failed to connect to logs stream: ${response.status}`);
82+
}
83+
84+
console.log(chalk.green('✅ Connected to gateway logs\n'));
85+
86+
// Setup terminal UI with persistent bottom box
87+
await setupTerminalUI(response, json, noColor);
88+
} catch (error) {
89+
console.error(chalk.red('❌ Failed to connect to gateway logs'));
90+
console.error(chalk.gray(' Make sure the gateway is running and accessible'));
91+
if (error instanceof Error) {
92+
console.error(chalk.gray(` Error: ${error.message}`));
93+
}
94+
process.exit(1);
95+
}
96+
}
97+
98+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
99+
async function setupTerminalUI(response: any, json: boolean, noColor: boolean): Promise<void> {
100+
// Setup raw mode for keyboard input
101+
if (process.stdin.isTTY) {
102+
process.stdin.setRawMode(true);
103+
}
104+
process.stdin.resume();
105+
process.stdin.setEncoding('utf8');
106+
107+
let shouldStop = false;
108+
const terminalWidth = process.stdout.columns || 80;
109+
const boxWidth = Math.min(50, terminalWidth - 4);
110+
111+
// Create the persistent bottom box
112+
const createBottomBox = () => {
113+
const topBorder = '┌' + '─'.repeat(boxWidth - 2) + '┐';
114+
const message = 'Press "x" to exit logs';
115+
const padding = Math.max(0, boxWidth - 4 - message.length);
116+
const leftPad = Math.floor(padding / 2);
117+
const rightPad = padding - leftPad;
118+
const middleLine = '│ ' + ' '.repeat(leftPad) + message + ' '.repeat(rightPad) + ' │';
119+
const bottomBorder = '└' + '─'.repeat(boxWidth - 2) + '┘';
120+
121+
return noColor ?
122+
[topBorder, middleLine, bottomBorder] :
123+
[
124+
chalk.gray(topBorder),
125+
chalk.gray('│ ') + chalk.yellow(' '.repeat(leftPad) + message + ' '.repeat(rightPad)) + chalk.gray(' │'),
126+
chalk.gray(bottomBorder)
127+
];
128+
};
129+
130+
const showBottomBox = () => {
131+
const box = createBottomBox();
132+
console.log('\n' + box.join('\n'));
133+
};
134+
135+
const hideBottomBox = () => {
136+
// Move cursor up 4 lines and clear them
137+
process.stdout.write('\x1b[4A\x1b[0J');
138+
};
139+
140+
// Handle keyboard input
141+
const handleKeyPress = (key: string) => {
142+
if (key === 'x' || key === 'X') {
143+
shouldStop = true;
144+
hideBottomBox();
145+
console.log(chalk.yellow('🛑 Stopping log stream...'));
146+
cleanup();
147+
} else if (key === '\u0003') { // Ctrl+C
148+
shouldStop = true;
149+
hideBottomBox();
150+
console.log(chalk.yellow('\n🛑 Stopping log stream...'));
151+
cleanup();
152+
}
153+
};
154+
155+
const cleanup = () => {
156+
if (process.stdin.isTTY) {
157+
process.stdin.setRawMode(false);
158+
}
159+
process.stdin.pause();
160+
process.stdin.removeListener('data', handleKeyPress);
161+
process.exit(0);
162+
};
163+
164+
// Setup keyboard listener
165+
process.stdin.on('data', handleKeyPress);
166+
167+
// Handle process signals
168+
process.on('SIGINT', () => {
169+
shouldStop = true;
170+
hideBottomBox();
171+
console.log(chalk.yellow('\n🛑 Stopping log stream...'));
172+
cleanup();
173+
});
174+
175+
process.on('SIGTERM', cleanup);
176+
177+
// Show initial bottom box
178+
showBottomBox();
179+
180+
// Parse SSE stream using async iteration
181+
if (response.body) {
182+
let buffer = '';
183+
184+
// Use async iteration for the response body
185+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
186+
for await (const chunk of response.body as any) {
187+
if (shouldStop) break;
188+
189+
buffer += chunk.toString();
190+
191+
// Process complete lines
192+
const lines = buffer.split('\n');
193+
buffer = lines.pop() || ''; // Keep incomplete line in buffer
194+
195+
for (const line of lines) {
196+
if (shouldStop) break;
197+
198+
if (line.startsWith('data: ')) {
199+
const data = line.slice(6); // Remove 'data: ' prefix
200+
201+
if (data.trim() === '') continue; // Skip empty data
202+
203+
try {
204+
const logEntry: LogEntry = JSON.parse(data);
205+
206+
// Hide bottom box, show log entry, then show box again
207+
hideBottomBox();
208+
209+
if (json) {
210+
// Raw JSON output
211+
console.log(JSON.stringify(logEntry));
212+
} else {
213+
// Formatted output
214+
formatLogEntry(logEntry, noColor);
215+
}
216+
217+
showBottomBox();
218+
219+
} catch {
220+
// Skip malformed log entries or heartbeat messages
221+
continue;
222+
}
223+
} else if (line.startsWith(': ')) {
224+
// Skip heartbeat messages silently
225+
continue;
226+
}
227+
}
228+
}
229+
}
230+
}
231+
232+
function formatLogEntry(entry: LogEntry, noColor: boolean): void {
233+
const timestamp = noColor ? entry.timestamp : chalk.gray(entry.timestamp);
234+
const component = entry.component ?
235+
(noColor ? `[${entry.component}]` : chalk.cyan(`[${entry.component}]`)) : '';
236+
237+
let levelColor: (str: string) => string = (str) => str;
238+
let levelIcon = '';
239+
240+
if (!noColor) {
241+
switch (entry.level) {
242+
case 'debug':
243+
levelColor = chalk.gray;
244+
levelIcon = '🔍';
245+
break;
246+
case 'info':
247+
levelColor = chalk.blue;
248+
levelIcon = 'ℹ️';
249+
break;
250+
case 'warn':
251+
levelColor = chalk.yellow;
252+
levelIcon = '⚠️';
253+
break;
254+
case 'error':
255+
levelColor = chalk.red;
256+
levelIcon = '❌';
257+
break;
258+
}
259+
}
260+
261+
const levelText = levelColor(entry.level.toUpperCase().padEnd(5));
262+
const message = (!noColor && entry.level === 'error') ? chalk.red(entry.message) : entry.message;
263+
264+
console.log(`${timestamp} ${levelIcon} ${levelText} ${component} ${message}`);
265+
266+
if (entry.metadata) {
267+
const metadataText = noColor ?
268+
` Metadata: ${JSON.stringify(entry.metadata)}` :
269+
`${chalk.gray(' Metadata:')} ${JSON.stringify(entry.metadata)}`;
270+
console.log(metadataText);
271+
}
272+
}

services/gateway/src/core/server/proxy.ts

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { TeamMCPConfig, MCPServerConfig } from '../../types/mcp';
1111
import { SessionManager } from './session-manager';
1212
import { SSEHandler } from './sse-handler';
1313
import { ToolDiscoveryManager } from '../../utils/tool-discovery-manager';
14+
import { logger } from '../../utils/logger';
1415

1516
export interface ProxyServerOptions {
1617
port?: number;
@@ -201,6 +202,11 @@ export class ProxyServer {
201202
return this.getStatus();
202203
});
203204

205+
// Logs streaming endpoint - real-time log streaming via SSE
206+
this.fastify.get('/logs/stream', async (request, reply) => {
207+
await this.handleLogsStream(request, reply);
208+
});
209+
204210
// Root endpoint - helpful information about available endpoints
205211
this.fastify.get('/', async () => {
206212
return {
@@ -827,6 +833,93 @@ export class ProxyServer {
827833
return processInfo;
828834
}
829835

836+
/**
837+
* Handle logs streaming endpoint - real-time log streaming via SSE
838+
*/
839+
private async handleLogsStream(request: FastifyRequest, reply: FastifyReply): Promise<void> {
840+
const query = request.query as any;
841+
const lines = parseInt(query.lines || '50', 10);
842+
const level = query.level as any;
843+
const component = query.component as string;
844+
845+
logger.info('Logs stream connection established', 'proxy', {
846+
userAgent: request.headers['user-agent'],
847+
lines,
848+
level,
849+
component
850+
});
851+
852+
// Set SSE headers
853+
reply.raw.writeHead(200, {
854+
'Content-Type': 'text/event-stream',
855+
'Cache-Control': 'no-cache',
856+
'Connection': 'keep-alive',
857+
'Access-Control-Allow-Origin': '*',
858+
'Access-Control-Allow-Headers': 'Cache-Control'
859+
});
860+
861+
// Send recent logs
862+
const recentLogs = logger.filterLogs(level, component, lines);
863+
for (const logEntry of recentLogs) {
864+
const sseData = `data: ${JSON.stringify(logEntry)}\n\n`;
865+
reply.raw.write(sseData);
866+
}
867+
868+
// Listen for new log entries
869+
const logHandler = (logEntry: any) => {
870+
// Apply filters
871+
if (level) {
872+
const levelPriority: Record<string, number> = { debug: 0, info: 1, warn: 2, error: 3 };
873+
const minPriority = levelPriority[level];
874+
const entryPriority = levelPriority[logEntry.level];
875+
if (entryPriority < minPriority) {
876+
return;
877+
}
878+
}
879+
880+
if (component && logEntry.component !== component) {
881+
return;
882+
}
883+
884+
// Send log entry via SSE
885+
const sseData = `data: ${JSON.stringify(logEntry)}\n\n`;
886+
try {
887+
reply.raw.write(sseData);
888+
} catch {
889+
// Connection closed, remove listener
890+
logger.removeListener('log', logHandler);
891+
}
892+
};
893+
894+
logger.on('log', logHandler);
895+
896+
// Handle client disconnect
897+
request.raw.on('close', () => {
898+
logger.removeListener('log', logHandler);
899+
logger.info('Logs stream connection closed', 'proxy');
900+
});
901+
902+
request.raw.on('error', () => {
903+
logger.removeListener('log', logHandler);
904+
logger.warn('Logs stream connection error', 'proxy');
905+
});
906+
907+
// Keep connection alive with periodic heartbeat
908+
const heartbeat = setInterval(() => {
909+
try {
910+
reply.raw.write(': heartbeat\n\n');
911+
} catch {
912+
clearInterval(heartbeat);
913+
logger.removeListener('log', logHandler);
914+
}
915+
}, 30000); // 30 seconds
916+
917+
// Clean up heartbeat on disconnect
918+
request.raw.on('close', () => {
919+
clearInterval(heartbeat);
920+
});
921+
}
922+
830923
/**
831924
* Load team MCP configuration
832925
*/

0 commit comments

Comments
 (0)