Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ __pycache__/
.DS_Store
.beads/
plugin/gateway-core/node_modules/
plugin/gateway-core/*.tgz
plugin/gateway-core@latest
runtime/
bg/
Expand Down
90 changes: 90 additions & 0 deletions plugin/gateway-core/dist/hooks/session-recovery/index.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
import { writeGatewayEventAudit } from "../../audit/event-audit.js";
import { injectHookMessage, inspectHookMessageSafety } from "../hook-message-injector/index.js";
import { readCombinedToolAfterOutputText } from "../shared/tool-after-output.js";
const STALE_QUESTION_PREVENTION_MS = 60_000;
function nowMs() {
return Date.now();
}
function resolveToolSessionId(payload) {
return String(payload.input?.sessionID ?? payload.input?.sessionId ?? "").trim();
}
function resolveMessageSessionId(payload) {
return String(payload.properties?.info?.sessionID ?? payload.properties?.info?.sessionId ?? "").trim();
}
function normalizeToolName(raw) {
return String(raw ?? "").trim().toLowerCase();
}
function isQuestionTool(raw) {
const tool = normalizeToolName(raw);
return tool === "question" || tool === "askuserquestion";
}
// Returns true when event error resembles recoverable transient session failure.
function isRecoverableError(error) {
const candidate = error && typeof error === "object" && "message" in error
Expand Down Expand Up @@ -161,6 +178,7 @@ async function injectRecoveryMessage(args) {
// Creates session recovery hook that attempts one auto-resume per active error session.
export function createSessionRecoveryHook(options) {
const recoveringSessions = new Set();
const pendingQuestions = new Map();
return {
id: "session-recovery",
priority: 280,
Expand All @@ -173,7 +191,61 @@ export function createSessionRecoveryHook(options) {
const sessionId = resolveSessionId(eventPayload);
if (sessionId) {
recoveringSessions.delete(sessionId);
pendingQuestions.delete(sessionId);
}
return;
}
if (type === "message.updated") {
const messagePayload = (payload ?? {});
const sessionId = resolveMessageSessionId(messagePayload);
if (!sessionId) {
return;
}
const info = messagePayload.properties?.info;
const role = String(info?.role ?? "").trim().toLowerCase();
if (role === "user") {
pendingQuestions.delete(sessionId);
return;
}
if (role !== "assistant") {
return;
}
const completed = Number.isFinite(Number(info?.time?.completed ?? Number.NaN));
const errored = info?.error !== undefined && info?.error !== null;
if (completed || errored) {
pendingQuestions.delete(sessionId);
return;
}
const existing = pendingQuestions.get(sessionId);
if (!existing) {
return;
}
pendingQuestions.set(sessionId, {
...existing,
lastUpdatedAt: nowMs(),
});
return;
}
if (type === "tool.execute.before") {
const toolPayload = (payload ?? {});
const sessionId = resolveToolSessionId(toolPayload);
if (!sessionId || !isQuestionTool(toolPayload.input?.tool)) {
return;
}
pendingQuestions.set(sessionId, {
tool: normalizeToolName(toolPayload.input?.tool),
startedAt: nowMs(),
lastUpdatedAt: nowMs(),
});
return;
}
if (type === "tool.execute.before.error") {
const toolPayload = (payload ?? {});
const sessionId = resolveToolSessionId(toolPayload);
if (!sessionId || !isQuestionTool(toolPayload.input?.tool)) {
return;
}
pendingQuestions.delete(sessionId);
return;
}
if (type === "session.idle") {
Expand All @@ -188,6 +260,19 @@ export function createSessionRecoveryHook(options) {
if (!client || typeof client.messages !== "function") {
return;
}
const pendingQuestion = pendingQuestions.get(sessionId);
if (pendingQuestion) {
const ageMs = Math.max(0, nowMs() - Math.max(pendingQuestion.startedAt, pendingQuestion.lastUpdatedAt));
if (ageMs < STALE_QUESTION_PREVENTION_MS) {
writeGatewayEventAudit(directory, {
hook: "session-recovery",
stage: "skip",
reason_code: "stale_question_tool_prevention_not_stale",
session_id: sessionId,
});
return;
}
}
try {
const response = await client.messages({
path: { id: sessionId },
Expand Down Expand Up @@ -233,6 +318,7 @@ export function createSessionRecoveryHook(options) {
}
finally {
recoveringSessions.delete(sessionId);
pendingQuestions.delete(sessionId);
}
}
catch {
Expand All @@ -248,6 +334,10 @@ export function createSessionRecoveryHook(options) {
if (type === "tool.execute.after") {
const toolPayload = (payload ?? {});
const sessionId = String(toolPayload.input?.sessionID ?? toolPayload.input?.sessionId ?? "").trim();
if (sessionId && isQuestionTool(toolPayload.input?.tool)) {
pendingQuestions.delete(sessionId);
return;
}
const directory = typeof toolPayload.directory === "string" && toolPayload.directory.trim()
? toolPayload.directory
: options.directory;
Expand Down
128 changes: 128 additions & 0 deletions plugin/gateway-core/src/hooks/session-recovery/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,61 @@ interface ToolAfterPayload {
}
}

interface ToolBeforePayload {
directory?: string
input?: {
tool?: string
sessionID?: string
sessionId?: string
}
}

interface MessageUpdatedPayload {
directory?: string
properties?: {
info?: {
role?: string
sessionID?: string
sessionId?: string
error?: unknown
time?: { completed?: number }
}
}
}

interface PendingQuestionState {
tool: string
startedAt: number
lastUpdatedAt: number
}

const STALE_QUESTION_PREVENTION_MS = 60_000

function nowMs(): number {
return Date.now()
}

function resolveToolSessionId(payload: {
input?: { sessionID?: string; sessionId?: string }
}): string {
return String(payload.input?.sessionID ?? payload.input?.sessionId ?? "").trim()
}

function resolveMessageSessionId(payload: MessageUpdatedPayload): string {
return String(
payload.properties?.info?.sessionID ?? payload.properties?.info?.sessionId ?? "",
).trim()
}

function normalizeToolName(raw: unknown): string {
return String(raw ?? "").trim().toLowerCase()
}

function isQuestionTool(raw: unknown): boolean {
const tool = normalizeToolName(raw)
return tool === "question" || tool === "askuserquestion"
}

// Returns true when event error resembles recoverable transient session failure.
function isRecoverableError(error: unknown): boolean {
const candidate =
Expand Down Expand Up @@ -287,6 +342,7 @@ export function createSessionRecoveryHook(options: {
autoResume: boolean
}): GatewayHook {
const recoveringSessions = new Set<string>()
const pendingQuestions = new Map<string, PendingQuestionState>()
return {
id: "session-recovery",
priority: 280,
Expand All @@ -299,9 +355,63 @@ export function createSessionRecoveryHook(options: {
const sessionId = resolveSessionId(eventPayload)
if (sessionId) {
recoveringSessions.delete(sessionId)
pendingQuestions.delete(sessionId)
}
return
}
if (type === "message.updated") {
const messagePayload = (payload ?? {}) as MessageUpdatedPayload
const sessionId = resolveMessageSessionId(messagePayload)
if (!sessionId) {
return
}
const info = messagePayload.properties?.info
const role = String(info?.role ?? "").trim().toLowerCase()
if (role === "user") {
pendingQuestions.delete(sessionId)
return
}
if (role !== "assistant") {
return
}
const completed = Number.isFinite(Number(info?.time?.completed ?? Number.NaN))
const errored = info?.error !== undefined && info?.error !== null
if (completed || errored) {
pendingQuestions.delete(sessionId)
return
}
const existing = pendingQuestions.get(sessionId)
if (!existing) {
return
}
pendingQuestions.set(sessionId, {
...existing,
lastUpdatedAt: nowMs(),
})
return
}
if (type === "tool.execute.before") {
const toolPayload = (payload ?? {}) as ToolBeforePayload
const sessionId = resolveToolSessionId(toolPayload)
if (!sessionId || !isQuestionTool(toolPayload.input?.tool)) {
return
}
pendingQuestions.set(sessionId, {
tool: normalizeToolName(toolPayload.input?.tool),
startedAt: nowMs(),
lastUpdatedAt: nowMs(),
})
return
}
if (type === "tool.execute.before.error") {
const toolPayload = (payload ?? {}) as ToolBeforePayload
const sessionId = resolveToolSessionId(toolPayload)
if (!sessionId || !isQuestionTool(toolPayload.input?.tool)) {
return
}
pendingQuestions.delete(sessionId)
return
}
if (type === "session.idle") {
const directory =
typeof eventPayload.directory === "string" && eventPayload.directory.trim()
Expand All @@ -315,6 +425,19 @@ export function createSessionRecoveryHook(options: {
if (!client || typeof client.messages !== "function") {
return
}
const pendingQuestion = pendingQuestions.get(sessionId)
if (pendingQuestion) {
const ageMs = Math.max(0, nowMs() - Math.max(pendingQuestion.startedAt, pendingQuestion.lastUpdatedAt))
if (ageMs < STALE_QUESTION_PREVENTION_MS) {
writeGatewayEventAudit(directory, {
hook: "session-recovery",
stage: "skip",
reason_code: "stale_question_tool_prevention_not_stale",
session_id: sessionId,
})
return
}
}
try {
const response = await client.messages({
path: { id: sessionId },
Expand Down Expand Up @@ -359,6 +482,7 @@ export function createSessionRecoveryHook(options: {
})
} finally {
recoveringSessions.delete(sessionId)
pendingQuestions.delete(sessionId)
}
} catch {
writeGatewayEventAudit(directory, {
Expand All @@ -373,6 +497,10 @@ export function createSessionRecoveryHook(options: {
if (type === "tool.execute.after") {
const toolPayload = (payload ?? {}) as ToolAfterPayload
const sessionId = String(toolPayload.input?.sessionID ?? toolPayload.input?.sessionId ?? "").trim()
if (sessionId && isQuestionTool(toolPayload.input?.tool)) {
pendingQuestions.delete(sessionId)
return
}
const directory =
typeof toolPayload.directory === "string" && toolPayload.directory.trim()
? toolPayload.directory
Expand Down
Loading