Skip to content

Commit 669beb1

Browse files
committed
feat(workflow): add initial workflow package with types and execution logic
1 parent 8574e70 commit 669beb1

File tree

6 files changed

+578
-0
lines changed

6 files changed

+578
-0
lines changed

packages/workflow/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
## workflow
2+
3+
> 为 ChatLuna 提供 workflow 工作流支持

packages/workflow/package.json

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
{
2+
"name": "@cortexluna/workflow",
3+
"description": "workflow support for cortexluna",
4+
"version": "0.0.1",
5+
"main": "lib/index.cjs",
6+
"module": "lib/index.mjs",
7+
"typings": "lib/index.d.ts",
8+
"files": [
9+
"lib",
10+
"dist"
11+
],
12+
"exports": {
13+
".": {
14+
"import": "./lib/index.mjs",
15+
"require": "./lib/index.cjs",
16+
"types": "./lib/index.d.ts"
17+
},
18+
"./package.json": "./package.json"
19+
},
20+
"type": "module",
21+
"author": "dingyi222666 <[email protected]>",
22+
"repository": {
23+
"type": "git",
24+
"url": "https://github.com/ChatLunaLab/chatluna.git",
25+
"directory": "packages/workflow"
26+
},
27+
"license": "AGPL-3.0",
28+
"bugs": {
29+
"url": "https://github.com/ChatLunaLab/cortexluna/issues"
30+
},
31+
"homepage": "https://github.com/ChatLunaLab/cortexluna/tree/main/packages/workflow#readme",
32+
"scripts": {
33+
"build": "atsc -b"
34+
},
35+
"engines": {
36+
"node": ">=18.0.0"
37+
},
38+
"keywords": [
39+
"chatbot",
40+
"koishi",
41+
"plugin",
42+
"service",
43+
"chatgpt",
44+
"gpt",
45+
"chatluna",
46+
"provider"
47+
],
48+
"dependencies": {
49+
"cortexluna": "^0.0.1",
50+
"zod": "3.24.2"
51+
},
52+
"devDependencies": {
53+
"atsc": "^2.1.0",
54+
"cordis": "^3.18.1"
55+
},
56+
"peerDependencies": {
57+
"cordis": "^3.18.1",
58+
"cortexluna": "^0.0.1"
59+
},
60+
"peerDependenciesMeta": {
61+
"cordis": {
62+
"optional": true
63+
}
64+
}
65+
}

