Skip to content
52 changes: 47 additions & 5 deletions src/agent/openclaw/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ export class OpenClawAgent {
private currentStreamMsgId: string | null = null;
private accumulatedAssistantText = '';

// When true, all chat.event processing is skipped to prevent duplicate output
// during channel turns (Feishu/Telegram/DingTalk). Agent events handle rendering.
private channelTurnActive = false;

private readonly onStreamEvent: (data: IResponseMessage) => void;
private readonly onSignalEvent?: (data: IResponseMessage) => void;
private readonly onSessionKeyUpdate?: (sessionKey: string) => void;
Expand Down Expand Up @@ -210,6 +214,7 @@ export class OpenClawAgent {
}

// Reset streaming state for new message
this.channelTurnActive = false;
this.currentStreamMsgId = null;
this.accumulatedAssistantText = '';
this.adapter.resetMessageTracking();
Expand Down Expand Up @@ -261,6 +266,42 @@ export class OpenClawAgent {
return Promise.resolve({ success: true, data: null });
}

/**
* Send a message from an external channel (Feishu/Telegram) using a separate
* gateway session key to avoid rs_ 404 errors with the main AionUI session.
* Gateway broadcasts the response to all WebSocket clients, so the main
* OpenClawAgentManager automatically receives and renders the reply.
*/
async sendChannelMessage(content: string): Promise<AcpResult> {
try {
if (!this.connection?.isConnected) {
await this.start();
}

// Block all chat.event processing for the entire channel turn.
// Gateway sends both agent.event and chat.event for the same response;
// without this flag, handleEndTurn (from agent lifecycle end) resets
// currentStreamMsgId, allowing late-arriving chat.event deltas to
// produce duplicate content in AionUI.
this.channelTurnActive = true;
this.currentStreamMsgId = uuid();
this.accumulatedAssistantText = '';

await this.connection!.chatSend({
sessionKey: 'channel',
message: content,
});

return { success: true, data: null };
} catch (error) {
const errorMsg = error instanceof Error ? error.message : String(error);
return {
success: false,
error: createAcpError(AcpErrorType.UNKNOWN, errorMsg, false),
};
}
}

/**
* Kill the agent (compatibility method)
*/
Expand Down Expand Up @@ -348,10 +389,9 @@ export class OpenClawAgent {
}

private handleChatEvent(event: ChatEvent): void {
// Skip delta processing when handleAgentEvent is already handling the assistant stream
// This prevents duplicate messages with different msg_ids
if (event.state === 'delta' && this.currentStreamMsgId) {
// Agent stream is active, skip to avoid duplicate content
// Skip delta processing when agent.event is already handling the assistant stream,
// or during channel turns where late-arriving chat.event deltas must be blocked.
if (event.state === 'delta' && (this.currentStreamMsgId || this.channelTurnActive)) {
return;
}

Expand Down Expand Up @@ -551,7 +591,9 @@ export class OpenClawAgent {
}

private handleEndTurn(): void {
// Reset streaming state for next turn
// Always reset streaming state so the next turn gets a fresh msg_id.
// Late chat.event deltas during channel turns are blocked by the
// channelTurnActive flag in handleChatEvent, not by currentStreamMsgId.
this.currentStreamMsgId = null;
this.accumulatedAssistantText = '';

Expand Down
19 changes: 13 additions & 6 deletions src/channels/agent/ChannelMessageService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import WorkerManage from '@/process/WorkerManage';
import { getDatabase } from '@/process/database';
import type BaseAgentManager from '@/process/task/BaseAgentManager';
import type OpenClawAgentManager from '@/process/task/OpenClawAgentManager';
import { composeMessage, transformMessage, type TMessage } from '../../common/chatLib';
import { uuid } from '../../common/utils';
import { channelEventBus, type IAgentMessageEvent } from './ChannelEventBus';
Expand Down Expand Up @@ -158,16 +159,20 @@ export class ChannelMessageService {
// 获取任务
// Get task
let task: BaseAgentManager<unknown>;
const db = getDatabase();
const dbResult = db.getConversation(conversationId);
let isOpenClaw = false;

try {
// 检查会话来源,如果来自 Channel 则开启 yoloMode (自动同意)
// Check conversation source, enable yoloMode if it's from a Channel
const db = getDatabase();
const dbResult = db.getConversation(conversationId);
const isFromChannel = dbResult.success && (dbResult.data?.source === 'lark' || dbResult.data?.source === 'telegram' || dbResult.data?.source === 'dingtalk');

task = await WorkerManage.getTaskByIdRollbackBuild(conversationId, {
yoloMode: isFromChannel,
});

isOpenClaw = task.type === 'openclaw-gateway';
} catch (error) {
const errorMsg = error instanceof Error ? error.message : 'Failed to get conversation task';
console.error(`[ChannelMessageService] Failed to get task:`, errorMsg);
Expand Down Expand Up @@ -199,11 +204,13 @@ export class ChannelMessageService {
finishCount: 0,
});

// Build payload based on agent type.
// Gemini expects { input }, ACP/Codex expect { content }.
const payload: { input?: string; content?: string; msg_id: string } = task.type === 'gemini' ? { input: message, msg_id: msgId } : task.type === 'acp' || task.type === 'codex' ? { content: message, msg_id: msgId } : { content: message, msg_id: msgId };
// Send message based on agent type.
// OpenClaw: use sendChannelMessage (separate session key to avoid rs_ 404).
// Gateway broadcasts the response to all WebSocket clients, so the main
// OpenClawAgentManager receives and renders it automatically via channelEventBus.
const sendPromise = isOpenClaw ? (task as OpenClawAgentManager).sendChannelMessage({ content: message, msg_id: msgId }) : task.sendMessage(task.type === 'gemini' ? { input: message, msg_id: msgId } : { content: message, msg_id: msgId });

task.sendMessage(payload).catch((error: Error) => {
sendPromise.catch((error: Error) => {
const errorMessage = `Error: ${error.message || 'Failed to send message'}`;
console.error(`[ChannelMessageService] Send error:`, error);
onStream({ type: 'tips', id: uuid(), conversation_id: conversationId, content: { type: 'error', content: errorMessage } }, true);
Expand Down
9 changes: 9 additions & 0 deletions src/channels/gateway/ActionExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import type { TMessage } from '@/common/chatLib';
import { getDatabase } from '@/process/database';
import { ProcessConfig } from '@/process/initStorage';
import WorkerManage from '@/process/WorkerManage';
import { ConversationService } from '@/process/services/conversationService';
import { buildChatErrorResponse, chatActions } from '../actions/ChatActions';
import { handlePairingShow, platformActions } from '../actions/PlatformActions';
Expand Down Expand Up @@ -342,6 +343,14 @@ export class ActionExecutor {

// Get or create session (scoped by chatId for per-chat isolation)
let session = this.sessionManager.getSession(channelUser.id, chatId);

// When an active OpenClaw session exists in AionUI, ALWAYS route channel
// messages to it (even if a cached session points to a different conversation).
const activeOpenClawTask = WorkerManage.listTasks().find((t) => t.type === 'openclaw-gateway');
if (activeOpenClawTask && session?.conversationId !== activeOpenClawTask.id) {
session = this.sessionManager.createSessionWithConversation(channelUser, activeOpenClawTask.id, 'acp', undefined, chatId);
}

if (!session || !session.conversationId) {
const source = platform === 'lark' ? 'lark' : platform === 'dingtalk' ? 'dingtalk' : 'telegram';

Expand Down
43 changes: 42 additions & 1 deletion src/process/task/OpenClawAgentManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ class OpenClawAgentManager extends BaseAgentManager<OpenClawAgentManagerData> {
ipcBridge.conversation.responseStream.emit(msg);

// Emit to Channel global event bus (Telegram/Lark streaming)
channelEventBus.emitAgentMessage(this.conversation_id, msg);
// Skip user_content to avoid echoing the channel user's own message back
if (msg.type !== 'user_content') {
channelEventBus.emitAgentMessage(this.conversation_id, msg);
}
}

private handleSignalEvent(message: IResponseMessage): void {
Expand Down Expand Up @@ -201,6 +204,44 @@ class OpenClawAgentManager extends BaseAgentManager<OpenClawAgentManagerData> {
}
}

/**
* Send a message from an external channel (Feishu/Telegram) using a separate
* gateway session key to avoid rs_ 404 errors with the main AionUI session.
* Gateway broadcasts the response to all WebSocket clients, so the main
* OpenClawAgentManager automatically receives and renders the reply.
*/
async sendChannelMessage(data: { content: string; msg_id?: string }): Promise<void> {
cronBusyGuard.setProcessing(this.conversation_id, true);
this.status = 'running';
try {
await this.bootstrap;

// Route user message through handleStreamEvent — the same proven code path
// used by AI responses. handleStreamEvent handles DB persistence and IPC
// emission to both openclawConversation and conversation streams.
if (data.msg_id && data.content) {
this.handleStreamEvent({
type: 'user_content',
conversation_id: this.conversation_id,
msg_id: data.msg_id,
data: data.content,
});
}

const result = await this.agent.sendChannelMessage(data.content);
if (result.success === false) {
throw new Error(result.error.message || 'Failed to send channel message');
}
} catch (error) {
cronBusyGuard.setProcessing(this.conversation_id, false);
this.status = 'finished';

const errorMsg = error instanceof Error ? error.message : String(error);
this.emitErrorMessage(`Failed to send channel message: ${errorMsg}`);
throw error;
}
}

async confirm(id: string, callId: string, data: string) {
super.confirm(id, callId, data);
await this.bootstrap;
Expand Down
Loading