Skip to content

Commit 769d557

Browse files
author
Lasim
committed
feat(backend): add satellite command notifications for MCP installations
1 parent 362987f commit 769d557

File tree

3 files changed

+180
-6
lines changed

3 files changed

+180
-6
lines changed

services/backend/src/routes/mcp/installations/callback.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,29 @@ export default async function oauthCallbackRoute(server: FastifyInstance) {
262262
})
263263
.where(eq(mcpServerInstallations.id, installation.id));
264264

265+
// Create satellite commands for immediate notification
266+
try {
267+
const { SatelliteCommandService } = await import('../../../services/satelliteCommandService');
268+
const satelliteCommandService = new SatelliteCommandService(db, request.log);
269+
const commands = await satelliteCommandService.notifyMcpInstallation(
270+
installation.id,
271+
installation.team_id,
272+
installation.created_by
273+
);
274+
275+
request.log.info(
276+
{
277+
installationId: installation.id,
278+
commandsCreated: commands.length,
279+
satelliteIds: commands.map(c => c.satellite_id)
280+
},
281+
'Satellite commands created for OAuth MCP installation'
282+
);
283+
} catch (commandError) {
284+
request.log.error(commandError, `Failed to create satellite commands for installation ${installation.id}:`);
285+
// Don't fail OAuth completion if command creation fails
286+
}
287+
265288
request.log.info(
266289
{
267290
installationId: installation.id,

services/satellite/src/server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,7 @@ export async function createServer() {
632632
// Phase 10: Initialize OAuth Token Service for HTTP/SSE MCP servers
633633
const oauthTokenService = new OAuthTokenService(server.log as any, backendClient, satelliteId);
634634
mcpServerWrapper.setOAuthTokenService(oauthTokenService);
635+
remoteToolDiscoveryManager.setOAuthTokenService(oauthTokenService);
635636

636637
// Store OAuth services on server instance for access by routes
637638
server.decorate('tokenIntrospectionService', tokenIntrospectionService);
@@ -720,6 +721,7 @@ export async function createServer() {
720721
// Phase 10: Initialize OAuth Token Service for HTTP/SSE MCP servers
721722
const oauthTokenService = new OAuthTokenService(server.log as any, backendClient, satelliteId);
722723
mcpServerWrapper.setOAuthTokenService(oauthTokenService);
724+
remoteToolDiscoveryManager.setOAuthTokenService(oauthTokenService);
723725

724726
// Store OAuth services on server instance for access by routes
725727
server.decorate('tokenIntrospectionService', tokenIntrospectionService);

services/satellite/src/services/remote-tool-discovery-manager.ts

Lines changed: 155 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { DynamicConfigManager, DynamicMcpServersConfig, ConfigurationChanges } f
66
import { McpServerConfig } from './command-polling-service';
77
import type { EventBus } from './event-bus';
88
import { maskUrlForLogging } from '../utils/log-masker';
9+
import { OAuthTokenService } from './oauth-token-service';
910

1011
/**
1112
* Cached tool information with namespacing
@@ -40,6 +41,7 @@ export class RemoteToolDiscoveryManager {
4041
private logger: FastifyBaseLogger;
4142
private configManager?: DynamicConfigManager;
4243
private eventBus?: EventBus;
44+
private oauthTokenService?: OAuthTokenService;
4345

4446
constructor(logger: FastifyBaseLogger, eventBus?: EventBus) {
4547
this.logger = logger.child({ component: 'RemoteToolDiscoveryManager' });
@@ -56,6 +58,16 @@ export class RemoteToolDiscoveryManager {
5658
}, 'Dynamic configuration manager set for tool discovery');
5759
}
5860

61+
/**
62+
* Set OAuth token service (for OAuth-enabled MCP servers)
63+
*/
64+
setOAuthTokenService(oauthTokenService: OAuthTokenService): void {
65+
this.oauthTokenService = oauthTokenService;
66+
this.logger.debug({
67+
operation: 'tool_discovery_oauth_service_set'
68+
}, 'OAuth token service set for tool discovery');
69+
}
70+
5971
/**
6072
* Initialize tool discovery - called once at startup
6173
*/
@@ -257,9 +269,121 @@ export class RemoteToolDiscoveryManager {
257269
// Build URL with query parameters
258270
const finalUrl = this.buildMcpServerUrl(config.url, config.url_query_params);
259271

272+
// Phase 10: OAuth token injection for tool discovery
273+
let headers: Record<string, string> = {};
274+
275+
if (config.requires_oauth && this.oauthTokenService) {
276+
if (!config.installation_id || !config.user_id || !config.team_id) {
277+
throw new Error(
278+
`OAuth required but missing context for ${serverName}. ` +
279+
'Installation ID, User ID, and Team ID are required for tool discovery.'
280+
);
281+
}
282+
283+
this.logger.info({
284+
operation: 'oauth_token_injection_tool_discovery',
285+
server_name: serverName,
286+
installation_id: config.installation_id,
287+
user_id: config.user_id,
288+
team_id: config.team_id
289+
}, 'MCP server requires OAuth for tool discovery - fetching tokens');
290+
291+
try {
292+
// Check token status first
293+
const tokenStatus = await this.oauthTokenService.checkTokenStatus(
294+
config.installation_id,
295+
config.user_id,
296+
config.team_id
297+
);
298+
299+
if (!tokenStatus.exists) {
300+
throw new Error(
301+
`OAuth authorization required for ${serverName}. ` +
302+
'Please visit the dashboard to authorize this MCP server.'
303+
);
304+
}
305+
306+
if (tokenStatus.expired) {
307+
this.logger.warn({
308+
operation: 'oauth_token_expired_tool_discovery',
309+
server_name: serverName,
310+
expires_at: tokenStatus.expires_at
311+
}, 'OAuth token is expired - attempting tool discovery anyway (backend may have refreshed)');
312+
}
313+
314+
// Retrieve tokens
315+
const tokens = await this.oauthTokenService.getTokens(
316+
config.installation_id,
317+
config.user_id,
318+
config.team_id
319+
);
320+
321+
if (!tokens) {
322+
throw new Error(`Failed to retrieve OAuth tokens for ${serverName}`);
323+
}
324+
325+
// Inject OAuth token into Authorization header
326+
headers['Authorization'] = `Bearer ${tokens.access_token}`;
327+
328+
this.logger.info({
329+
operation: 'oauth_token_injected_tool_discovery',
330+
server_name: serverName,
331+
expires_at: tokens.expires_at,
332+
has_refresh_token: !!tokens.refresh_token
333+
}, 'OAuth token injected for tool discovery');
334+
335+
} catch (error) {
336+
const errorMessage = error instanceof Error ? error.message : String(error);
337+
this.logger.error({
338+
operation: 'oauth_token_injection_failed_tool_discovery',
339+
server_name: serverName,
340+
error: errorMessage
341+
}, 'Failed to inject OAuth tokens for tool discovery');
342+
throw error;
343+
}
344+
}
345+
260346
// Create transport for the remote server
261347
const transport = new StreamableHTTPClientTransport(new URL(finalUrl));
262348

349+
// WORKAROUND: Patch global fetch temporarily to inject OAuth headers
350+
// The MCP SDK doesn't currently support custom headers in StreamableHTTPClientTransport
351+
let originalGlobalFetch: typeof fetch | null = null;
352+
if (Object.keys(headers).length > 0) {
353+
originalGlobalFetch = global.fetch;
354+
global.fetch = async (input: any, init?: any) => {
355+
// Properly merge headers (handle both Headers object and plain object)
356+
const mergedHeaders: Record<string, string> = {};
357+
358+
// Copy existing headers
359+
if (init?.headers) {
360+
if (init.headers instanceof Headers) {
361+
init.headers.forEach((value: string, key: string) => {
362+
mergedHeaders[key] = value;
363+
});
364+
} else {
365+
Object.assign(mergedHeaders, init.headers);
366+
}
367+
}
368+
369+
// Add OAuth headers (don't overwrite existing)
370+
Object.assign(mergedHeaders, headers);
371+
372+
const modifiedInit = {
373+
...init,
374+
headers: mergedHeaders
375+
};
376+
377+
return originalGlobalFetch!(input, modifiedInit);
378+
};
379+
380+
this.logger.debug({
381+
operation: 'oauth_headers_patched_global_fetch',
382+
server_name: serverName,
383+
headers_to_inject: Object.keys(headers)
384+
}, 'Patched global fetch to inject OAuth headers for tool discovery');
385+
}
386+
263387
try {
264388
// Connect to remote MCP server
265389
await client.connect(transport);
@@ -322,6 +446,11 @@ export class RemoteToolDiscoveryManager {
322446

323447
throw error;
324448
} finally {
449+
// Restore global fetch if it was patched
450+
if (originalGlobalFetch) {
451+
global.fetch = originalGlobalFetch;
452+
}
453+
325454
// Always clean up the client connection
326455
try {
327456
await client.close();
@@ -474,8 +603,22 @@ export class RemoteToolDiscoveryManager {
474603
unchanged_servers: configChanges.unchangedServers
475604
}, 'Processing differential tool discovery update');
476605

477-
// If no changes, skip all processing
478-
if (!configChanges.hasChanges) {
606+
// Check if any unchanged servers require OAuth (need re-discovery for token updates)
607+
const oauthUnchangedServers = configChanges.unchangedServers.filter(serverName => {
608+
const serverConfig = config.servers[serverName];
609+
return serverConfig?.requires_oauth === true;
610+
});
611+
612+
if (oauthUnchangedServers.length > 0) {
613+
this.logger.debug({
614+
operation: 'tool_discovery_oauth_rediscovery',
615+
oauth_servers: oauthUnchangedServers,
616+
oauth_server_count: oauthUnchangedServers.length
617+
}, `Forcing re-discovery for ${oauthUnchangedServers.length} OAuth servers (tokens may have been updated)`);
618+
}
619+
620+
// If no changes and no OAuth servers need re-discovery, skip all processing
621+
if (!configChanges.hasChanges && oauthUnchangedServers.length === 0) {
479622
this.logger.debug({
480623
operation: 'tool_discovery_no_changes',
481624
server_count: Object.keys(config.servers).length
@@ -501,16 +644,22 @@ export class RemoteToolDiscoveryManager {
501644
});
502645
}
503646

504-
// Discover tools for new servers
505-
const serversToDiscover = [...configChanges.addedServers, ...configChanges.modifiedServers];
647+
// Discover tools for new, modified, and OAuth unchanged servers
648+
// OAuth servers need re-discovery even when config unchanged (for token updates)
649+
const serversToDiscover = [
650+
...configChanges.addedServers,
651+
...configChanges.modifiedServers,
652+
...oauthUnchangedServers
653+
];
506654

507655
if (serversToDiscover.length > 0) {
508656
this.logger.debug({
509657
operation: 'tool_discovery_partial_discovery',
510658
servers_to_discover: serversToDiscover,
511659
added_count: configChanges.addedServers.length,
512-
modified_count: configChanges.modifiedServers.length
513-
}, `Discovering tools for ${serversToDiscover.length} servers (${configChanges.addedServers.length} new, ${configChanges.modifiedServers.length} modified)`);
660+
modified_count: configChanges.modifiedServers.length,
661+
oauth_unchanged_count: oauthUnchangedServers.length
662+
}, `Discovering tools for ${serversToDiscover.length} servers (${configChanges.addedServers.length} new, ${configChanges.modifiedServers.length} modified, ${oauthUnchangedServers.length} OAuth re-discovery)`);
514663

515664
let successCount = 0;
516665
let failureCount = 0;

0 commit comments

Comments
 (0)