Skip to content

Commit 5e01e7c

Browse files
committed
fix: handle Lark live status edit cap and post markdown messages
Prevent status updates from stalling once Lark hits its 20-edit limit by rotating to a fresh message ID, and switch Lark text payloads to post content so markdown renders consistently.
1 parent d1667ec commit 5e01e7c

File tree

8 files changed

+138
-34
lines changed

8 files changed

+138
-34
lines changed

packages/core/runtime.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ export function createCoreRuntime(deps: RuntimeDeps) {
148148
const finalChunks = splitResultMessage(text);
149149
const singleChunk = finalChunks[0] ?? text;
150150
const statusRateLimited = runtimeDeps.im.wasRateLimited?.(channelId, statusTs) ?? false;
151+
const statusRateLimitError = runtimeDeps.im.getRateLimitError?.(channelId, statusTs);
151152

152153
if (finalChunks.length > 1) {
153154
if (statusFormat !== "aggressive" && !statusRateLimited) {
@@ -157,6 +158,7 @@ export function createCoreRuntime(deps: RuntimeDeps) {
157158
channelId,
158159
threadId,
159160
statusTs,
161+
...(statusRateLimitError ? { error: statusRateLimitError } : {}),
160162
});
161163
}
162164

@@ -176,6 +178,7 @@ export function createCoreRuntime(deps: RuntimeDeps) {
176178
channelId,
177179
threadId,
178180
statusTs,
181+
...(statusRateLimitError ? { error: statusRateLimitError } : {}),
179182
});
180183
await runtimeDeps.im.sendMessage(channelId, threadId, singleChunk, true);
181184
return;

