Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
736 changes: 736 additions & 0 deletions .claude/issue/workflow-deep-analysis.md

Large diffs are not rendered by default.

744 changes: 744 additions & 0 deletions .claude/issue/workflow-memory-cpu-analysis.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/global/core/app/jsonschema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export const JsonSchemaPropertiesItemSchema = z.object({
not: z.any().optional(), // 不匹配

// 枚举和常量
enum: z.array(z.string()).optional(), // 枚举值
enum: z.array(z.any()).optional(), // 枚举值
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 类型放宽: z.array(z.string())z.array(z.any()) 虽然修复了 enum 值不限于 string 的问题(JSON Schema 的 enum 确实支持任意类型),但 z.any() 完全绕过了运行时校验。

建议使用更精确的联合类型:

enum: z.array(z.union([z.string(), z.number(), z.boolean(), z.null()])).optional(),

const: z.any().optional(), // 常量值

// 字符串约束
Expand Down
4 changes: 3 additions & 1 deletion packages/service/common/logger/sinks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ export async function createSinks(options: CreateSinksOptions): Promise<CreateSi
timestampStyle: 'reset',

categorySeparator: ':',
timestamp: () => dayjs().format('YYYY-MM-DD HH:mm:ss')
timestamp: () => dayjs().format('YYYY-MM-DD HH:mm:ss'),
// Full depth for nested objects (e.g. Zod errors) in console output
inspectOptions: { depth: Infinity }
})
}),
(record) => levelFilter(record, consoleLevel)
Expand Down
123 changes: 77 additions & 46 deletions packages/service/core/workflow/dispatch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
private runningNodeCount = 0;
private maxConcurrency: number;
private resolve: (e: WorkflowQueue) => void;
private processingActive = false; // 标记是否正在处理队列

