diff --git a/packages/components/nodes/tools/ParallelSubflows/ParallelSubflows.ts b/packages/components/nodes/tools/ParallelSubflows/ParallelSubflows.ts new file mode 100644 index 00000000000..4d81892830f --- /dev/null +++ b/packages/components/nodes/tools/ParallelSubflows/ParallelSubflows.ts @@ -0,0 +1,301 @@ +// packages/components/nodes/tools/ParallelSubflows/ParallelSubflows.ts +import { ParallelSubflowsTool, type ParallelSubflowsConfig } from './core' +import type { INode, INodeData, INodeParams, ICommonObject, IDatabaseEntity } from '../../../src/Interface' +import { getVars, prepareSandboxVars } from '../../../src/utils' +import type { DataSource } from 'typeorm' + +function substituteVariablesInObject(obj: any, sandbox: any): any { + if (typeof obj === 'string') return substituteVariablesInString(obj, sandbox) + if (Array.isArray(obj)) return obj.map((v) => substituteVariablesInObject(v, sandbox)) + if (obj && typeof obj === 'object') { + const out: any = {} + for (const [k, v] of Object.entries(obj)) out[k] = substituteVariablesInObject(v, sandbox) + return out + } + return obj +} + +function substituteVariablesInString(str: string, sandbox: any): string { + return String(str).replace(/\{\{\$([a-zA-Z_][\w]*(?:\.[a-zA-Z_][\w]*)*)\}\}/g, (match, variablePath) => { + try { + const parts = String(variablePath).split('.') + let cur: any = sandbox + for (let i = 0; i < parts.length; i++) { + const p = parts[i] + if (i === 0) { + const topKey = `$${p}` // e.g. $vars + if (cur && topKey in cur) cur = cur[topKey] + else return match + } else { + if (cur && typeof cur === 'object' && p in cur) cur = cur[p] + else return match + } + } + return typeof cur === 'string' ? cur : JSON.stringify(cur) + } catch { + return match + } + }) +} + +// Allows JS-like objects to be parsed to JSON (e.g., unquoted keys, single quotes). +function convertToValidJSONString(inputString: string) { + try { + const obj = Function('return ' + inputString)() + return JSON.stringify(obj, null, 2) + } catch { + return '' + } +} + +/** Accepts rich JSON describing branches and normalizes to array of {flowId,label,...} */ +function normalizeFlowsFreeJson(raw: any): Array<{ + flowId: string + label: string + timeoutMs?: number + apiKey?: string + vars?: Record + questionTemplate?: string +}> { + if (!raw) return [] + + // Strings -> try JSON parse first + if (typeof raw === 'string') { + const s = raw.trim() + if (!s) return [] + try { + return normalizeFlowsFreeJson(JSON.parse(s)) + } catch { + const fixed = convertToValidJSONString(s) + if (fixed) { + try { return normalizeFlowsFreeJson(JSON.parse(fixed)) } catch { /* ignore */ } + } + return [] + } + } + + // Array of strings (flow IDs) + if (Array.isArray(raw) && raw.every((x) => typeof x === 'string')) { + return (raw as string[]).map((flowId, idx) => ({ + flowId: String(flowId), + label: String.fromCharCode(65 + idx) // A,B,C,... + })) + } + + // Array of objects (preferred) + if (Array.isArray(raw) && raw.length && typeof raw[0] === 'object') { + return (raw as any[]).map((b, idx) => ({ + flowId: String(b.flowId ?? b.id ?? ''), + label: String(b.label ?? String.fromCharCode(65 + idx)), + timeoutMs: b.timeoutMs != null ? Number(b.timeoutMs) : undefined, + apiKey: b.apiKey != null ? String(b.apiKey) : undefined, + vars: (b.vars && typeof b.vars === 'object') ? b.vars : undefined, + questionTemplate: b.questionTemplate != null ? String(b.questionTemplate) : undefined + })).filter((b) => b.flowId) + } + + // Map object: { "A": "flowId", "B": { flowId, timeoutMs, ... } } + if (raw && typeof raw === 'object') { + return Object.entries(raw as Record).map(([label, val]) => { + if (typeof val === 'string') { + return { flowId: String(val), label: String(label) } + } + if (val && typeof val === 'object') { + return { + flowId: String(val.flowId ?? val.id ?? ''), + label: String(val.label ?? label), + timeoutMs: val.timeoutMs != null ? Number(val.timeoutMs) : undefined, + apiKey: val.apiKey != null ? String(val.apiKey) : undefined, + vars: (val.vars && typeof val.vars === 'object') ? val.vars : undefined, + questionTemplate: val.questionTemplate != null ? String(val.questionTemplate) : undefined + } + } + return { flowId: '', label: String(label) } + }).filter((b) => b.flowId) + } + + return [] +} + +/** ----- Node class (Tool) ----- */ + +const flowsPlaceholder = `[ + { "flowId": "1480...b584", "label": "A", "timeoutMs": 120000, "vars": { "role": "A" } }, + { "flowId": "69a9...5bc4", "label": "B", "timeoutMs": 120000, "vars": { "role": "B" } }, + { "flowId": "ed78...66c2", "label": "C", "timeoutMs": 180000, "vars": { "role": "C" } } +] +// You can also use a map: +// { "A": "1480...b584", "B": "69a9...5bc4", "C": { "flowId":"ed78...66c2", "timeoutMs": 180000 } } +// +// Supports {{$vars.someVar}} anywhere in this JSON. Example: +// { "A": { "flowId": "1480...b584", "apiKey": "{{$vars.flowKey}}" } }` + +const howToUseFlows = ` +This field accepts FREE JSON describing your branches. Supported shapes: +1) Array of objects: +[ + { "flowId":"...", "label":"A", "timeoutMs":120000, "apiKey":"...", "vars":{...}, "questionTemplate":"..." }, + { "flowId":"...", "label":"B" } +] +2) Array of strings (flow IDs): +["flowIdA","flowIdB","flowIdC"] +3) Map: +{ + "A": "flowIdA", + "B": { "flowId":"flowIdB", "timeoutMs": 180000, "vars": { "role": "B" } } +} + +You can embed {{$vars.NAME}} and they will be substituted from Flowise Variables at runtime. +` + + + +class ParallelSubflows_Tool implements INode { + label = 'Parallel Subflows' + name = 'parallelSubflows' + version = 1.1 + type = 'ParallelSubflows' + icon = 'parallel.svg' + category = 'Tools' + description = 'Fan-out to multiple Chatflows/Agentflows in parallel, wait for all, merge results, with optional timing.' + baseClasses = [this.type, 'Tool'] + inputs?: INodeParams[] + outputs?: INodeParams[] + + constructor() { + this.inputs = [ + { + label: 'Base URL', + name: 'baseUrl', + type: 'string', + default: 'http://localhost:3000/api/v1', + description: 'Flowise API base URL.' + }, + { + label: 'Default API Key', + name: 'defaultApiKey', + type: 'string', + placeholder: 'Optional', + description: 'Bearer key for protected flows (overridable per branch).', + optional: true + }, + { + label: 'Question Template', + name: 'questionTemplate', + type: 'string', + default: '{{input}}', + description: 'Template for the question. Supports {{input}} and {{vars.*}}.' + }, + { + label: 'Flows (Free JSON)', + name: 'flowsJson', + type: 'code', + hideCodeExecute: true, + placeholder: flowsPlaceholder, + hint: { label: 'How to use', value: howToUseFlows } + }, + { + label: 'Max Parallel', + name: 'maxParallel', + type: 'number', + default: 3, + description: 'Cap concurrency. If N > maxParallel, the rest are queued locally.' + }, + { + label: 'Overall Timeout (ms)', + name: 'overallTimeoutMs', + type: 'number', + default: 240000 + }, + { + label: 'Per-Flow Fail Policy', + name: 'failPolicy', + type: 'options', + options: [ + { label: 'continue (collect errors)', name: 'continue' }, + { label: 'fail-fast (cancel others)', name: 'fail-fast' } + ], + default: 'continue' + }, + { + label: 'Return Selection', + name: 'returnSelection', + type: 'options', + options: [ + { label: 'text', name: 'text' }, + { label: 'json', name: 'json' }, + { label: 'full (text/json/sourceDocuments/usedTools/sessionId)', name: 'full' } + ], + default: 'full' + }, + { + label: 'Emit Timing', + name: 'emitTiming', + type: 'boolean', + default: true, + description: 'Adds total/sum/max/speedup + ASCII timeline incl. S=sum.' + } + ] + + this.outputs = [{ label: 'Output', name: 'output', type: 'string' }] + } + + async init(nodeData: INodeData, _?: string, options?: ICommonObject): Promise { + const inputs = nodeData.inputs ?? {} + + const cfg: ParallelSubflowsConfig = { + baseUrl: (inputs.baseUrl as string)?.replace(/\/+$/, '') || 'http://localhost:3000/api/v1', + defaultApiKey: (inputs.defaultApiKey as string) || '', + questionTemplate: (inputs.questionTemplate as string) || '{{input}}', + flows: [], + maxParallel: Number(inputs.maxParallel ?? 3), + overallTimeoutMs: Number(inputs.overallTimeoutMs ?? 240000), + failPolicy: (inputs.failPolicy as any) || 'continue', + returnSelection: (inputs.returnSelection as any) || 'full', + emitTiming: Boolean(inputs.emitTiming ?? true) + } + + // Build sandbox with $vars if present + const flowsJsonRaw = (inputs.flowsJson as string) || '' + let sandbox: Record = {} + + if (flowsJsonRaw.includes('$vars') && options) { + try { + const appDataSource = options.appDataSource as DataSource + const databaseEntities = options.databaseEntities as IDatabaseEntity + const vars = await getVars(appDataSource, databaseEntities, nodeData, options) + sandbox['$vars'] = prepareSandboxVars(vars) + } catch { + // ignore; sandbox stays empty + } + } + + // Substitute vars in string, allow JS-ish JSON to be parsed, then normalize + let flowsStructured: any = flowsJsonRaw + if (typeof flowsStructured === 'string') { + const substituted = substituteVariablesInString(flowsStructured, sandbox) + const jsonish = convertToValidJSONString(substituted) || substituted + try { + flowsStructured = JSON.parse(jsonish) + } catch { + // maybe it was already strict JSON and failed substitution; try raw JSON + try { flowsStructured = JSON.parse(flowsStructured) } catch { flowsStructured = null } + } + } else if (flowsStructured && typeof flowsStructured === 'object') { + flowsStructured = substituteVariablesInObject(flowsStructured, sandbox) + } + + cfg.flows = normalizeFlowsFreeJson(flowsStructured) + + if (!cfg.flows.length) { + throw new Error( + 'ParallelSubflows: No flows configured. Provide free JSON: ' + + 'array of {flowId,label,...}, array of flowId strings, or a map label->config.' + ) + } + + return new ParallelSubflowsTool(cfg) + } +} + +module.exports = { nodeClass: ParallelSubflows_Tool } diff --git a/packages/components/nodes/tools/ParallelSubflows/README.md b/packages/components/nodes/tools/ParallelSubflows/README.md new file mode 100644 index 00000000000..d7e7d744e1b --- /dev/null +++ b/packages/components/nodes/tools/ParallelSubflows/README.md @@ -0,0 +1,193 @@ +# Parallel Subflows (Flowise Node) + +Launch multiple Flowise **subflows in parallel** via `/predict/{id}`, control concurrency, pass branch-specific variables/API keys/timeouts, and return merged results (text/json/full) with optional **timing stats + ASCII timeline** to visualize speedups. + +--- + +## ✨ Features + +- **Parallel execution** of subflows with `maxParallel` limit +- **Flexible “flows” config**: array, map, or strings — the tool normalizes them +- **Per-branch controls**: `timeoutMs`, `vars`, `apiKey`, `questionTemplate` +- **Overall timeout** and **failure policy** (`continue` or `fail-fast`) +- **Return selection**: `text`, `json`, or `full` (text/json/usedTools/sourceDocuments/sessionId) +- **Timing metrics**: total, sum, max, speedup + **ASCII timeline** (1 char = 100ms) +- **Simple templating**: `{{input}}` and `{{var}}` in `questionTemplate` + +--- + +## 📦 Installation + +> Assumes a local Flowise monorepo with packages similar to the standard structure. + +1. Create these folders (if they don’t exist): + +``` +packages/components/nodes/tools/ParallelSubflows/ +packages/components/nodes/utilities/ParallelSubflows/ +``` + +2. Add the files: + +- `ParallelSubflows.ts` → `packages/components/nodes/tools/ParallelSubflows/ParallelSubflows.ts` +- `core.ts` → `packages/components/nodes/utilities/ParallelSubflows/core.ts` +- (Optional) `parallel.svg` in the repo root (used in this README) + +3. Build & run Flowise as you normally do (e.g., `pnpm build` then start the app). +4. In the Flowise UI, add the **Parallel Subflows** node to your flow. +Addional information related to custom nodes in flowise can be found here: https://docs.flowiseai.com/contributing/building-node +--- + +## ⚙️ Inputs (Node Props) + +| Name | Type | Default | Description | +|---|---|---:|---| +| `baseUrl` | string | `http://localhost:3000/api/v1` | Flowise API base URL | +| `defaultApiKey` | string | `''` | API key used if a branch doesn’t specify one | +| `questionTemplate` | string | `{{input}}` | Mustache-like; supports `{{input}}` & variables | +| `flows` | free JSON | — | Rich config of branches (see examples) | +| `maxParallel` | number | `3` | Concurrency limit for parallel calls | +| `overallTimeoutMs` | number | `240000` | Hard ceiling for entire orchestration | +| `failPolicy` | enum | `continue` | `continue` or `fail-fast` | +| `returnSelection` | enum | `full` | `text` \| `json` \| `full` | +| `emitTiming` | boolean | `true` | Appends timing summary + ASCII timeline | + +--- + +## 🧩 “Flows” Config — Flexible Formats + +You can paste **any** of the formats below into the **Flows** input (the tool normalizes them): + +### 1) Array of objects +```json +[ + { "flowId": "1480...b584", "label": "A", "timeoutMs": 120000, "vars": { "role": "A" } }, + { "flowId": "69a9...5bc4", "label": "B", "timeoutMs": 120000, "vars": { "role": "B" } } +] +``` + +### 2) Array of flowId strings +```json +["1480...b584", "69a9...5bc4", "d2c1...7aa0"] +``` + +### 3) Map object (labels → config) +```json +{ + "A": "1480...b584", + "B": { "flowId": "69a9...5bc4", "vars": { "role": "B" }, "timeoutMs": 90000, "apiKey": "sk-..." } +} +``` + +> The tool also accepts “JS-like” objects (unquoted keys / single quotes). +> It will attempt to convert them into valid JSON automatically. + +--- + +## 🧠 Tool Input Patterns + +When the tool runs, it expects a **string** or **JSON string** as input: + +### A) Plain string input +- Your `questionTemplate` (default `{{input}}`) will inject this text. + +Example: +``` +"Summarize the latest customer feedback for Product X." +``` + +### B) JSON string with variables +- Lets you supply both the **prompt** and **branch variables**: + +```json +{ + "input": "Create a short sales blurb for Product X.", + "vars": { + "tone": "friendly", + "audience": "retail" + } +} +``` + +These variables are available in the template as `{{vars.tone}}`, etc. + +--- + +## 📤 Output + +Depends on `returnSelection`: + +- `text`: `[{ role, text }]` +- `json`: `[{ role, json }]` +- `full` (default): + ``` + [{ + role, text, json, usedTools, sourceDocuments, sessionId, + ms, tStart, tEnd, relStart, relEnd + }] + ``` +If `emitTiming` is `true`, the tool appends a final block with: +- **Total wall time**, **sum of branch times**, **max branch time** +- **Speedup** (sum / total) +- **ASCII timeline** (1 char = 100ms), e.g. +``` +A | ###### 620ms +B | #### 410ms +S | ######### 1030ms + +Timeline (1 char = 100ms): + A ###### +B #### +S ######### +``` + +--- + +## 🛡️ Failure Policy + +- `continue` (default): collect both successes and errors and return all +- `fail-fast`: abort remaining branches on first failure + +Each branch also honors its own `timeoutMs`. + +--- + +## 🔐 Authentication + +- Use `defaultApiKey` for all branches, **or** +- Set `apiKey` per branch in `flows` to override + +Requests are POSTed to: +``` +{baseUrl}/predic.../{flowId} +``` + +--- + +## 🧪 Tips + +- Use **labels** (`A`, `B`, `C`, …) to make timelines and reports readable +- Keep `maxParallel` aligned with your server capacity +- Supply `vars` per branch to tweak prompts/roles without modifying subflows +- Toggle `emitTiming` to measure real-world speedups + +--- + +## 📁 File layout (reference) + +``` +packages/ + components/ + nodes/ + tools/ + ParallelSubflows/ + ParallelSubflows.ts # Node definition + core.ts # Execution engine + parallel.svg # (optional) image +``` + +--- + +## 🙌 Acknowledgements + +Built for Flowise to make **multi-agent** and **fan-out/fan-in** patterns easy, observable, and fast. diff --git a/packages/components/nodes/tools/ParallelSubflows/core.ts b/packages/components/nodes/tools/ParallelSubflows/core.ts new file mode 100644 index 00000000000..719786c9704 --- /dev/null +++ b/packages/components/nodes/tools/ParallelSubflows/core.ts @@ -0,0 +1,405 @@ +// packages/components/nodes/utilities/ParallelSubflows/core.ts +import type { ToolParams } from '@langchain/core/tools' +import { Tool } from '@langchain/core/tools' + +export type FailPolicy = 'continue' | 'fail-fast' +export type ReturnSelection = 'text' | 'json' | 'full' + +export interface FlowBranch { + flowId: string + label?: string // e.g. 'A' | 'B' | 'C' + timeoutMs?: number + apiKey?: string + vars?: Record + // Optional per-branch question template override + questionTemplate?: string +} + +export interface ParallelSubflowsConfig { + baseUrl: string + defaultApiKey?: string + questionTemplate?: string + flows: FlowBranch[] + maxParallel: number + overallTimeoutMs: number + failPolicy: FailPolicy + returnSelection: ReturnSelection + emitTiming: boolean +} + +type OkReport = { + role: string + ok: true + status: number + text: string + json: any + sourceDocuments: any[] + usedTools?: any[] + sessionId?: string | null + ms: number + tStart: number + tEnd: number + relStart: number + relEnd: number +} + +type ErrReport = { + role: string + ok: false + status: number + error: string + body?: any + ms: number + tStart: number + tEnd: number + relStart: number + relEnd: number +} + +function uuid(): string { + if ((globalThis as any).crypto?.randomUUID) return (globalThis as any).crypto.randomUUID() + return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, (c) => { + const r = (Math.random() * 16) | 0 + const v = c === 'x' ? r : (r & 0x3) | 0x8 + return v.toString(16) + }) +} + +function msFmt(ms: number) { + return `${ms}ms (${(ms / 1000).toFixed(2)}s)` +} + +function asciiTimelineWithSum(per: Array, unit = 100) { + const lines: string[] = [] + for (const p of per) { + const start = Math.max(0, Math.round(p.relStart / unit)) + const width = Math.max(1, Math.round((p.relEnd - p.relStart) / unit)) + const bar = `${' '.repeat(start)}${'#'.repeat(width)}` + lines.push(`${p.role} | ${bar} ${p.ms}ms`) + } + const sumMs = per.reduce((acc, p) => acc + (p.ms || 0), 0) + const sumWidth = Math.max(1, Math.round(sumMs / unit)) + lines.push(`S | ${'#'.repeat(sumWidth)} ${sumMs}ms`) + return lines.join('\n') +} + +// Minimal p-limit (no deps) +function pLimit(concurrency: number) { + let active = 0 + const queue: Array<() => void> = [] + const next = () => { + active-- + const fn = queue.shift() + if (fn) fn() + } + return (fn: () => Promise): Promise => + new Promise((resolve, reject) => { + const run = async () => { + active++ + try { + resolve(await fn()) + } catch (e) { + reject(e) + } finally { + next() + } + } + if (active < concurrency) run() + else queue.push(run) + }) +} + +async function fetchWithTimeout(url: string, init: any, ms: number): Promise { + // Prefer native AbortSignal.timeout if available (Node 18+) + if (typeof (globalThis as any).AbortSignal?.timeout === 'function') { + return fetch(url, { ...init, signal: (AbortSignal as any).timeout(ms) }) + } + // Fallback: race (does not abort underlying socket) + return Promise.race([ + fetch(url, init), + new Promise((_, reject) => setTimeout(() => reject(new Error('Timeout')), ms)) + ]) +} + +// Basic Mustache-like templating for {{input}} + {{var}} +function renderTemplate(tpl: string, scope: Record) { + return String(tpl ?? '').replace(/\{\{\s*([\w.]+)\s*\}\}/g, (_m, key) => { + const parts = String(key).split('.') + let v: any = scope + for (const p of parts) { + if (v && typeof v === 'object' && p in v) v = v[p] + else return '' + } + return v == null ? '' : String(v) + }) +} + +export class ParallelSubflowsTool extends Tool { + name = 'parallelSubflows' + description = + 'Launch multiple Flowise subflows in parallel via /prediction/{id}, wait for all, return merged results with optional timing.' + + private cfg: ParallelSubflowsConfig + + constructor(cfg: ParallelSubflowsConfig, toolParams?: ToolParams) { + super(toolParams) + this.cfg = cfg + } + + /** Tool input: + * - If `questionTemplate` includes {{input}}, it’s replaced by the tool input string. + * - Alternatively you can pass a JSON string: { "input": "question text", "vars": { ... } } + */ + async _call(rawInput: string): Promise { + const { + baseUrl, + defaultApiKey, + questionTemplate = '{{input}}', + flows, + maxParallel, + overallTimeoutMs, + failPolicy, + returnSelection, + emitTiming + } = this.cfg + + if (!Array.isArray(flows) || flows.length === 0) { + return 'ParallelSubflows: No flows configured. Provide an array of {flowId,label,...}.' + } + + // Parse optional JSON input to allow passing extra vars at runtime + let inputPayload: any = {} + try { + inputPayload = JSON.parse(rawInput) + } catch { + inputPayload = { input: rawInput } + } + const inputText: string = String(inputPayload.input ?? rawInput ?? '') + const inputVars: Record = (inputPayload.vars && typeof inputPayload.vars === 'object') + ? inputPayload.vars + : {} + + // Build semaphore + const limit = pLimit(Math.max(1, Number(maxParallel) || flows.length)) + + // Shared aborters for fail-fast + const controllers: AbortController[] = [] + let shouldAbort = false + + const headersBase: Record = { 'Content-Type': 'application/json' } + const runId = uuid() + const tAll0 = Date.now() + + // Create tasks + const tasks = flows + .filter((f) => f && typeof f.flowId === 'string' && f.flowId.trim()) + .map((f) => { + const label = f.label || f.flowId.slice(0, 6) + const branchVars = { ...(f.vars || {}), runId, label } + const tpl = f.questionTemplate || questionTemplate + const question = renderTemplate(tpl, { input: inputText, vars: inputVars, ...branchVars }) + const body = { + question, + streaming: false, + overrideConfig: { + sessionId: `${runId}-${label}`, + vars: branchVars + } + } + const url = `${baseUrl.replace(/\/+$/, '')}/prediction/${encodeURIComponent(f.flowId)}` + const apiKey = f.apiKey || defaultApiKey || '' + const headers = { ...headersBase } + if (apiKey) headers['Authorization'] = `Bearer ${apiKey}` + + const timeoutMs = Number(f.timeoutMs || 120000) + + return async (): Promise => { + if (shouldAbort) { + const now = Date.now() + return { + role: label, + ok: false, + status: 0, + error: 'cancelled (fail-fast)', + ms: 0, + tStart: now, + tEnd: now, + relStart: now - tAll0, + relEnd: now - tAll0 + } + } + const t0 = Date.now() + let res: Response | null = null + let text: string = '' + + const controller = new AbortController() + controllers.push(controller) + try { + // Prefer native fetch; fallback to node-fetch dynamically + const doFetch = + typeof fetch === 'function' + ? fetch + : (await import('node-fetch')).default as unknown as typeof fetch + + res = await fetchWithTimeout( + url, + { method: 'POST', headers, body: JSON.stringify(body), signal: controller.signal }, + timeoutMs + ) + text = await res.text() + } catch (e: any) { + const t1 = Date.now() + const out: ErrReport = { + role: label, + ok: false, + status: 0, + error: `fetch error: ${String(e)}`, + ms: t1 - t0, + tStart: t0, + tEnd: t1, + relStart: t0 - tAll0, + relEnd: t1 - tAll0 + } + if (failPolicy === 'fail-fast') { + shouldAbort = true + controllers.forEach((c) => c.abort()) + } + return out + } + + const t1 = Date.now() + const ms = t1 - t0 + + let data: any = null + try { + data = JSON.parse(text) + } catch { + data = { raw: text } + } + + if (!res!.ok) { + const out: ErrReport = { + role: label, + ok: false, + status: res!.status, + error: data?.message || data?.error || `HTTP ${res!.status}`, + body: data, + ms, + tStart: t0, + tEnd: t1, + relStart: t0 - tAll0, + relEnd: t1 - tAll0 + } + if (failPolicy === 'fail-fast') { + shouldAbort = true + controllers.forEach((c) => c.abort()) + } + return out + } + + const ok: OkReport = { + role: label, + ok: true, + status: res!.status, + text: data?.text ?? '', + json: data?.json ?? {}, + sourceDocuments: data?.sourceDocuments ?? [], + usedTools: data?.usedTools ?? [], + sessionId: data?.sessionId ?? null, + ms, + tStart: t0, + tEnd: t1, + relStart: t0 - tAll0, + relEnd: t1 - tAll0 + } + return ok + } + }) + + if (!tasks.length) { + return 'ParallelSubflows: No valid flow entries (need flowId).' + } + + const runner = (async () => { + // Queue all tasks immediately; pLimit ensures only maxParallel run at once + const promises = tasks.map((task) => limit(task)) + + // Our task wrapper already catches errors and returns ErrReport, + // so these promises shouldn't reject. allSettled adds extra safety. + const results = await Promise.allSettled(promises) + + // Normalize to OkReport | ErrReport[] + const settled: Array = results.map((r, i) => { + if (r.status === 'fulfilled') return r.value as OkReport | ErrReport + const now = Date.now() + return { + role: tasks[i] ? 'branch-' + (i + 1) : '?', + ok: false, + status: 0, + error: String(r.reason), + ms: 0, + tStart: now, + tEnd: now, + relStart: 0, + relEnd: 0 + } as ErrReport + }) + + const tAll1 = Date.now() + + const reports = settled.filter((x) => x.ok) as OkReport[] + const errors = settled.filter((x) => !x.ok) as ErrReport[] + + // Metrics + const totalMs = tAll1 - tAll0 + const perMs = settled.map((p) => p.ms) + const sumMs = perMs.reduce((a, b) => a + b, 0) + const maxMs = perMs.reduce((a, b) => Math.max(a, b), 0) + const speedup = totalMs > 0 ? sumMs / totalMs : 0 + const concurrencyFactor = Number(speedup.toFixed(2)) + + const timing = { totalMs, sumMs, maxMs, concurrencyFactor, per: settled } + + let pretty = '' + if (emitTiming) { + const lineA = `A: ${msFmt((settled.find((p) => p.role === 'A')?.ms) ?? 0)}` + const lineB = `B: ${msFmt((settled.find((p) => p.role === 'B')?.ms) ?? 0)}` + const lineC = `C: ${msFmt((settled.find((p) => p.role === 'C')?.ms) ?? 0)}` + const normalTotal = `Sequential (normal) total: ${msFmt(sumMs)}` + const parallelTotal = `Parallel (observed) total: ${msFmt(totalMs)}` + const speed = `Speedup: ×${speedup.toFixed(2)} (sum/total)` + const timeline = asciiTimelineWithSum(settled, 100) + pretty = [lineA, lineB, lineC, '', normalTotal, parallelTotal, speed, '', 'Timeline (1 char = 100ms):', timeline].join('\n') + } + + const finalReports = reports.map((r) => { + if (returnSelection === 'text') return { role: r.role, text: r.text } + if (returnSelection === 'json') return { role: r.role, json: r.json } + return { + role: r.role, + text: r.text, + json: r.json, + sourceDocuments: r.sourceDocuments, + usedTools: r.usedTools, + sessionId: r.sessionId + } + }) + + return JSON.stringify({ reports: finalReports, errors, timing, pretty }) + })() + + if (overallTimeoutMs && overallTimeoutMs > 0) { + // Hard ceiling for the whole orchestration + if (typeof (globalThis as any).AbortSignal?.timeout === 'function') { + } + return Promise.race([ + runner, + new Promise((_, reject) => + setTimeout(() => reject(new Error('ParallelSubflows: overall timeout')), overallTimeoutMs) + ) + ]).catch((e) => `ParallelSubflows error: ${String(e)}`) + } + + return runner.catch((e) => `ParallelSubflows error: ${String(e)}`) + } +} diff --git a/packages/components/nodes/tools/ParallelSubflows/parallel.svg b/packages/components/nodes/tools/ParallelSubflows/parallel.svg new file mode 100644 index 00000000000..22050bee405 --- /dev/null +++ b/packages/components/nodes/tools/ParallelSubflows/parallel.svg @@ -0,0 +1,13 @@ + + + + + + + + + + + + +