packages/core/runtime/event-stream.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {
22
clearPendingQuestion,
33
getPendingQuestion,
44
setPendingQuestion,
5+
updateActiveRequest,
56
type ActiveRequest,
67
type TrackedTodo,
78
type TrackedTool,
@@ -159,7 +160,7 @@ export async function startEventStreamWatcher(
159160
onUpdate();
160161

161162
void (async () => {
162-
await deps.im.updateMessage(
163+
const updatedStatusTs = await deps.im.updateMessage(
163164
request.channelId,
164165
request.statusMessageTs,
165166
buildStatusMessageForAgent({
@@ -171,6 +172,10 @@ export async function startEventStreamWatcher(
171172
}),
172173
false
173174
);
175+
if (typeof updatedStatusTs === "string" && updatedStatusTs !== request.statusMessageTs) {
176+
request.statusMessageTs = updatedStatusTs;
177+
updateActiveRequest(request.channelId, request.threadId, { statusMessageTs: updatedStatusTs });
178+
}
174179
setPendingQuestion(request.channelId, request.threadId, {
175180
requestId,
176181
sessionId: properties.sessionID ?? request.sessionId,

packages/core/runtime/message-updates.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ type QueuedUpdate = {
77
messageTs: string;
88
text: string;
99
asMarkdown: boolean;
10-
resolve: () => void;
10+
resolve: (messageTs?: string) => void;
1111
};
1212

1313
function isRateLimitError(error: unknown): boolean {
@@ -23,6 +23,7 @@ export function createRateLimitedImAdapter(
2323
const queue: QueuedUpdate[] = [];
2424
let processing = false;
2525
const rateLimitedMessages = new Set<string>();
26+
const rateLimitErrors = new Map<string, string>();
2627

2728
function key(channelId: string, messageTs: string): string {
2829
return `${channelId}:${messageTs}`;
@@ -43,10 +44,13 @@ export function createRateLimitedImAdapter(
4344

4445
globalLastUpdateAt = Date.now();
4546
try {
46-
await im.updateMessage(item.channelId, item.messageTs, item.text, item.asMarkdown);
47+
const maybeUpdatedTs = await im.updateMessage(item.channelId, item.messageTs, item.text, item.asMarkdown);
48+
item.resolve(typeof maybeUpdatedTs === "string" ? maybeUpdatedTs : undefined);
4749
} catch (error) {
4850
if (isRateLimitError(error)) {
49-
rateLimitedMessages.add(key(item.channelId, item.messageTs));
51+
const rateLimitKey = key(item.channelId, item.messageTs);
52+
rateLimitedMessages.add(rateLimitKey);
53+
rateLimitErrors.set(rateLimitKey, String(error));
5054
log.warn("IM message update hit rate limit (429)", {
5155
channelId: item.channelId,
5256
messageTs: item.messageTs,
@@ -58,9 +62,8 @@ export function createRateLimitedImAdapter(
5862
messageTs: item.messageTs,
5963
error: String(error),
6064
});
65+
item.resolve();
6166
}
62-
63-
item.resolve();
6467
}
6568

6669
processing = false;
@@ -74,12 +77,19 @@ export function createRateLimitedImAdapter(
7477
}
7578
return rateLimitedMessages.has(key(channelId, messageTs));
7679
},
80+
getRateLimitError: (channelId: string, messageTs: string): string | undefined => {
81+
if (typeof im.getRateLimitError === "function") {
82+
const upstream = im.getRateLimitError(channelId, messageTs);
83+
if (upstream) return upstream;
84+
}
85+
return rateLimitErrors.get(key(channelId, messageTs));
86+
},
7787
updateMessage: async (
7888
channelId: string,
7989
messageTs: string,
8090
text: string,
8191
asMarkdown = true
82-
): Promise<void> => {
92+
): Promise<string | undefined> => {
8393
for (let i = queue.length - 1; i >= 0; i--) {
8494
const queued = queue[i];
8595
if (queued && queued.channelId === channelId && queued.messageTs === messageTs) {
@@ -88,7 +98,7 @@ export function createRateLimitedImAdapter(
8898
}
8999
}
90100

91-
return new Promise<void>((resolve) => {
101+
return new Promise<string | undefined>((resolve) => {
92102
queue.push({ channelId, messageTs, text, asMarkdown, resolve });
93103
void processQueue();
94104
});

packages/core/runtime/open-request.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,18 @@ export async function runOpenRequest(params: {
5757

5858
const providerLabel = deps.agent.getDisplayNameForSession(sessionId);
5959

60-
const statusTs = await deps.im.sendMessage(
60+
const initialStatusTs = await deps.im.sendMessage(
6161
context.channelId,
6262
context.replyThreadId,
6363
`${providerLabel} is running...`,
6464
false
6565
);
6666

67-
if (!statusTs) {
67+
if (!initialStatusTs) {
6868
log.error("Failed to send status message");
6969
return null;
7070
}
71+
let statusTs = initialStatusTs;
7172

7273
const request = createActiveRequest(
7374
sessionId,
@@ -97,7 +98,6 @@ export async function runOpenRequest(params: {
9798
const result = await runTrackedRequest({
9899
deps,
99100
request,
100-
statusTs,
101101
workingPath: cwd,
102102
stateMachine,
103103
liveEventHistory,
@@ -126,9 +126,14 @@ export async function runOpenRequest(params: {
126126
statusMessageFormat: resolveStatusMessageFormat(),
127127
});
128128
if (!request.statusFrozen) {
129-
await deps.im.updateMessage(context.channelId, statusTs, statusText, false);
129+
const updatedStatusTs = await deps.im.updateMessage(context.channelId, statusTs, statusText, false);
130+
if (typeof updatedStatusTs === "string" && updatedStatusTs !== statusTs) {
131+
statusTs = updatedStatusTs;
132+
request.statusMessageTs = updatedStatusTs;
133+
}
130134
}
131135
updateActiveRequest(context.channelId, context.threadId, {
136+
statusMessageTs: request.statusMessageTs,
132137
currentText: request.currentText,
133138
tools: request.tools,
134139
todos: request.todos,

packages/core/runtime/request-runner.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ type RunnerDeps = {
1515
export type RunTrackedRequestParams = {
1616
deps: RunnerDeps;
1717
request: ActiveRequest;
18-
statusTs: string;
1918
workingPath: string;
2019
stateMachine: CoreStateMachine;
2120
liveEventHistory: Map<string, SessionEvent[]>;
@@ -43,7 +42,6 @@ export async function runTrackedRequest(
4342
const {
4443
deps,
4544
request,
46-
statusTs,
4745
workingPath,
4846
stateMachine,
4947
liveEventHistory,
@@ -161,7 +159,7 @@ export async function runTrackedRequest(
161159
liveParsedState.delete(getStatusMessageKey(request));
162160

163161
const errorStatus = `Error: ${message}\n_${suggestion}_`;
164-
await deps.im.updateMessage(request.channelId, statusTs, errorStatus, false);
162+
await deps.im.updateMessage(request.channelId, request.statusMessageTs, errorStatus, false);
165163
onFail(message);
166164
return { responses: null };
167165
}

packages/core/runtime/selection-reply.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
loadSession,
77
markMessageProcessed,
88
saveSession,
9+
updateActiveRequest,
910
} from "@/config/local/sessions";
1011
import { resolveStatusMessageFormat } from "@/config";
1112
import { buildMessageOptions } from "@/core/runtime/message-options";
@@ -72,11 +73,12 @@ export async function handleSelectionReply(params: HandleSelectionReplyParams):
7273
const providerId = deps.agent.getProviderForSession(sessionId);
7374
const providerLabel = deps.agent.getDisplayNameForSession(sessionId);
7475

75-
const statusTs = await deps.im.sendMessage(channelId, threadId, `${providerLabel} is running...`, false);
76-
if (!statusTs) {
76+
const initialStatusTs = await deps.im.sendMessage(channelId, threadId, `${providerLabel} is running...`, false);
77+
if (!initialStatusTs) {
7778
log.error("Failed to send status message for button selection");
7879
return;
7980
}
81+
let statusTs = initialStatusTs;
8082

8183
const request = createActiveRequest(sessionId, channelId, threadId, threadId, statusTs, selection);
8284
const statusMessageKey = getStatusMessageKey(request);
@@ -119,7 +121,6 @@ export async function handleSelectionReply(params: HandleSelectionReplyParams):
119121
await runTrackedRequest({
120122
deps,
121123
request,
122-
statusTs,
123124
workingPath: cwd,
124125
stateMachine: getStateMachine({ channelId, threadId }),
125126
liveEventHistory: state.liveEventHistory,
@@ -141,7 +142,12 @@ export async function handleSelectionReply(params: HandleSelectionReplyParams):
141142
state: state.liveParsedState.get(statusMessageKey),
142143
statusMessageFormat: resolveStatusMessageFormat(),
143144
});
144-
await deps.im.updateMessage(channelId, statusTs, statusText, false);
145+
const updatedStatusTs = await deps.im.updateMessage(channelId, statusTs, statusText, false);
146+
if (typeof updatedStatusTs === "string" && updatedStatusTs !== statusTs) {
147+
statusTs = updatedStatusTs;
148+
request.statusMessageTs = updatedStatusTs;
149+
updateActiveRequest(channelId, threadId, { statusMessageTs: updatedStatusTs });
150+
}
145151
},
146152
onComplete: () => {
147153
completeActiveRequest(channelId, threadId);

packages/core/types.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,14 @@ export type AgentStatusMessageParams = {
5252
export interface IMAdapter {
5353
maxEditableMessageChars?: number;
5454
sendMessage(channelId: string, threadId: string, text: string, asMarkdown?: boolean): Promise<string | undefined>;
55-
updateMessage(channelId: string, messageTs: string, text: string, asMarkdown?: boolean): Promise<void>;
55+
updateMessage(
56+
channelId: string,
57+
messageTs: string,
58+
text: string,
59+
asMarkdown?: boolean
60+
): Promise<string | undefined | void>;
5661
wasRateLimited?(channelId: string, messageTs: string): boolean;
62+
getRateLimitError?(channelId: string, messageTs: string): string | undefined;
5763
deleteMessage(channelId: string, messageTs: string): Promise<void>;
5864
fetchThreadHistory(channelId: string, threadId: string, messageId: string): Promise<string | null>;
5965
buildAgentContext(params: AgentContextBuilderParams): Promise<OpenCodeMessageContext>;

0 commit comments

Comments
 (0)