Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
736 changes: 736 additions & 0 deletions .claude/issue/workflow-deep-analysis.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/global/core/app/jsonschema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export const JsonSchemaPropertiesItemSchema = z.object({
not: z.any().optional(), // 不匹配

// 枚举和常量
enum: z.array(z.string()).optional(), // 枚举值
enum: z.array(z.any()).optional(), // 枚举值
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 类型放宽: z.array(z.string())z.array(z.any()) 虽然修复了 enum 值不限于 string 的问题(JSON Schema 的 enum 确实支持任意类型),但 z.any() 完全绕过了运行时校验。

建议使用更精确的联合类型:

enum: z.array(z.union([z.string(), z.number(), z.boolean(), z.null()])).optional(),

const: z.any().optional(), // 常量值

// 字符串约束
Expand Down
4 changes: 3 additions & 1 deletion packages/service/common/logger/sinks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ export async function createSinks(options: CreateSinksOptions): Promise<CreateSi
timestampStyle: 'reset',

categorySeparator: ':',
timestamp: () => dayjs().format('YYYY-MM-DD HH:mm:ss')
timestamp: () => dayjs().format('YYYY-MM-DD HH:mm:ss'),
// Full depth for nested objects (e.g. Zod errors) in console output
inspectOptions: { depth: 5 }
})
}),
(record) => levelFilter(record, consoleLevel)
Expand Down
171 changes: 107 additions & 64 deletions packages/service/core/workflow/dispatch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,16 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
string,
{ node: RuntimeNodeItemType; skippedNodeIdList: Set<string> }
>();
private runningNodeCount = 0;
private maxConcurrency: number;
private resolve: (e: WorkflowQueue) => void;
private processingActive = false; // 标记是否正在处理队列

// Buffer
// 可以根据 nodeId 获取所有的 source 边和 target 边
private edgeIndex = {
bySource: new Map<string, RuntimeEdgeItemType[]>(),
byTarget: new Map<string, RuntimeEdgeItemType[]>()
};

constructor({
maxConcurrency = 10,
Expand All @@ -388,6 +395,20 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
if (!node) return;
this.addSkipNode(node, new Set(skippedNodeIdList));
});

// 一次性构建索引 - O(m)
const filteredEdges = filterWorkflowEdges(runtimeEdges);
filteredEdges.forEach((edge) => {
if (!this.edgeIndex.bySource.has(edge.source)) {
this.edgeIndex.bySource.set(edge.source, []);
}
this.edgeIndex.bySource.get(edge.source)!.push(edge);

if (!this.edgeIndex.byTarget.has(edge.target)) {
this.edgeIndex.byTarget.set(edge.target, []);
}
this.edgeIndex.byTarget.get(edge.target)!.push(edge);
});
}

// Add active node to queue (if already in the queue, it will not be added again)
Expand All @@ -397,50 +418,79 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
}
this.activeRunQueue.add(nodeId);

