Skip to content

Commit ae81d03

Browse files
committed
fix(web): disconnect agent connect streams on detail teardown
1 parent 9edf1a7 commit ae81d03

File tree

6 files changed

+553
-18
lines changed

6 files changed

+553
-18
lines changed

typescript/clients/web-ag-ui/apps/agent-gmx-allora/src/workflow/nodes/collectDelegations.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,19 @@ const resolveDelegationOnboarding = (state: ClmmState): OnboardingState => {
5353
return { step: 2, key: DELEGATION_STEP_KEY };
5454
};
5555

56+
function readConfigString(
57+
config: CopilotKitConfig,
58+
key: 'thread_id' | 'checkpoint_id' | 'checkpoint_ns',
59+
): string | undefined {
60+
const root = config as { configurable?: unknown };
61+
if (!root.configurable || typeof root.configurable !== 'object') {
62+
return undefined;
63+
}
64+
const configurable = root.configurable as Record<string, unknown>;
65+
const value = configurable[key];
66+
return typeof value === 'string' && value.length > 0 ? value : undefined;
67+
}
68+
5669
const HexSchema = z
5770
.string()
5871
.regex(/^0x[0-9a-fA-F]*$/u)
@@ -125,19 +138,33 @@ export const collectDelegationsNode = async (
125138
state: ClmmState,
126139
config: CopilotKitConfig,
127140
): Promise<ClmmUpdate | Command<string, ClmmUpdate>> => {
141+
const runMetadata = {
142+
threadId: readConfigString(config, 'thread_id'),
143+
checkpointId: readConfigString(config, 'checkpoint_id'),
144+
checkpointNamespace: readConfigString(config, 'checkpoint_ns'),
145+
};
128146
const delegationOnboarding = resolveDelegationOnboarding(state);
147+
const taskState = state.thread.task?.taskStatus?.state;
148+
const taskMessage = state.thread.task?.taskStatus?.message?.content;
149+
const isAwaitingDelegationInput =
150+
taskState === 'input-required' &&
151+
typeof taskMessage === 'string' &&
152+
taskMessage.toLowerCase().includes('delegation approval');
129153
logInfo('collectDelegations: entering node', {
154+
...runMetadata,
130155
delegationsBypassActive: state.thread.delegationsBypassActive === true,
131156
hasDelegationBundle: Boolean(state.thread.delegationBundle),
132157
});
133158
logWarn('collectDelegations: node entered', {
159+
...runMetadata,
134160
onboardingStatus: state.thread.onboardingFlow?.status,
135161
onboardingStep: state.thread.onboarding?.step,
136162
onboardingKey: state.thread.onboarding?.key,
137163
delegationsBypassActive: state.thread.delegationsBypassActive === true,
138164
hasDelegationBundle: Boolean(state.thread.delegationBundle),
139165
hasOperatorInput: Boolean(state.thread.operatorInput),
140166
hasFundingTokenInput: Boolean(state.thread.fundingTokenInput),
167+
isAwaitingDelegationInput,
141168
});
142169

143170
if (state.thread.delegationsBypassActive === true) {
@@ -235,6 +262,16 @@ export const collectDelegationsNode = async (
235262
nextTaskMessage: awaitingMessage,
236263
});
237264
const hasRunnableConfig = Boolean((config as { configurable?: unknown }).configurable);
265+
logInfo('collectDelegations: pause transition prepared', {
266+
...runMetadata,
267+
hasRunnableConfig,
268+
shouldPersistPendingState,
269+
isAwaitingDelegationInput,
270+
currentOnboardingStep: state.thread.onboarding?.step,
271+
currentOnboardingKey: state.thread.onboarding?.key,
272+
nextOnboardingStep: delegationOnboarding.step,
273+
nextOnboardingKey: delegationOnboarding.key,
274+
});
238275
const pauseSnapshotView = applyThreadPatch(state, pendingView);
239276
if (hasRunnableConfig && shouldPersistPendingState) {
240277
const mergedView = pauseSnapshotView;
@@ -244,6 +281,10 @@ export const collectDelegationsNode = async (
244281
thread: mergedView,
245282
metadata: {
246283
pauseMechanism: 'checkpoint-and-interrupt',
284+
...runMetadata,
285+
hasRunnableConfig,
286+
shouldPersistPendingState,
287+
isAwaitingDelegationInput,
247288
},
248289
});
249290
await copilotkitEmitState(config, {
@@ -272,18 +313,30 @@ export const collectDelegationsNode = async (
272313
thread: pauseSnapshotView,
273314
metadata: {
274315
pauseMechanism: 'interrupt',
316+
...runMetadata,
317+
hasRunnableConfig,
275318
checkpointPersisted: shouldPersistPendingState,
319+
isAwaitingDelegationInput,
276320
},
277321
});
278322

279323
const interruptResult = await requestInterruptPayload({
280324
request,
281325
interrupt,
282326
});
327+
logInfo('collectDelegations: interrupt payload received', {
328+
...runMetadata,
329+
rawPayloadType: typeof interruptResult.raw,
330+
hasDecodedPayload: interruptResult.decoded !== undefined,
331+
});
283332
const parsed = DelegationSigningResponseSchema.safeParse(interruptResult.decoded);
284333
if (!parsed.success) {
285334
const issues = parsed.error.issues.map((issue) => issue.message).join('; ');
286335
const failureMessage = `Invalid delegation signing response: ${issues}`;
336+
logWarn('collectDelegations: invalid delegation signing payload', {
337+
...runMetadata,
338+
issues,
339+
});
287340
const { task, statusEvent } = buildTaskStatus(awaitingInput.task, 'failed', failureMessage);
288341
const failedView = applyThreadPatch(state, {
289342
task,
@@ -304,6 +357,9 @@ export const collectDelegationsNode = async (
304357
}
305358

306359
if (parsed.data.outcome === 'rejected') {
360+
logWarn('collectDelegations: delegation signing rejected by user', {
361+
...runMetadata,
362+
});
307363
const { task, statusEvent } = buildTaskStatus(
308364
awaitingInput.task,
309365
'failed',
@@ -344,6 +400,10 @@ export const collectDelegationsNode = async (
344400
descriptions: [...DELEGATION_DESCRIPTIONS],
345401
warnings,
346402
};
403+
logInfo('collectDelegations: delegation signing accepted', {
404+
...runMetadata,
405+
signedDelegationCount: delegationBundle.delegations.length,
406+
});
347407

348408
const { task, statusEvent } = buildTaskStatus(
349409
awaitingInput.task,
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import { NextRequest } from 'next/server';
2+
import { z } from 'zod';
3+
4+
type ConnectAbortHandler = () => void;
5+
type ConnectAbortersByThread = Map<string, Set<ConnectAbortHandler>>;
6+
7+
type RuntimeGlobals = typeof globalThis & {
8+
__copilotkitConnectAbortersByThread?: ConnectAbortersByThread;
9+
};
10+
11+
const disconnectPayloadSchema = z.object({
12+
agentId: z.string().min(1),
13+
threadId: z.string().min(1),
14+
});
15+
16+
function getConnectAbortersByThread(): ConnectAbortersByThread | null {
17+
const runtimeGlobals = globalThis as RuntimeGlobals;
18+
const aborters = runtimeGlobals.__copilotkitConnectAbortersByThread;
19+
if (!aborters) return null;
20+
return aborters;
21+
}
22+
23+
export async function POST(req: NextRequest): Promise<Response> {
24+
let payload: unknown;
25+
try {
26+
payload = await req.json();
27+
} catch {
28+
return Response.json(
29+
{
30+
ok: false,
31+
error: 'Invalid disconnect payload',
32+
},
33+
{ status: 400 },
34+
);
35+
}
36+
37+
const parsed = disconnectPayloadSchema.safeParse(payload);
38+
if (!parsed.success) {
39+
return Response.json(
40+
{
41+
ok: false,
42+
error: 'Invalid disconnect payload',
43+
},
44+
{ status: 400 },
45+
);
46+
}
47+
48+
const { agentId, threadId } = parsed.data;
49+
const connectKey = `${agentId}:${threadId}`;
50+
const abortersByThread = getConnectAbortersByThread();
51+
const aborters = abortersByThread?.get(connectKey);
52+
if (!aborters || aborters.size === 0) {
53+
return Response.json({
54+
ok: true,
55+
abortedCount: 0,
56+
});
57+
}
58+
59+
const handlers = Array.from(aborters);
60+
abortersByThread?.delete(connectKey);
61+
let abortedCount = 0;
62+
for (const abortHandler of handlers) {
63+
try {
64+
abortHandler();
65+
abortedCount += 1;
66+
} catch {
67+
// best-effort disconnect
68+
}
69+
}
70+
71+
return Response.json({
72+
ok: true,
73+
abortedCount,
74+
});
75+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import { beforeEach, describe, expect, it, vi } from 'vitest';
2+
3+
import { POST } from './route';
4+
5+
type ConnectAbortHandler = () => void;
6+
type ConnectAbortersByThread = Map<string, Set<ConnectAbortHandler>>;
7+
8+
type RuntimeGlobals = typeof globalThis & {
9+
__copilotkitConnectAbortersByThread?: ConnectAbortersByThread;
10+
};
11+
12+
function getRuntimeGlobals(): RuntimeGlobals {
13+
return globalThis as RuntimeGlobals;
14+
}
15+
16+
describe('/api/agent-disconnect', () => {
17+
beforeEach(() => {
18+
getRuntimeGlobals().__copilotkitConnectAbortersByThread = new Map();
19+
});
20+
21+
it('aborts active connect handlers for the agent/thread key', async () => {
22+
const abortA = vi.fn();
23+
const abortB = vi.fn();
24+
getRuntimeGlobals().__copilotkitConnectAbortersByThread?.set(
25+
'agent-gmx-allora:thread-1',
26+
new Set([abortA, abortB]),
27+
);
28+
29+
const request = new Request('http://localhost/api/agent-disconnect', {
30+
method: 'POST',
31+
headers: { 'content-type': 'application/json' },
32+
body: JSON.stringify({
33+
agentId: 'agent-gmx-allora',
34+
threadId: 'thread-1',
35+
}),
36+
});
37+
38+
const response = await POST(request);
39+
expect(response.status).toBe(200);
40+
expect(await response.json()).toEqual({
41+
ok: true,
42+
abortedCount: 2,
43+
});
44+
expect(abortA).toHaveBeenCalledTimes(1);
45+
expect(abortB).toHaveBeenCalledTimes(1);
46+
});
47+
48+
it('returns 400 for invalid payloads', async () => {
49+
const request = new Request('http://localhost/api/agent-disconnect', {
50+
method: 'POST',
51+
headers: { 'content-type': 'application/json' },
52+
body: JSON.stringify({
53+
agentId: '',
54+
}),
55+
});
56+
57+
const response = await POST(request);
58+
expect(response.status).toBe(400);
59+
expect(await response.json()).toEqual({
60+
ok: false,
61+
error: 'Invalid disconnect payload',
62+
});
63+
});
64+
});

0 commit comments

Comments
 (0)