Skip to content

Commit 0d4d107

Browse files
committed
Plugin: add monitor channels for cross-thread activity
1 parent 37f2590 commit 0d4d107

File tree

12 files changed

+705
-7
lines changed

12 files changed

+705
-7
lines changed

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ Pre-release packages are published on matching npm dist-tags instead of `latest`
4040

4141
- Uses your existing local Codex CLI setup instead of a separate hosted bridge.
4242
- Feels natural in chat: bind once with `/cas_resume`, then just talk.
43-
- Keeps useful controls close at hand with `/cas_status`, `/cas_plan`, `/cas_review`, `/cas_model`, and more.
43+
- Keeps useful controls close at hand with `/cas_status`, `/cas_monitor`, `/cas_plan`, `/cas_review`, `/cas_model`, and more.
4444
- Works well for Telegram and Discord conversations that you want tied to a real Codex thread.
4545

4646
## Typical Workflow
@@ -62,6 +62,9 @@ Pre-release packages are published on matching npm dist-tags instead of `latest`
6262
| `/cas_resume --sync` | Resume and try to sync the chat/topic name to the Codex thread. | You can combine this with other flags. |
6363
| `/cas_resume release-fix` | Resume a matching thread by title or id. | If more than one thread matches, you get buttons to choose. |
6464
| `/cas_status` | Show the current binding and thread state. | Includes thread id, model, workspace, sandbox, and permissions when available. |
65+
| `/cas_monitor` | Bind this conversation as a cross-thread monitor surface. | Shows pending approvals, pending questions, and unread thread activity across sessions. |
66+
| `/cas_monitor --cwd ~/github/openclaw` | Limit monitor mode to one workspace. | Uses the same `--cwd` path handling as `/cas_resume`. |
67+
| `/cas_monitor status|off` | Inspect or disable monitor mode. | `off` is also compatible with `/cas_detach`. |
6568
| `/cas_detach` | Unbind this conversation from Codex. | Stops routing plain text from this conversation into the bound thread. |
6669
| `/cas_stop` | Interrupt the active Codex run. | Only applies when a turn is currently in progress. |
6770
| `/cas_steer <message>` | Send follow-up steer text to an active run. | Example: `/cas_steer focus on the failing tests first` |

