Skip to content

Commit 801c748

Browse files
committed
fix(agents): stabilize onboarding state transitions and command routing
1 parent 5f8d81d commit 801c748

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1548
-384
lines changed

typescript/clients/web-ag-ui/apps/agent-clmm/src/agent.ts

Lines changed: 3 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { pathToFileURL } from 'node:url';
22

33
import { END, InMemoryStore, START, StateGraph } from '@langchain/langgraph';
4-
import { projectCycleCommandThread } from 'agent-workflow-core';
54
import { v7 as uuidv7 } from 'uuid';
65
import { privateKeyToAccount } from 'viem/accounts';
76
import { z } from 'zod';
@@ -138,47 +137,6 @@ async function parseJsonResponse<T>(response: Response, schema: z.ZodSchema<T>):
138137
return schema.parse(payload);
139138
}
140139

141-
type ThreadStateValues = Record<string, unknown>;
142-
143-
const isRecord = (value: unknown): value is Record<string, unknown> =>
144-
typeof value === 'object' && value !== null && !Array.isArray(value);
145-
146-
function extractThreadStateValues(payload: unknown): ThreadStateValues | null {
147-
if (!isRecord(payload)) {
148-
return null;
149-
}
150-
151-
const values = payload['values'];
152-
if (isRecord(values)) {
153-
return values;
154-
}
155-
156-
const state = payload['state'];
157-
if (isRecord(state)) {
158-
return state;
159-
}
160-
161-
const data = payload['data'];
162-
if (isRecord(data)) {
163-
return data;
164-
}
165-
166-
if (isRecord(payload['thread'])) {
167-
return payload;
168-
}
169-
170-
return null;
171-
}
172-
173-
async function fetchThreadStateValues(
174-
baseUrl: string,
175-
threadId: string,
176-
): Promise<ThreadStateValues | null> {
177-
const response = await fetch(`${baseUrl}/threads/${threadId}/state`);
178-
const payload = await parseJsonResponse(response, z.unknown());
179-
return extractThreadStateValues(payload);
180-
}
181-
182140
async function ensureThread(baseUrl: string, threadId: string, graphId: string) {
183141
const metadata = { graph_id: graphId };
184142
const response = await fetch(`${baseUrl}/threads`, {
@@ -197,24 +155,11 @@ async function ensureThread(baseUrl: string, threadId: string, graphId: string)
197155
}
198156

199157
async function updateCycleState(baseUrl: string, threadId: string, runMessage: { id: string; role: 'user'; content: string }) {
200-
let existingThread: Record<string, unknown> | null = null;
201-
try {
202-
const currentState = await fetchThreadStateValues(baseUrl, threadId);
203-
if (currentState && isRecord(currentState['thread'])) {
204-
existingThread = currentState['thread'];
205-
}
206-
} catch (error: unknown) {
207-
const message =
208-
error instanceof Error ? error.message : typeof error === 'string' ? error : 'Unknown error';
209-
console.warn('[cron] Unable to fetch thread state before cycle update', { threadId, error: message });
210-
}
211-
212-
const thread = projectCycleCommandThread(existingThread);
213158
const response = await fetch(`${baseUrl}/threads/${threadId}/state`, {
214159
method: 'POST',
215160
headers: { 'Content-Type': 'application/json' },
216161
body: JSON.stringify({
217-
values: { messages: [runMessage], thread },
162+
values: { messages: [runMessage] },
218163
as_node: 'runCommand',
219164
}),
220165
});
@@ -300,10 +245,11 @@ export async function runGraphOnce(
300245
const startedAt = Date.now();
301246
console.info(`[cron] Starting CLMM graph run via API (thread=${threadId})`);
302247

248+
const clientMutationId = uuidv7();
303249
const runMessage = {
304250
id: uuidv7(),
305251
role: 'user' as const,
306-
content: JSON.stringify({ command: 'cycle' }),
252+
content: JSON.stringify({ command: 'cycle', clientMutationId }),
307253
};
308254

309255
const baseUrl = resolveLangGraphDeploymentUrl();
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
2+
3+
import { runGraphOnce } from './agent.js';
4+
5+
const ORIGINAL_FETCH = globalThis.fetch;
6+
7+
function jsonResponse(payload: unknown, status = 200): Response {
8+
return new Response(JSON.stringify(payload), {
9+
status,
10+
headers: { 'Content-Type': 'application/json' },
11+
});
12+
}
13+
14+
describe('runGraphOnce cron state update payload', () => {
15+
beforeEach(() => {
16+
process.env['A2A_TEST_AGENT_NODE_PRIVATE_KEY'] ??= `0x${'1'.repeat(64)}`;
17+
});
18+
19+
afterEach(() => {
20+
globalThis.fetch = ORIGINAL_FETCH;
21+
vi.restoreAllMocks();
22+
});
23+
24+
it('does not replay thread state into the cycle command update payload', async () => {
25+
const threadId = 'thread-1';
26+
const fetchMock = vi.fn();
27+
globalThis.fetch = fetchMock as unknown as typeof fetch;
28+
29+
fetchMock.mockResolvedValueOnce(jsonResponse({ thread_id: threadId }));
30+
fetchMock.mockResolvedValueOnce(jsonResponse({ thread_id: threadId }));
31+
32+
let stateUpdateBody: Record<string, unknown> | undefined;
33+
fetchMock.mockImplementationOnce((_url: string | URL | globalThis.Request, init?: RequestInit) => {
34+
const requestBody = typeof init?.body === 'string' ? init.body : '{}';
35+
stateUpdateBody = JSON.parse(requestBody) as Record<string, unknown>;
36+
return jsonResponse({ checkpoint_id: 'cp-1' });
37+
});
38+
39+
fetchMock.mockResolvedValueOnce(jsonResponse({ run_id: 'run-1', status: 'running' }));
40+
fetchMock.mockResolvedValueOnce(new Response(null, { status: 200 }));
41+
fetchMock.mockResolvedValueOnce(jsonResponse({ run_id: 'run-1', status: 'success' }));
42+
43+
await runGraphOnce(threadId);
44+
45+
expect(stateUpdateBody).toBeDefined();
46+
expect(stateUpdateBody).toMatchObject({
47+
as_node: 'runCommand',
48+
values: {
49+
messages: [expect.objectContaining({ role: 'user' })],
50+
},
51+
});
52+
expect((stateUpdateBody?.['values'] as Record<string, unknown>) ?? {}).not.toHaveProperty('thread');
53+
54+
const calledUrls = fetchMock.mock.calls
55+
.map(([url]) => String(url))
56+
.filter((url) => url.includes(`/threads/${threadId}/state`));
57+
expect(calledUrls).toEqual([`http://localhost:8124/threads/${threadId}/state`]);
58+
});
59+
});

typescript/clients/web-ag-ui/apps/agent-clmm/src/hire.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,11 @@ export async function startClmmHire(
8484
operatorInput: OperatorConfigInput,
8585
options?: { durability?: LangGraphDurability },
8686
) {
87+
const clientMutationId = uuidv7();
8788
const hireMessage = {
8889
id: uuidv7(),
8990
role: 'user' as const,
90-
content: JSON.stringify({ command: 'hire' }),
91+
content: JSON.stringify({ command: 'hire', clientMutationId }),
9192
};
9293

9394
type ClmmInvokeInput = Parameters<typeof clmmGraph.invoke>[0];

0 commit comments

Comments
 (0)