Skip to content

Commit 2fc1f5e

Browse files
committed
Plugin: add monitor channels for cross-thread activity
1 parent ea1c417 commit 2fc1f5e

File tree

10 files changed

+699
-6
lines changed

10 files changed

+699
-6
lines changed

index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { INTERACTIVE_NAMESPACE } from "./src/types.js";
44

55
const COMMANDS = [
66
["codex_resume", "Resume or bind an existing Codex thread."],
7+
["codex_monitor", "Monitor cross-thread approvals, prompts, and unread activity."],
78
["codex_detach", "Detach this conversation from the current Codex thread."],
89
["codex_status", "Show the current Codex binding and thread state."],
910
["codex_stop", "Stop the active Codex turn."],

src/client.test.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,39 @@ describe("extractThreadTokenUsageSnapshot", () => {
218218
remainingPercent: 80,
219219
});
220220
});
221+
222+
it("extracts thread status and active flags from thread list payloads", () => {
223+
expect(
224+
__testing.extractThreadsFromValue({
225+
threads: [
226+
{
227+
id: "thread-1",
228+
name: "Needs approval",
229+
cwd: "/repo/openclaw",
230+
updatedAt: 1_710_000_000,
231+
status: {
232+
type: "active",
233+
activeFlags: ["waitingOnApproval", "waitingOnUserInput"],
234+
},
235+
},
236+
],
237+
}),
238+
).toEqual([
239+
{
240+
threadId: "thread-1",
241+
title: "Needs approval",
242+
summary: "",
243+
projectKey: "/repo/openclaw",
244+
createdAt: undefined,
245+
updatedAt: 1_710_000_000_000,
246+
gitBranch: undefined,
247+
status: {
248+
type: "active",
249+
activeFlags: ["waitingOnApproval", "waitingOnUserInput"],
250+
},
251+
},
252+
]);
253+
});
221254
});
222255

223256
describe("extractFileChangePathsFromReadResult", () => {

src/client.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import type {
2121
ReviewResult,
2222
ReviewTarget,
2323
SkillSummary,
24+
ThreadActiveFlag,
2425
ThreadReplay,
2526
ThreadState,
27+
ThreadStatusSummary,
2628
ThreadSummary,
2729
TurnTerminalError,
2830
TurnResult,
@@ -1188,13 +1190,51 @@ function extractThreadsFromValue(value: unknown): ThreadSummary[] {
11881190
pickString(asRecord(record.git_info) ?? {}, ["branch"]) ??
11891191
pickString(asRecord(sessionRecord?.gitInfo) ?? {}, ["branch"]) ??
11901192
pickString(asRecord(sessionRecord?.git_info) ?? {}, ["branch"]),
1193+
status: extractThreadStatus(record) ?? extractThreadStatus(sessionRecord),
11911194
});
11921195
}
11931196
return [...summaries.values()].sort(
11941197
(left, right) => (right.updatedAt ?? 0) - (left.updatedAt ?? 0),
11951198
);
11961199
}
11971200