this.processActiveNode();
// 非递归触发:如果没有正在处理,则启动处理循环
if (!this.processingActive) {
this.startProcessing();
}
}
// Process next active node
private async processActiveNode() {
// Finish
if (this.activeRunQueue.size === 0 && this.runningNodeCount === 0) {
if (isDebugMode) {
// 没有下一个激活节点,说明debug 进入了一个“即将结束”状态。可以开始处理 skip 节点
if (this.debugNextStepRunNodes.length === 0 && this.skipNodeQueue.size > 0) {
this.processSkipNodes();
} else {
this.resolve(this);
}
return;
}

// 如果没有交互响应,则开始处理 skip(交互响应的 skip 需要留给后续处理)
if (this.skipNodeQueue.size > 0 && !this.nodeInteractiveResponse) {
this.processSkipNodes();
} else {
this.resolve(this);
}
// 迭代处理队列(替代递归的 processActiveNode)
private async startProcessing() {
// 防止重复启动
if (this.processingActive) {
return;
}

// Over max concurrency(如果 this.activeRunQueue.size === 0 条件触发,代表肯定有节点在运行)
if (this.activeRunQueue.size === 0 || this.runningNodeCount >= this.maxConcurrency) {
return;
}
this.processingActive = true;

await surrenderProcess();
const nodeId = this.activeRunQueue.keys().next().value;
const node = nodeId ? this.runtimeNodesMap.get(nodeId) : undefined;
try {
const runningNodePromises = new Set<Promise<unknown>>();

// 迭代循环替代递归
while (true) {
// 检查结束条件
if (this.activeRunQueue.size === 0 && runningNodePromises.size === 0) {
if (isDebugMode) {
// 没有下一个激活节点,说明debug 进入了一个”即将结束”状态。可以开始处理 skip 节点
if (this.debugNextStepRunNodes.length === 0 && this.skipNodeQueue.size > 0) {
await this.processSkipNodes();
continue;
} else {
this.resolve(this);
break;
}
}

if (nodeId) {
this.activeRunQueue.delete(nodeId);
}
if (node) {
this.runningNodeCount++;
// 如果没有交互响应,则开始处理 skip(交互响应的 skip 需要留给后续处理)
if (this.skipNodeQueue.size > 0 && !this.nodeInteractiveResponse) {
await this.processSkipNodes();
continue;
} else {
this.resolve(this);
break;
}
}

this.checkNodeCanRun(node).finally(() => {
this.runningNodeCount--;
this.processActiveNode();
});
// 检查并发限制
if (this.activeRunQueue.size === 0 || runningNodePromises.size >= this.maxConcurrency) {
if (runningNodePromises.size > 0) {
// 当上一个节点运行结束时,立即运行下一轮
await Promise.race(runningNodePromises);
} else {
// 理论上不应出现此情况,防御性退回到让出进程
await surrenderProcess();
}
continue;
}

// 处理下一个节点
const nodeId = this.activeRunQueue.keys().next().value;
const node = nodeId ? this.runtimeNodesMap.get(nodeId) : undefined;

if (nodeId) {
this.activeRunQueue.delete(nodeId);
}

if (node) {
// 不再递归调用,异步执行节点(不等待完成)
const nodePromise: Promise<unknown> = this.checkNodeCanRun(node).finally(() => {
runningNodePromises.delete(nodePromise);
});
runningNodePromises.add(nodePromise);
}
}
} finally {
this.processingActive = false;
}
}

Expand All @@ -453,17 +503,16 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR

this.skipNodeQueue.set(node.nodeId, { node, skippedNodeIdList: concatSkippedNodeIdList });
}

// 迭代处理 skip 节点(每次只处理一个,然后返回主循环检查 active)
private async processSkipNodes() {
// 取一个 node,并且从队列里删除
await surrenderProcess();
const skipItem = this.skipNodeQueue.values().next().value;
if (skipItem) {
this.skipNodeQueue.delete(skipItem.node.nodeId);
this.checkNodeCanRun(skipItem.node, skipItem.skippedNodeIdList).finally(() => {
this.processActiveNode();
await this.checkNodeCanRun(skipItem.node, skipItem.skippedNodeIdList).catch((error) => {
logger.error('Workflow skip node run error', { error, nodeName: skipItem.node.name });
});
} else {
this.processActiveNode();
}
}

Expand Down Expand Up @@ -579,7 +628,7 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
// run module
const dispatchRes: NodeResponseType = await (async () => {
if (callbackMap[node.flowNodeType]) {
const targetEdges = runtimeEdges.filter((item) => item.source === node.nodeId);
const targetEdges = this.edgeIndex.bySource.get(node.nodeId) || [];
const errorHandleId = getHandleId(node.nodeId, 'source_catch', 'right');

try {
Expand Down Expand Up @@ -848,9 +897,7 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
// Get next source edges and update status
const skipHandleId = result[DispatchNodeResponseKeyEnum.skipHandleId] || [];

const targetEdges = filterWorkflowEdges(runtimeEdges).filter(
(item) => item.source === node.nodeId
);
const targetEdges = this.edgeIndex.bySource.get(node.nodeId) || [];

// update edge status
targetEdges.forEach((edge) => {
Expand All @@ -864,23 +911,20 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
// 同时可以去重
const nextStepActiveNodesMap = new Map<string, RuntimeNodeItemType>();
const nextStepSkipNodesMap = new Map<string, RuntimeNodeItemType>();
runtimeNodes.forEach((node) => {
if (targetEdges.some((item) => item.target === node.nodeId && item.status === 'active')) {
nextStepActiveNodesMap.set(node.nodeId, node);
}
if (
targetEdges.some((item) => item.target === node.nodeId && item.status === 'skipped')
) {
nextStepSkipNodesMap.set(node.nodeId, node);
targetEdges.forEach((edge) => {
const targetNode = this.runtimeNodesMap.get(edge.target);
if (!targetNode) return;

if (edge.status === 'active') {
nextStepActiveNodesMap.set(targetNode.nodeId, targetNode);
} else if (edge.status === 'skipped') {
nextStepSkipNodesMap.set(targetNode.nodeId, targetNode);
}
});

const nextStepActiveNodes = Array.from(nextStepActiveNodesMap.values());
const nextStepSkipNodes = Array.from(nextStepSkipNodesMap.values());

return {
nextStepActiveNodes,
nextStepSkipNodes
nextStepActiveNodes: Array.from(nextStepActiveNodesMap.values()),
nextStepSkipNodes: Array.from(nextStepSkipNodesMap.values())
};
};

Expand All @@ -900,11 +944,6 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
return;
}

logger.debug('Run workflow node', {
maxRunTimes: data.maxRunTimes,
appId: data.runningAppInfo.id
});

// Get node run status by edges
const status = checkNodeRunStatus({
nodesMap: this.runtimeNodesMap,
Expand Down Expand Up @@ -1111,6 +1150,10 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
});

const workflowQueue = await new Promise<WorkflowQueue>((resolve) => {
logger.info('Workflow run start', {
maxRunTimes: data.maxRunTimes,
appId: data.runningAppInfo.id
});
const workflowQueue = new WorkflowQueue({
resolve,
defaultSkipNodeQueue: data.lastInteractive?.skipNodeQueue || data.defaultSkipNodeQueue
Expand Down
1 change: 1 addition & 0 deletions packages/web/i18n/en/app.json
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@
"publish_channel.wecom.empty": "Publish to WeCom bot. Please <a>bind a custom domain</a> and complete domain verification first.",
"publish_success": "Publish Successful",
"question_guide_tip": "After the conversation, 3 guiding questions will be generated for you.",
"raw_params": "original parameters",
"reasoning_response": "Output thinking",
"recharge": "Go to recharge",
"reference_variable": "Reference variables",
Expand Down
1 change: 1 addition & 0 deletions packages/web/i18n/zh-CN/app.json
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@
"publish_channel.wecom.empty": "发布到企业微信机器人,请先 <a>绑定自定义域名</a>,并且通过域名校验。",
"publish_success": "发布成功",
"question_guide_tip": "对话结束后,会为你生成 3 个引导性问题。",
"raw_params": "原始参数",
"reasoning_response": "输出思考",
"recharge": "去充值",
"reference_variable": "引用变量",
Expand Down
1 change: 1 addition & 0 deletions packages/web/i18n/zh-Hant/app.json
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@
"publish_channel": "發布通道",
"publish_success": "發布成功",
"question_guide_tip": "對話結束後,會為你產生 3 個引導性問題。",
"raw_params": "原始參數",
"reasoning_response": "輸出思考",
"recharge": "去充值",
"reference_variable": "引用變量",
Expand Down
2 changes: 1 addition & 1 deletion projects/app/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "app",
"version": "4.14.8.1",
"version": "4.14.8.4",
"private": false,
"scripts": {
"dev": "npm run build:workers && next dev",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const getFlattenedErrorKeys = (errors: any, prefix = ''): string[] => {
const LabelAndFormRender = ({
label,
required,
description,
placeholder,
inputType,
showValueType,
Expand All @@ -40,6 +41,7 @@ const LabelAndFormRender = ({
}: {
label: string | React.ReactNode;
required?: boolean;
description?: string;
placeholder?: string;
showValueType?: boolean;
form: UseFormReturn<any>;
Expand All @@ -57,7 +59,7 @@ const LabelAndFormRender = ({
<Box _notLast={{ mb: 4 }}>
<Flex alignItems={'center'} mb={1}>
{typeof label === 'string' ? <FormLabel required={required}>{t(label)}</FormLabel> : label}
{placeholder && <QuestionTip ml={1} label={placeholder} />}
{description && <QuestionTip ml={1} label={description} />}
</Flex>

<Controller
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ const VariableInputForm = ({
{...item}
isUnChange={isUnChange}
key={item.key}
placeholder={item.description}
description={item.description}
inputType={variableInputTypeToInputType(item.type, item.valueType)}
form={variablesForm}
fieldName={`variables.${item.key}`}
Expand Down Expand Up @@ -150,7 +150,7 @@ const VariableInputForm = ({
{...item}
isUnChange={isUnChange}
key={item.key}
placeholder={item.description}
description={item.description}
inputType={variableInputTypeToInputType(item.type, item.valueType)}
form={variablesForm}
fieldName={`variables.${item.key}`}
Expand Down Expand Up @@ -192,7 +192,7 @@ const VariableInputForm = ({
{...item}
isUnChange={isUnChange}
key={item.key}
placeholder={item.description}
description={item.description}
inputType={variableInputTypeToInputType(item.type)}
bg={'myGray.50'}
form={variablesForm}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const ChatHomeVariablesForm = ({ chatForm }: Props) => {
{...item}
key={item.key}
fieldName={`variables.${item.key}`}
placeholder={item.description}
description={item.description}
inputType={variableInputTypeToInputType(item.type, item.valueType)}
form={variablesForm}
bg={'myGray.50'}
Expand All @@ -63,7 +63,7 @@ const ChatHomeVariablesForm = ({ chatForm }: Props) => {
{...item}
key={item.key}
fieldName={`variables.${item.key}`}
placeholder={item.description}
description={item.description}
inputType={variableInputTypeToInputType(item.type)}
form={variablesForm}
bg={'myGray.50'}
Expand Down
Loading
Loading