index.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ describe("plugin registration", () => {
3838
expect(api.registerCommand).toHaveBeenCalled();
3939
expect(api.registerCommand.mock.calls.map(([params]) => params.name)).toEqual([
4040
"cas_resume",
41+
"cas_monitor",
4142
"cas_detach",
4243
"cas_status",
4344
"cas_stop",

index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { INTERACTIVE_NAMESPACE } from "./src/types.js";
44

55
const COMMANDS = [
66
["cas_resume", "Resume or bind an existing Codex thread."],
7+
["cas_monitor", "Monitor cross-thread approvals, prompts, and unread activity."],
78
["cas_detach", "Detach this conversation from the current Codex thread."],
89
["cas_status", "Show the current Codex binding and thread state."],
910
["cas_stop", "Stop the active Codex turn."],

src/client.test.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,39 @@ describe("extractThreadTokenUsageSnapshot", () => {
261261
remainingPercent: 80,
262262
});
263263
});
264+
265+
it("extracts thread status and active flags from thread list payloads", () => {
266+
expect(
267+
__testing.extractThreadsFromValue({
268+
threads: [
269+
{
270+
id: "thread-1",
271+
name: "Needs approval",
272+
cwd: "/repo/openclaw",
273+
updatedAt: 1_710_000_000,
274+
status: {
275+
type: "active",
276+
activeFlags: ["waitingOnApproval", "waitingOnUserInput"],
277+
},
278+
},
279+
],
280+
}),
281+
).toEqual([
282+
{
283+
threadId: "thread-1",
284+
title: "Needs approval",
285+
summary: "",
286+
projectKey: "/repo/openclaw",
287+
createdAt: undefined,
288+
updatedAt: 1_710_000_000_000,
289+
gitBranch: undefined,
290+
status: {
291+
type: "active",
292+
activeFlags: ["waitingOnApproval", "waitingOnUserInput"],
293+
},
294+
},
295+
]);
296+
});
264297
});
265298

266299
describe("extractFileChangePathsFromReadResult", () => {

src/client.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import type {
2121
ReviewResult,
2222
ReviewTarget,
2323
SkillSummary,
24+
ThreadActiveFlag,
2425
ThreadReplay,
2526
ThreadState,
27+
ThreadStatusSummary,
2628
ThreadSummary,
2729
TurnTerminalError,
2830
TurnResult,
@@ -1217,13 +1219,51 @@ function extractThreadsFromValue(value: unknown): ThreadSummary[] {
12171219
pickString(asRecord(record.git_info) ?? {}, ["branch"]) ??
12181220
pickString(asRecord(sessionRecord?.gitInfo) ?? {}, ["branch"]) ??
12191221
pickString(asRecord(sessionRecord?.git_info) ?? {}, ["branch"]),
1222+
status: extractThreadStatus(record) ?? extractThreadStatus(sessionRecord),
12201223
});
12211224
}
12221225
return [...summaries.values()].sort(
12231226
(left, right) => (right.updatedAt ?? 0) - (left.updatedAt ?? 0),
12241227
);
12251228
}
12261229

1230+
function normalizeThreadActiveFlag(value: unknown): ThreadActiveFlag | undefined {
1231+
if (typeof value !== "string") {
1232+
return undefined;
1233+
}
1234+
const normalized = value.trim();
1235+
return normalized === "waitingOnApproval" || normalized === "waitingOnUserInput"
1236+
? normalized
1237+
: undefined;
1238+
}
1239+
1240+
function extractThreadStatus(value: unknown): ThreadStatusSummary | undefined {
1241+
const record = asRecord(asRecord(value)?.status ?? value);
1242+
if (!record) {
1243+
const statusType = typeof value === "string" ? value.trim() : "";
1244+
if (
1245+
statusType === "notLoaded" ||
1246+
statusType === "idle" ||
1247+
statusType === "systemError"
1248+
) {
1249+
return { type: statusType };
1250+
}
1251+
return undefined;
1252+
}
1253+
const type = pickString(record, ["type", "status", "kind"]);
1254+
if (type === "notLoaded" || type === "idle" || type === "systemError") {
1255+
return { type };
1256+
}
1257+
if (type === "active") {
1258+
const rawFlags = findFirstArrayByKeys(record, ["activeFlags", "active_flags"]) ?? [];
1259+
const activeFlags = rawFlags
1260+
.map((entry) => normalizeThreadActiveFlag(entry))
1261+
.filter((entry): entry is ThreadActiveFlag => Boolean(entry));
1262+
return { type, activeFlags };
1263+
}
1264+
return undefined;
1265+
}
1266+
12271267
function normalizeConversationRole(value: string | undefined): "user" | "assistant" | undefined {
12281268
const normalized = value?.trim().toLowerCase();
12291269
if (normalized === "user" || normalized === "usermessage") {
@@ -2529,6 +2569,10 @@ export class CodexAppServerClient {
25292569
await connection?.client.close().catch(() => undefined);
25302570
}
25312571

2572+
onNotification(listener: (method: string, params: unknown) => Promise<void> | void): () => void {
2573+
return this.addNotificationListener(listener);
2574+
}
2575+
25322576
async listThreads(params: {
25332577
sessionKey?: string;
25342578
workspaceDir?: string;
@@ -3618,6 +3662,7 @@ export const __testing = {
36183662
extractStartupProbeInfo,
36193663
formatFileEditNotice,
36203664
extractThreadTokenUsageSnapshot,
3665+
extractThreadsFromValue,
36213666
extractRateLimitSummaries,
36223667
formatStdioProcessLog,
36233668
resolveTurnStoppedReason,

src/controller.test.ts

Lines changed: 137 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
55
import type { OpenClawPluginApi, PluginCommandContext } from "openclaw/plugin-sdk";
66
import { CodexAppServerClient } from "./client.js";
77
import { CodexPluginController } from "./controller.js";
8+
import type { ThreadSummary } from "./types.js";
89

910
const discordSdkState = vi.hoisted(() => ({
1011
buildDiscordComponentMessage: vi.fn((params: { spec: { text?: string; blocks?: unknown[] } }) => ({
@@ -123,13 +124,16 @@ async function createControllerHarness() {
123124
const controller = new CodexPluginController(api);
124125
await controller.start();
125126
const clientMock = {
126-
listThreads: vi.fn(async () => [
127+
listThreads: vi.fn(async (): Promise<ThreadSummary[]> => [
127128
{
128129
threadId: "thread-1",
129130
title: "Discord Thread",
130131
projectKey: "/repo/openclaw",
131132
createdAt: Date.now() - 60_000,
132133
updatedAt: Date.now() - 30_000,
134+
status: {
135+
type: "idle",
136+
},
133137
},
134138
]),
135139
listModels: vi.fn(async () => [
@@ -899,6 +903,138 @@ describe("Discord controller flows", () => {
899903
})).toBeNull();
900904
});
901905

906+
it("enables cas_monitor and reports cross-thread activity", async () => {
907+
const { controller, clientMock } = await createControllerHarness();
908+
clientMock.listThreads.mockResolvedValue([
909+
{
910+
threadId: "thread-approval",
911+
title: "Approve deploy",
912+
projectKey: "/repo/openclaw",
913+
updatedAt: Date.now() - 60_000,
914+
status: {
915+
type: "active",
916+
activeFlags: ["waitingOnApproval"],
917+
},
918+
},
919+
{
920+
threadId: "thread-unread",
921+
title: "Fresh output",
922+
projectKey: "/repo/openclaw",
923+
updatedAt: Date.now() - 30_000,
924+
status: {
925+
type: "idle",
926+
},
927+
},
928+
]);
929+
930+
const reply = await controller.handleCommand(
931+
"cas_monitor",
932+
buildDiscordCommandContext({
933+
commandBody: "/cas_monitor",
934+
}),
935+
);
936+
937+
expect(reply.text).toContain("Monitor: active");
938+
expect(reply.text).toContain("Pending approvals:");
939+
expect(reply.text).toContain("Unread activity:");
940+
expect((controller as any).store.getMonitorBinding({
941+
channel: "discord",
942+
accountId: "default",
943+
conversationId: "channel:chan-1",
944+
})).toEqual(
945+
expect.objectContaining({
946+
workspaceDir: undefined,
947+
}),
948+
);
949+
});
950+
951+
it("detaches monitor bindings with cas_detach", async () => {
952+
const { controller } = await createControllerHarness();
953+
await (controller as any).store.upsertMonitorBinding({
954+
conversation: {
955+
channel: "discord",
956+
accountId: "default",
957+
conversationId: "channel:chan-1",
958+
},
959+
updatedAt: Date.now(),
960+
});
961+
962+
const reply = await controller.handleCommand(
963+
"cas_detach",
964+
buildDiscordCommandContext({
965+
commandBody: "/cas_detach",
966+
}),
967+
);
968+
969+
expect(reply).toEqual({
970+
text: "Detached this conversation from Codex.",
971+
});
972+
expect((controller as any).store.getMonitorBinding({
973+
channel: "discord",
974+
accountId: "default",
975+
conversationId: "channel:chan-1",
976+
})).toBeNull();
977+
});
978+
979+
it("does not claim inbound messages for monitor-only conversations", async () => {
980+
const { controller } = await createControllerHarness();
981+
await (controller as any).store.upsertMonitorBinding({
982+
conversation: {
983+
channel: "discord",
984+
accountId: "default",
985+
conversationId: "channel:chan-1",
986+
},
987+
updatedAt: Date.now(),
988+
});
989+
990+
const result = await controller.handleInboundClaim({
991+
content: "hello",
992+
channel: "discord",
993+
accountId: "default",
994+
conversationId: "discord:channel:chan-1",
995+
metadata: { guildId: "guild-1" },
996+
});
997+
998+
expect(result).toEqual({ handled: false });
999+
});
1000+
1001+
it("dedupes unchanged monitor refresh summaries", async () => {
1002+
const { controller, clientMock, sendMessageDiscord } = await createControllerHarness();
1003+
await (controller as any).store.upsertMonitorBinding({
1004+
conversation: {
1005+
channel: "discord",
1006+
accountId: "default",
1007+
conversationId: "channel:chan-1",
1008+
},
1009+
updatedAt: Date.now(),
1010+
});
1011+
clientMock.listThreads.mockResolvedValue([
1012+
{
1013+
threadId: "thread-unread",
1014+
title: "Fresh output",
1015+
projectKey: "/repo/openclaw",
1016+
updatedAt: Date.now() - 30_000,
1017+
status: {
1018+
type: "idle",
1019+
},
1020+
},
1021+
]);
1022+
1023+
await (controller as any).refreshMonitorBindings({ force: true });
1024+
await (controller as any).refreshMonitorBindings();
1025+
1026+
expect(sendMessageDiscord).toHaveBeenCalledTimes(1);
1027+
expect((controller as any).store.getMonitorBinding({
1028+
channel: "discord",
1029+
accountId: "default",
1030+
conversationId: "channel:chan-1",
1031+
})).toEqual(
1032+
expect.objectContaining({
1033+
lastSummarySignature: expect.stringContaining("Unread activity:"),
1034+
}),
1035+
);
1036+
});
1037+
9021038
it("shows plan mode on in cas_status when the bound conversation has an active plan run", async () => {
9031039
const { controller } = await createControllerHarness();
9041040
await (controller as any).store.upsertBinding({

0 commit comments

Comments
 (0)