1201+
function normalizeThreadActiveFlag(value: unknown): ThreadActiveFlag | undefined {
1202+
if (typeof value !== "string") {
1203+
return undefined;
1204+
}
1205+
const normalized = value.trim();
1206+
return normalized === "waitingOnApproval" || normalized === "waitingOnUserInput"
1207+
? normalized
1208+
: undefined;
1209+
}
1210+
1211+
function extractThreadStatus(value: unknown): ThreadStatusSummary | undefined {
1212+
const record = asRecord(asRecord(value)?.status ?? value);
1213+
if (!record) {
1214+
const statusType = typeof value === "string" ? value.trim() : "";
1215+
if (
1216+
statusType === "notLoaded" ||
1217+
statusType === "idle" ||
1218+
statusType === "systemError"
1219+
) {
1220+
return { type: statusType };
1221+
}
1222+
return undefined;
1223+
}
1224+
const type = pickString(record, ["type", "status", "kind"]);
1225+
if (type === "notLoaded" || type === "idle" || type === "systemError") {
1226+
return { type };
1227+
}
1228+
if (type === "active") {
1229+
const rawFlags = findFirstArrayByKeys(record, ["activeFlags", "active_flags"]) ?? [];
1230+
const activeFlags = rawFlags
1231+
.map((entry) => normalizeThreadActiveFlag(entry))
1232+
.filter((entry): entry is ThreadActiveFlag => Boolean(entry));
1233+
return { type, activeFlags };
1234+
}
1235+
return undefined;
1236+
}
1237+
11981238
function normalizeConversationRole(value: string | undefined): "user" | "assistant" | undefined {
11991239
const normalized = value?.trim().toLowerCase();
12001240
if (normalized === "user" || normalized === "usermessage") {
@@ -2500,6 +2540,10 @@ export class CodexAppServerClient {
25002540
await connection?.client.close().catch(() => undefined);
25012541
}
25022542

2543+
onNotification(listener: (method: string, params: unknown) => Promise<void> | void): () => void {
2544+
return this.addNotificationListener(listener);
2545+
}
2546+
25032547
async listThreads(params: {
25042548
sessionKey?: string;
25052549
workspaceDir?: string;
@@ -3558,6 +3602,7 @@ export const __testing = {
35583602
extractStartupProbeInfo,
35593603
formatFileEditNotice,
35603604
extractThreadTokenUsageSnapshot,
3605+
extractThreadsFromValue,
35613606
extractRateLimitSummaries,
35623607
formatStdioProcessLog,
35633608
resolveTurnStoppedReason,

src/controller.test.ts

Lines changed: 137 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
55
import type { OpenClawPluginApi, PluginCommandContext } from "openclaw/plugin-sdk";
66
import { CodexAppServerClient } from "./client.js";
77
import { CodexPluginController } from "./controller.js";
8+
import type { ThreadSummary } from "./types.js";
89

910
function makeStateDir(): string {
1011
return fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-app-server-test-"));
@@ -102,13 +103,16 @@ async function createControllerHarness() {
102103
const controller = new CodexPluginController(api);
103104
await controller.start();
104105
const clientMock = {
105-
listThreads: vi.fn(async () => [
106+
listThreads: vi.fn(async (): Promise<ThreadSummary[]> => [
106107
{
107108
threadId: "thread-1",
108109
title: "Discord Thread",
109110
projectKey: "/repo/openclaw",
110111
createdAt: Date.now() - 60_000,
111112
updatedAt: Date.now() - 30_000,
113+
status: {
114+
type: "idle",
115+
},
112116
},
113117
]),
114118
listModels: vi.fn(async () => [
@@ -591,6 +595,138 @@ describe("Discord controller flows", () => {
591595
})).toBeNull();
592596
});
593597

598+
it("enables codex_monitor and reports cross-thread activity", async () => {
599+
const { controller, clientMock } = await createControllerHarness();
600+
clientMock.listThreads.mockResolvedValue([
601+
{
602+
threadId: "thread-approval",
603+
title: "Approve deploy",
604+
projectKey: "/repo/openclaw",
605+
updatedAt: Date.now() - 60_000,
606+
status: {
607+
type: "active",
608+
activeFlags: ["waitingOnApproval"],
609+
},
610+
},
611+
{
612+
threadId: "thread-unread",
613+
title: "Fresh output",
614+
projectKey: "/repo/openclaw",
615+
updatedAt: Date.now() - 30_000,
616+
status: {
617+
type: "idle",
618+
},
619+
},
620+
]);
621+
622+
const reply = await controller.handleCommand(
623+
"codex_monitor",
624+
buildDiscordCommandContext({
625+
commandBody: "/codex_monitor",
626+
}),
627+
);
628+
629+
expect(reply.text).toContain("Monitor: active");
630+
expect(reply.text).toContain("Pending approvals:");
631+
expect(reply.text).toContain("Unread activity:");
632+
expect((controller as any).store.getMonitorBinding({
633+
channel: "discord",
634+
accountId: "default",
635+
conversationId: "channel:chan-1",
636+
})).toEqual(
637+
expect.objectContaining({
638+
workspaceDir: undefined,
639+
}),
640+
);
641+
});
642+
643+
it("detaches monitor bindings with codex_detach", async () => {
644+
const { controller } = await createControllerHarness();
645+
await (controller as any).store.upsertMonitorBinding({
646+
conversation: {
647+
channel: "discord",
648+
accountId: "default",
649+
conversationId: "channel:chan-1",
650+
},
651+
updatedAt: Date.now(),
652+
});
653+
654+
const reply = await controller.handleCommand(
655+
"codex_detach",
656+
buildDiscordCommandContext({
657+
commandBody: "/codex_detach",
658+
}),
659+
);
660+
661+
expect(reply).toEqual({
662+
text: "Detached this conversation from Codex.",
663+
});
664+
expect((controller as any).store.getMonitorBinding({
665+
channel: "discord",
666+
accountId: "default",
667+
conversationId: "channel:chan-1",
668+
})).toBeNull();
669+
});
670+
671+
it("does not claim inbound messages for monitor-only conversations", async () => {
672+
const { controller } = await createControllerHarness();
673+
await (controller as any).store.upsertMonitorBinding({
674+
conversation: {
675+
channel: "discord",
676+
accountId: "default",
677+
conversationId: "channel:chan-1",
678+
},
679+
updatedAt: Date.now(),
680+
});
681+
682+
const result = await controller.handleInboundClaim({
683+
content: "hello",
684+
channel: "discord",
685+
accountId: "default",
686+
conversationId: "discord:channel:chan-1",
687+
metadata: { guildId: "guild-1" },
688+
});
689+
690+
expect(result).toEqual({ handled: false });
691+
});
692+
693+
it("dedupes unchanged monitor refresh summaries", async () => {
694+
const { controller, clientMock, sendMessageDiscord } = await createControllerHarness();
695+
await (controller as any).store.upsertMonitorBinding({
696+
conversation: {
697+
channel: "discord",
698+
accountId: "default",
699+
conversationId: "channel:chan-1",
700+
},
701+
updatedAt: Date.now(),
702+
});
703+
clientMock.listThreads.mockResolvedValue([
704+
{
705+
threadId: "thread-unread",
706+
title: "Fresh output",
707+
projectKey: "/repo/openclaw",
708+
updatedAt: Date.now() - 30_000,
709+
status: {
710+
type: "idle",
711+
},
712+
},
713+
]);
714+
715+
await (controller as any).refreshMonitorBindings({ force: true });
716+
await (controller as any).refreshMonitorBindings();
717+
718+
expect(sendMessageDiscord).toHaveBeenCalledTimes(1);
719+
expect((controller as any).store.getMonitorBinding({
720+
channel: "discord",
721+
accountId: "default",
722+
conversationId: "channel:chan-1",
723+
})).toEqual(
724+
expect.objectContaining({
725+
lastSummarySignature: expect.stringContaining("Unread activity:"),
726+
}),
727+
);
728+
});
729+
594730
it("shows plan mode on in codex_status when the bound conversation has an active plan run", async () => {
595731
const { controller } = await createControllerHarness();
596732
await (controller as any).store.upsertBinding({

0 commit comments

Comments
 (0)