packages/workflow/src/index.ts

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
import {
2+
NodeContext,
3+
NodeDefinition,
4+
NodeDependency,
5+
NodeExecutionTiming,
6+
NodeFactory,
7+
NodeId,
8+
NodeIO,
9+
NodeResult,
10+
WorkflowNode,
11+
WorkflowOptions
12+
} from './types.ts'
13+
14+
export * from './types.ts'
15+
16+
export function createNodeFactory(): NodeFactory {
17+
const nodes = new Map<string, NodeDefinition>()
18+
19+
return {
20+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
21+
registerNode: (type: string, definition: NodeDefinition<any, any>) => {
22+
nodes.set(type, definition)
23+
},
24+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
25+
getNode: (type: string) => nodes.get(type) as NodeDefinition<any, any>
26+
}
27+
}
28+
29+
function resolveDependency(dep: NodeDependency): string {
30+
return typeof dep === 'string' ? dep : dep.nodeId
31+
}
32+
33+
function detectCircularDependencies(nodes: WorkflowNode[]): boolean {
34+
const visited = new Set<NodeId>()
35+
const recursionStack = new Set<NodeId>()
36+
37+
function hasCycle(nodeId: NodeId): boolean {
38+
if (recursionStack.has(nodeId)) return true
39+
if (visited.has(nodeId)) return false
40+
41+
visited.add(nodeId)
42+
recursionStack.add(nodeId)
43+
44+
const node = nodes.find((n) => n.id === nodeId)
45+
if (node) {
46+
for (const dep of node.dependencies) {
47+
const depId = resolveDependency(dep)
48+
if (hasCycle(depId)) return true
49+
}
50+
}
51+
52+
recursionStack.delete(nodeId)
53+
return false
54+
}
55+
56+
return nodes.some((node) => hasCycle(node.id))
57+
}
58+
59+
function getExecutableNodes(
60+
nodes: WorkflowNode[],
61+
completed: Set<NodeId>,
62+
failed: Set<NodeId>
63+
): WorkflowNode[] {
64+
return nodes.filter((node) => {
65+
if (completed.has(node.id) || failed.has(node.id)) return false
66+
return node.dependencies.every((dep) => {
67+
const depId = resolveDependency(dep)
68+
return completed.has(depId) && !failed.has(depId)
69+
})
70+
})
71+
}
72+
73+
function validateNodeInput(
74+
node: WorkflowNode,
75+
definition: NodeDefinition,
76+
input: NodeIO
77+
): boolean {
78+
if (!definition.inputSchema) return true
79+
try {
80+
definition.inputSchema.parse(input)
81+
return true
82+
} catch {
83+
return false
84+
}
85+
}
86+
87+
export async function executeWorkflow(
88+
nodes: WorkflowNode[],
89+
factory: NodeFactory,
90+
initialContext: NodeContext = { variables: {}, metadata: {} },
91+
options: WorkflowOptions = {}
92+
): Promise<Record<NodeId, NodeResult>> {
93+
const { maxRetries = 3, maxParallel = 4, callbacks = {} } = options
94+
95+
if (detectCircularDependencies(nodes)) {
96+
throw new Error('Circular dependencies detected in workflow')
97+
}
98+
99+
const completed = new Set<NodeId>()
100+
const failed = new Set<NodeId>()
101+
const results: Record<NodeId, NodeResult> = {}
102+
const context: NodeContext = { ...initialContext }
103+
104+
while (completed.size + failed.size < nodes.length) {
105+
const executableNodes = getExecutableNodes(nodes, completed, failed)
106+
if (executableNodes.length === 0) break
107+
108+
const executions = executableNodes
109+
.slice(0, maxParallel)
110+
.map(async (node) => {
111+
const definition = factory.getNode(node.type)
112+
if (!definition) {
113+
failed.add(node.id)
114+
return
115+
}
116+
117+
const timing: NodeExecutionTiming = {
118+
startTime: Date.now()
119+
}
120+
121+
callbacks.onNodeStart?.(node.id, node.type)
122+
123+
const input =
124+
node.dependencies.length === 0
125+
? { ...context.variables }
126+
: node.dependencies.reduce((acc, dep) => {
127+
const depId = resolveDependency(dep)
128+
const depOutput = results[depId]?.output || {}
129+
130+
if (typeof dep === 'object' && dep.portId) {
131+
const inputId = dep.inputId || dep.portId
132+
return {
133+
...acc,
134+
[inputId]: depOutput[dep.portId]
135+
}
136+
}
137+
138+
return { ...acc, ...depOutput }
139+
}, {} as NodeIO)
140+
141+
if (!validateNodeInput(node, definition, input)) {
142+
const error = new Error(`Invalid input for node ${node.id}`)
143+
timing.endTime = Date.now()
144+
timing.duration = timing.endTime - timing.startTime
145+
callbacks.onNodeError?.(node.id, node.type, error, timing)
146+
failed.add(node.id)
147+
return
148+
}
149+
150+
let retries = 0
151+
while (retries <= maxRetries) {
152+
try {
153+
const output = await definition.run(input, context)
154+
timing.endTime = Date.now()
155+
timing.duration = timing.endTime - timing.startTime
156+
157+
const result: NodeResult = {
158+
state: 'completed',
159+
output
160+
}
161+
162+
results[node.id] = result
163+
completed.add(node.id)
164+
callbacks.onNodeComplete?.(
165+
node.id,
166+
node.type,
167+
result,
168+
timing
169+
)
170+
break
171+
} catch (error) {
172+
retries++
173+
if (retries > maxRetries) {
174+
timing.endTime = Date.now()
175+
timing.duration = timing.endTime - timing.startTime
176+
177+
const result: NodeResult = {
178+
state: 'failed',
179+
output: {},
180+
error: error as Error
181+
}
182+
results[node.id] = result
183+
failed.add(node.id)
184+
callbacks.onNodeError?.(
185+
node.id,
186+
node.type,
187+
error as Error,
188+
timing
189+
)
190+
}
191+
}
192+
}
193+
})
194+
195+
await Promise.all(executions)
196+
}
197+
198+
callbacks.onWorkflowComplete?.(results)
199+
return results
200+
}