constructor({
maxConcurrency = 10,
Expand Down Expand Up @@ -397,50 +398,82 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
}
this.activeRunQueue.add(nodeId);

this.processActiveNode();
// 非递归触发:如果没有正在处理,则启动处理循环
if (!this.processingActive) {
this.startProcessing();
}
}
// Process next active node
private async processActiveNode() {
// Finish
if (this.activeRunQueue.size === 0 && this.runningNodeCount === 0) {
if (isDebugMode) {
// 没有下一个激活节点,说明debug 进入了一个“即将结束”状态。可以开始处理 skip 节点
if (this.debugNextStepRunNodes.length === 0 && this.skipNodeQueue.size > 0) {
this.processSkipNodes();
} else {
this.resolve(this);
}
return;
}

// 如果没有交互响应,则开始处理 skip(交互响应的 skip 需要留给后续处理)
if (this.skipNodeQueue.size > 0 && !this.nodeInteractiveResponse) {
this.processSkipNodes();
} else {
this.resolve(this);
}
// 迭代处理队列(替代递归的 processActiveNode)
private async startProcessing() {
// 防止重复启动
if (this.processingActive) {
return;
}

// Over max concurrency(如果 this.activeRunQueue.size === 0 条件触发,代表肯定有节点在运行)
if (this.activeRunQueue.size === 0 || this.runningNodeCount >= this.maxConcurrency) {
return;
}
this.processingActive = true;

await surrenderProcess();
const nodeId = this.activeRunQueue.keys().next().value;
const node = nodeId ? this.runtimeNodesMap.get(nodeId) : undefined;
try {
const runningNodePromises = new Set<Promise<unknown>>();

// 迭代循环替代递归
while (true) {
// 检查结束条件
if (this.activeRunQueue.size === 0 && this.runningNodeCount === 0) {
if (isDebugMode) {
// 没有下一个激活节点,说明debug 进入了一个”即将结束”状态。可以开始处理 skip 节点
if (this.debugNextStepRunNodes.length === 0 && this.skipNodeQueue.size > 0) {
await this.processSkipNodes();
continue;
} else {
this.resolve(this);
break;
}
}

if (nodeId) {
this.activeRunQueue.delete(nodeId);
}
if (node) {
this.runningNodeCount++;
// 如果没有交互响应,则开始处理 skip(交互响应的 skip 需要留给后续处理)
if (this.skipNodeQueue.size > 0 && !this.nodeInteractiveResponse) {
await this.processSkipNodes();
continue;
} else {
this.resolve(this);
break;
}
}

this.checkNodeCanRun(node).finally(() => {
this.runningNodeCount--;
this.processActiveNode();
});
// 检查并发限制
if (this.activeRunQueue.size === 0 || this.runningNodeCount >= this.maxConcurrency) {
if (runningNodePromises.size > 0) {
await Promise.race(runningNodePromises);
} else {
// 理论上不应出现此情况,防御性退回到让出进程
await surrenderProcess();
}
continue;
}

// 处理下一个节点
const nodeId = this.activeRunQueue.keys().next().value;
const node = nodeId ? this.runtimeNodesMap.get(nodeId) : undefined;

if (nodeId) {
this.activeRunQueue.delete(nodeId);
}

if (node) {
this.runningNodeCount++;

// 不再递归调用,异步执行节点(不等待完成)
let nodePromise: Promise<unknown>;
nodePromise = this.checkNodeCanRun(node).finally(() => {
this.runningNodeCount--;
runningNodePromises.delete(nodePromise);
});
runningNodePromises.add(nodePromise);
}
}
} finally {
this.processingActive = false;
}
}

Expand All @@ -453,17 +486,16 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR

this.skipNodeQueue.set(node.nodeId, { node, skippedNodeIdList: concatSkippedNodeIdList });
}

// 迭代处理 skip 节点(每次只处理一个,然后返回主循环检查 active)
private async processSkipNodes() {
// 取一个 node,并且从队列里删除
await surrenderProcess();
const skipItem = this.skipNodeQueue.values().next().value;
if (skipItem) {
this.skipNodeQueue.delete(skipItem.node.nodeId);
this.checkNodeCanRun(skipItem.node, skipItem.skippedNodeIdList).finally(() => {
this.processActiveNode();
await this.checkNodeCanRun(skipItem.node, skipItem.skippedNodeIdList).catch((error) => {
logger.error('Workflow skip node run error', { error });
});
} else {
this.processActiveNode();
}
}

Expand Down Expand Up @@ -900,11 +932,6 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
return;
}

logger.debug('Run workflow node', {
maxRunTimes: data.maxRunTimes,
appId: data.runningAppInfo.id
});

// Get node run status by edges
const status = checkNodeRunStatus({
nodesMap: this.runtimeNodesMap,
Expand Down Expand Up @@ -1111,6 +1138,10 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
});

const workflowQueue = await new Promise<WorkflowQueue>((resolve) => {
logger.info('Workflow run start', {
maxRunTimes: data.maxRunTimes,
appId: data.runningAppInfo.id
});
const workflowQueue = new WorkflowQueue({
resolve,
defaultSkipNodeQueue: data.lastInteractive?.skipNodeQueue || data.defaultSkipNodeQueue
Expand Down
2 changes: 1 addition & 1 deletion projects/app/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "app",
"version": "4.14.8.1",
"version": "4.14.8.4",
"private": false,
"scripts": {
"dev": "npm run build:workers && next dev",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const getFlattenedErrorKeys = (errors: any, prefix = ''): string[] => {
const LabelAndFormRender = ({
label,
required,
description,
placeholder,
inputType,
showValueType,
Expand All @@ -40,6 +41,7 @@ const LabelAndFormRender = ({
}: {
label: string | React.ReactNode;
required?: boolean;
description?: string;
placeholder?: string;
showValueType?: boolean;
form: UseFormReturn<any>;
Expand All @@ -57,7 +59,7 @@ const LabelAndFormRender = ({
<Box _notLast={{ mb: 4 }}>
<Flex alignItems={'center'} mb={1}>
{typeof label === 'string' ? <FormLabel required={required}>{t(label)}</FormLabel> : label}
{placeholder && <QuestionTip ml={1} label={placeholder} />}
{description && <QuestionTip ml={1} label={description} />}
</Flex>

<Controller
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ const VariableInputForm = ({
{...item}
isUnChange={isUnChange}
key={item.key}
placeholder={item.description}
description={item.description}
inputType={variableInputTypeToInputType(item.type, item.valueType)}
form={variablesForm}
fieldName={`variables.${item.key}`}
Expand Down Expand Up @@ -150,7 +150,7 @@ const VariableInputForm = ({
{...item}
isUnChange={isUnChange}
key={item.key}
placeholder={item.description}
description={item.description}
inputType={variableInputTypeToInputType(item.type, item.valueType)}
form={variablesForm}
fieldName={`variables.${item.key}`}
Expand Down Expand Up @@ -192,7 +192,7 @@ const VariableInputForm = ({
{...item}
isUnChange={isUnChange}
key={item.key}
placeholder={item.description}
description={item.description}
inputType={variableInputTypeToInputType(item.type)}
bg={'myGray.50'}
form={variablesForm}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const ChatHomeVariablesForm = ({ chatForm }: Props) => {
{...item}
key={item.key}
fieldName={`variables.${item.key}`}
placeholder={item.description}
description={item.description}
inputType={variableInputTypeToInputType(item.type, item.valueType)}
form={variablesForm}
bg={'myGray.50'}
Expand All @@ -63,7 +63,7 @@ const ChatHomeVariablesForm = ({ chatForm }: Props) => {
{...item}
key={item.key}
fieldName={`variables.${item.key}`}
placeholder={item.description}
description={item.description}
inputType={variableInputTypeToInputType(item.type)}
form={variablesForm}
bg={'myGray.50'}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ const VariablePopover = ({ chatType }: { chatType: ChatTypeEnum }) => {
<LabelAndFormRender
{...item}
key={item.key}
placeholder={item.description}
description={item.description}
inputType={variableInputTypeToInputType(item.type)}
form={variablesForm}
fieldName={`variables.${item.key}`}
Expand Down Expand Up @@ -137,7 +137,7 @@ const VariablePopover = ({ chatType }: { chatType: ChatTypeEnum }) => {
<LabelAndFormRender
{...item}
key={item.key}
placeholder={item.description}
description={item.description}
inputType={variableInputTypeToInputType(item.type)}
form={variablesForm}
fieldName={`variables.${item.key}`}
Expand All @@ -156,7 +156,7 @@ const VariablePopover = ({ chatType }: { chatType: ChatTypeEnum }) => {
<LabelAndFormRender
{...item}
key={item.key}
placeholder={item.description}
description={item.description}
inputType={variableInputTypeToInputType(item.type)}
form={variablesForm}
fieldName={`variables.${item.key}`}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ const ChatTest = ({
inputType={inputType}
fieldName={paramName}
form={form}
placeholder={paramName}
description={paramName}
/>
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ const ChatTest = ({
inputType={inputType}
form={form}
fieldName={paramName}
placeholder={paramInfo.description}
bg={'myGray.50'}
description={paramInfo.description}
/>
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ export const useDebug = () => {
key={item.key}
label={item.label}
required={item.required}
placeholder={t(item.placeholder || item.description)}
description={t(item.placeholder || item.description)}
inputType={nodeInputTypeToInputType(item.renderTypeList)}
form={variablesForm}
fieldName={`nodeVariables.${item.key}`}
Expand All @@ -284,7 +284,7 @@ export const useDebug = () => {
key={item.key}
label={item.label}
required={item.required}
placeholder={t(item.description)}
description={t(item.description)}
inputType={variableInputTypeToInputType(item.type)}
form={variablesForm}
fieldName={`variables.${item.key}`}
Expand All @@ -297,7 +297,7 @@ export const useDebug = () => {
key={item.key}
label={item.label}
required={item.required}
placeholder={t(item.description)}
description={t(item.description)}
inputType={variableInputTypeToInputType(item.type)}
form={variablesForm}
fieldName={`variables.${item.key}`}
Expand All @@ -310,7 +310,7 @@ export const useDebug = () => {
key={item.key}
label={item.label}
required={item.required}
placeholder={item.description}
description={item.description}
inputType={variableInputTypeToInputType(item.type)}
form={variablesForm}
fieldName={`variables.${item.key}`}
Expand Down
Loading