Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions src/cli/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,72 @@ import {
import * as os from "os";
import { getParseOptions } from "./argv";

// -----------------------------------------------------------------------------
// Safety net: prevent uncaught exceptions / unhandled rejections from crashing
// the server process. This is a fallback; the real fixes are in StreamManager
// and tool code, but this ensures resilience against unknown edge cases.
// -----------------------------------------------------------------------------
const isBenignNetworkError = (err: unknown): boolean => {
// TypeError: terminated - undici stream termination
if (err instanceof TypeError && err.message === "terminated") return true;

// UND_ERR_BODY_TIMEOUT - undici body timeout
if (
typeof err === "object" &&
err !== null &&
"code" in err &&
(err as { code?: string }).code === "UND_ERR_BODY_TIMEOUT"
)
return true;

// Check nested cause for UND_ERR_BODY_TIMEOUT
if (
typeof err === "object" &&
err !== null &&
"cause" in err &&
typeof (err as { cause?: unknown }).cause === "object" &&
(err as { cause?: { code?: string } }).cause !== null &&
(err as { cause: { code?: string } }).cause.code === "UND_ERR_BODY_TIMEOUT"
)
return true;

// AbortError - expected when streams are cancelled
if (err instanceof Error && err.name === "AbortError") return true;

return false;
};

const formatErrorWithCause = (err: unknown): string => {
if (!(err instanceof Error)) return String(err);
let msg = err.stack ?? err.message;
if (err.cause) {
msg += `\n [cause] ${formatErrorWithCause(err.cause)}`;
}
return msg;
};

process.on("uncaughtException", (err) => {
if (isBenignNetworkError(err)) {
console.warn("[mux-server] Suppressed benign uncaughtException:", err.message ?? err);
return;
}
console.error("[mux-server] Uncaught exception (server continuing):", formatErrorWithCause(err));
// Do NOT exit - keep server running
});

process.on("unhandledRejection", (reason) => {
if (isBenignNetworkError(reason)) {
const msg = reason instanceof Error ? reason.message : String(reason);
console.warn("[mux-server] Suppressed benign unhandledRejection:", msg);
return;
}
console.error(
"[mux-server] Unhandled rejection (server continuing):",
formatErrorWithCause(reason)
);
// Do NOT exit - keep server running
});

const program = new Command();
program
.name("mux server")
Expand Down
64 changes: 57 additions & 7 deletions src/node/services/streamManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,22 @@ export class StreamManager extends EventEmitter {
throw error;
}

// Attach no-op catch handlers to side promises that we may not always await.
// This prevents unhandled rejection crashes if the stream is aborted/terminated
// and these promises reject (e.g., undici TypeError: terminated).
const suppressUnhandledRejection = (p: unknown): void => {
if (p && typeof (p as Promise<unknown>).catch === "function") {
void (p as Promise<unknown>).catch(() => undefined);
}
};
suppressUnhandledRejection(streamResult.usage);
suppressUnhandledRejection(streamResult.totalUsage);
suppressUnhandledRejection(streamResult.steps);
suppressUnhandledRejection(streamResult.providerMetadata);
suppressUnhandledRejection(streamResult.text);
suppressUnhandledRejection(streamResult.finishReason);
suppressUnhandledRejection(streamResult.warnings);

const streamInfo: WorkspaceStreamInfo = {
state: StreamState.STARTING,
streamResult,
Expand Down Expand Up @@ -1162,15 +1178,20 @@ export class StreamManager extends EventEmitter {
error: toolErrorPart.error,
});

// Format error output
// Format error output - use safe stringify to avoid secondary exceptions
// (e.g., circular refs, BigInt) turning a handled tool failure into a crash
const formatToolError = (err: unknown): string => {
if (typeof err === "string") return err;
if (err instanceof Error) return err.message;
try {
return JSON.stringify(err);
} catch {
return String(err) || "[unserializable error]";
}
};
const errorOutput = {
success: false,
error:
typeof toolErrorPart.error === "string"
? toolErrorPart.error
: toolErrorPart.error instanceof Error
? toolErrorPart.error.message
: JSON.stringify(toolErrorPart.error),
error: formatToolError(toolErrorPart.error),
};

// Use shared completion logic (await to ensure partial is flushed before event)
Expand Down Expand Up @@ -1378,6 +1399,16 @@ export class StreamManager extends EventEmitter {
this.emit("stream-end", streamEndEvent);
}
} catch (error) {
// If stream was aborted or is stopping, treat termination errors as expected cancellation
// rather than fatal errors. This prevents TypeError: terminated (undici) from corrupting
// stream state or showing scary error messages when user simply cancelled the stream.
const isAbortedOrStopping =
streamInfo.abortController.signal.aborted || streamInfo.state === StreamState.STOPPING;
if (isAbortedOrStopping) {
log.debug("Stream ended after abort/stop (not an error):", { error });
return;
}

streamInfo.state = StreamState.ERROR;

// Log the actual error for debugging
Expand Down Expand Up @@ -1582,6 +1613,25 @@ export class StreamManager extends EventEmitter {
}
}

// Detect undici/fetch termination errors (e.g., TypeError: terminated, UND_ERR_BODY_TIMEOUT)
// These occur when the underlying HTTP connection is severed or times out
const isUndiciTermination =
(error instanceof TypeError && error.message === "terminated") ||
(typeof error === "object" &&
error !== null &&
"code" in error &&
(error as { code?: string }).code === "UND_ERR_BODY_TIMEOUT") ||
(typeof error === "object" &&
error !== null &&
"cause" in error &&
typeof (error as { cause?: unknown }).cause === "object" &&
(error as { cause?: { code?: string } }).cause !== null &&
(error as { cause: { code?: string } }).cause.code === "UND_ERR_BODY_TIMEOUT");

if (isUndiciTermination) {
return "network";
}

// Fall back to string matching for other errors
if (error instanceof Error) {
const message = error.message.toLowerCase();
Expand Down
51 changes: 35 additions & 16 deletions src/node/services/tools/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,42 @@ export const createTaskTool: ToolFactory = (config: ToolConfiguration) => {
);
}

const report = await taskService.waitForAgentReport(created.data.taskId, {
abortSignal,
requestingWorkspaceId: workspaceId,
});
try {
const report = await taskService.waitForAgentReport(created.data.taskId, {
abortSignal,
requestingWorkspaceId: workspaceId,
});

return parseToolResult(
TaskToolResultSchema,
{
status: "completed" as const,
taskId: created.data.taskId,
reportMarkdown: report.reportMarkdown,
title: report.title,
agentId: requestedAgentId,
agentType: requestedAgentId,
},
"task"
);
return parseToolResult(
TaskToolResultSchema,
{
status: "completed" as const,
taskId: created.data.taskId,
reportMarkdown: report.reportMarkdown,
title: report.title,
agentId: requestedAgentId,
agentType: requestedAgentId,
},
"task"
);
} catch (waitError) {
// If wait timed out, return a valid result indicating task is still running
// (consistent with task_await behavior). This prevents timeouts from becoming
// tool execution errors that can destabilize the parent stream.
const message = waitError instanceof Error ? waitError.message : String(waitError);
if (/timed out waiting for agent_report/i.test(message)) {
log.debug("Task wait timed out, returning running status", {
taskId: created.data.taskId,
});
return parseToolResult(
TaskToolResultSchema,
{ status: "running" as const, taskId: created.data.taskId },
"task"
);
}
// Re-throw other errors (e.g., "Interrupted", "Task terminated")
throw waitError;
}
},
});
};
Loading