packages/workflow/src/types.ts

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import { z } from 'zod'
2+
3+
export type NodeId = string
4+
5+
export type NodeIO<T = unknown> = Record<string, T>
6+
7+
export type NodeContext = {
8+
variables: Record<string, unknown>
9+
metadata: Record<string, unknown>
10+
}
11+
12+
export type NodeState =
13+
| 'pending'
14+
| 'running'
15+
| 'completed'
16+
| 'failed'
17+
| 'skipped'
18+
19+
export type NodeResult = {
20+
state: NodeState
21+
output: NodeIO
22+
error?: Error
23+
}
24+
25+
export type NodeDefinition<
26+
TInput extends NodeIO = NodeIO,
27+
TOutput extends NodeIO = NodeIO
28+
> = {
29+
run: (input: TInput, context: NodeContext) => Promise<TOutput>
30+
inputSchema?: z.ZodType<TInput>
31+
outputSchema?: z.ZodType<TOutput>
32+
}
33+
34+
export type NodeDependency =
35+
| string
36+
| {
37+
nodeId: NodeId
38+
portId: string
39+
inputId?: string
40+
}
41+
42+
export type WorkflowNode = {
43+
id: NodeId
44+
type: string
45+
dependencies: NodeDependency[]
46+
config?: Record<string, unknown>
47+
}
48+
49+
export type NodeExecutionTiming = {
50+
startTime: number
51+
endTime?: number
52+
duration?: number
53+
}
54+
55+
export type WorkflowCallbacks = {
56+
onNodeStart?: (nodeId: NodeId, nodeType: string) => void
57+
onNodeComplete?: (
58+
nodeId: NodeId,
59+
nodeType: string,
60+
result: NodeResult,
61+
timing: NodeExecutionTiming
62+
) => void
63+
onNodeError?: (
64+
nodeId: NodeId,
65+
nodeType: string,
66+
error: Error,
67+
timing: NodeExecutionTiming
68+
) => void
69+
onNodeSkipped?: (nodeId: NodeId, nodeType: string) => void
70+
onWorkflowComplete?: (results: Record<NodeId, NodeResult>) => void
71+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
72+
onNodeRunning?: (nodeId: NodeId, nodeType: string, value: any) => void
73+
}
74+
75+
export type WorkflowOptions = {
76+
maxRetries?: number
77+
maxParallel?: number
78+
callbacks?: WorkflowCallbacks
79+
}
80+
81+
export type NodeFactory = {
82+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
83+
registerNode<
84+
TInput extends NodeIO = NodeIO<unknown>,
85+
TOutput extends NodeIO = NodeIO<unknown>
86+
>(
87+
type: string,
88+
definition: NodeDefinition<TInput, TOutput>
89+
): void
90+
getNode<
91+
TInput extends NodeIO = NodeIO<unknown>,
92+
TOutput extends NodeIO = NodeIO<unknown>
93+
>(
94+
type: string
95+
): NodeDefinition<TInput, TOutput> | undefined
96+
}

0 commit comments

Comments
 (0)