Skip to content

Commit e90d371

Browse files
author
cw
committed
feat: add visual pipeline builder for node-based task orchestration
Adds a full-stack visual pipeline editor inspired by n8n/ComfyUI, allowing users to drag-and-drop domain tasks onto a canvas, connect them with edges, and execute pipelines with real-time progress visualization. Daemon: Pipeline engine with topological sort execution, parallel node processing, {{nodeId.field}} variable substitution, file-system storage, and REST API (CRUD + execute + node catalog from 13 domains/37 task types). Web UI: React Flow canvas with drag-drop node catalog, config panel, save/load, and WebSocket-based execution progress with live node status coloring. Includes 3 pipeline templates (morning briefing, system health check, smart reminder).
1 parent ada20a6 commit e90d371

21 files changed

+2964
-7
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
{
2+
"id": "template_morning_briefing",
3+
"name": "Morning Briefing",
4+
"description": "Get weather and calendar events, then summarize with AI",
5+
"nodes": [
6+
{
7+
"id": "node_1",
8+
"type": "weather_current",
9+
"domain": "weather",
10+
"label": "Get Weather",
11+
"position": { "x": 100, "y": 150 },
12+
"params": {
13+
"location": "{{params.location}}"
14+
}
15+
},
16+
{
17+
"id": "node_2",
18+
"type": "calendar_today",
19+
"domain": "calendar",
20+
"label": "Today's Events",
21+
"position": { "x": 100, "y": 350 },
22+
"params": {}
23+
},
24+
{
25+
"id": "node_3",
26+
"type": "ai_query",
27+
"domain": "ai",
28+
"label": "Create Briefing",
29+
"position": { "x": 450, "y": 250 },
30+
"params": {
31+
"query": "Create a concise morning briefing. Weather: {{node_1.condition}} {{node_1.temperature_c}}C. Events: {{node_2.events}}. Keep it short and actionable."
32+
}
33+
}
34+
],
35+
"edges": [
36+
{
37+
"id": "edge_1",
38+
"source": "node_1",
39+
"source_port": "output",
40+
"target": "node_3",
41+
"target_port": "input"
42+
},
43+
{
44+
"id": "edge_2",
45+
"source": "node_2",
46+
"source_port": "output",
47+
"target": "node_3",
48+
"target_port": "input"
49+
}
50+
],
51+
"parameters": [
52+
{
53+
"name": "location",
54+
"type": "string",
55+
"default": "San Francisco",
56+
"description": "Weather location"
57+
}
58+
]
59+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
{
2+
"id": "template_smart_reminder",
3+
"name": "Smart Reminder",
4+
"description": "Calculate a future time and create a reminder",
5+
"nodes": [
6+
{
7+
"id": "node_1",
8+
"type": "calculator_date_math",
9+
"domain": "calculator",
10+
"label": "Calculate Time",
11+
"position": { "x": 100, "y": 200 },
12+
"params": {
13+
"expression": "{{params.offset}}"
14+
}
15+
},
16+
{
17+
"id": "node_2",
18+
"type": "reminders_add",
19+
"domain": "reminders",
20+
"label": "Create Reminder",
21+
"position": { "x": 400, "y": 200 },
22+
"params": {
23+
"title": "{{params.reminder_text}}",
24+
"due_date": "{{node_1.result}}"
25+
}
26+
}
27+
],
28+
"edges": [
29+
{
30+
"id": "edge_1",
31+
"source": "node_1",
32+
"source_port": "output",
33+
"target": "node_2",
34+
"target_port": "input"
35+
}
36+
],
37+
"parameters": [
38+
{
39+
"name": "offset",
40+
"type": "string",
41+
"default": "+2 hours",
42+
"description": "Time offset for reminder"
43+
},
44+
{
45+
"name": "reminder_text",
46+
"type": "string",
47+
"default": "Follow up",
48+
"description": "Reminder text"
49+
}
50+
]
51+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
{
2+
"id": "template_system_health",
3+
"name": "System Health Check",
4+
"description": "Check system info and processes, then generate a health report",
5+
"nodes": [
6+
{
7+
"id": "node_1",
8+
"type": "system_info",
9+
"domain": "system",
10+
"label": "System Info",
11+
"position": { "x": 100, "y": 150 },
12+
"params": {}
13+
},
14+
{
15+
"id": "node_2",
16+
"type": "run_command",
17+
"domain": "system",
18+
"label": "Disk Usage",
19+
"position": { "x": 100, "y": 350 },
20+
"params": {
21+
"command": "df -h /"
22+
}
23+
},
24+
{
25+
"id": "node_3",
26+
"type": "ai_query",
27+
"domain": "ai",
28+
"label": "Health Report",
29+
"position": { "x": 450, "y": 250 },
30+
"params": {
31+
"query": "Generate a brief system health report. System: {{node_1.output}}. Disk: {{node_2.stdout}}. Flag any concerns."
32+
}
33+
}
34+
],
35+
"edges": [
36+
{
37+
"id": "edge_1",
38+
"source": "node_1",
39+
"source_port": "output",
40+
"target": "node_3",
41+
"target_port": "input"
42+
},
43+
{
44+
"id": "edge_2",
45+
"source": "node_2",
46+
"source_port": "output",
47+
"target": "node_3",
48+
"target_port": "input"
49+
}
50+
],
51+
"parameters": []
52+
}

daemon/lib/api/unified_api_server.dart

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import 'package:opencli_daemon/core/request_router.dart';
88
import 'package:opencli_daemon/ipc/ipc_protocol.dart';
99
import 'package:opencli_daemon/api/api_translator.dart';
1010
import 'package:opencli_daemon/api/message_handler.dart';
11+
import 'package:opencli_daemon/pipeline/pipeline_api.dart';
1112

1213
/// Unified API server on port 9529 for Web UI integration
1314
///
@@ -18,13 +19,21 @@ class UnifiedApiServer {
1819
final MessageHandler _messageHandler;
1920
final int port;
2021
HttpServer? _server;
22+
PipelineApi? _pipelineApi;
2123

2224
UnifiedApiServer({
2325
required RequestRouter requestRouter,
2426
required MessageHandler messageHandler,
2527
this.port = 9529,
28+
PipelineApi? pipelineApi,
2629
}) : _requestRouter = requestRouter,
27-
_messageHandler = messageHandler;
30+
_messageHandler = messageHandler,
31+
_pipelineApi = pipelineApi;
32+
33+
/// Set pipeline API (can be configured after construction).
34+
void setPipelineApi(PipelineApi api) {
35+
_pipelineApi = api;
36+
}
2837

2938
Future<void> start() async {
3039
final router = Router();
@@ -41,6 +50,9 @@ class UnifiedApiServer {
4150
// WebSocket /ws - Real-time messaging
4251
router.get('/ws', _messageHandler.handler);
4352

53+
// Pipeline API routes
54+
_pipelineApi?.registerRoutes(router);
55+
4456
final handler = const shelf.Pipeline()
4557
.addMiddleware(shelf.logRequests())
4658
.addMiddleware(_corsMiddleware())
@@ -178,7 +190,7 @@ class UnifiedApiServer {
178190

179191
Map<String, String> get _corsHeaders => {
180192
'Access-Control-Allow-Origin': '*',
181-
'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
193+
'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
182194
'Access-Control-Allow-Headers': 'Content-Type',
183195
};
184196

daemon/lib/core/daemon.dart

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ import 'package:opencli_daemon/api/unified_api_server.dart';
1818
import 'package:opencli_daemon/api/message_handler.dart';
1919
import 'package:opencli_daemon/domains/domain_registry.dart';
2020
import 'package:opencli_daemon/domains/domain_plugin_adapter.dart';
21+
import 'package:opencli_daemon/pipeline/pipeline_store.dart';
22+
import 'package:opencli_daemon/pipeline/pipeline_executor.dart';
23+
import 'package:opencli_daemon/pipeline/pipeline_api.dart';
2124

2225
class Daemon {
2326
static const String version = '0.2.0';
@@ -212,12 +215,30 @@ class Daemon {
212215
TerminalUI.success('Plugin marketplace UI listening on port 9877',
213216
prefix: ' ✓');
214217

218+
// Initialize pipeline system
219+
TerminalUI.printInitStep('Initializing pipeline system');
220+
final pipelineStore = PipelineStore();
221+
await pipelineStore.initialize();
222+
final pipelineExecutor = PipelineExecutor(
223+
store: pipelineStore,
224+
executors: _mobileTaskHandler.executors,
225+
);
226+
_mobileTaskHandler.registerExecutor('pipeline_execute', pipelineExecutor);
227+
final pipelineApi = PipelineApi(
228+
store: pipelineStore,
229+
executor: pipelineExecutor,
230+
domainRegistry: _domainRegistry,
231+
connectionManager: _mobileManager,
232+
);
233+
TerminalUI.success('Pipeline system initialized', prefix: ' ✓');
234+
215235
// Start unified API server for Web UI integration
216236
TerminalUI.printInitStep('Starting unified API server', last: true);
217237
_unifiedApiServer = UnifiedApiServer(
218238
requestRouter: _router,
219239
messageHandler: MessageHandler(), // Create new instance for unified API
220240
port: 9529,
241+
pipelineApi: pipelineApi,
221242
);
222243
await _unifiedApiServer!.start();
223244
TerminalUI.success('Unified API server listening on port 9529',

daemon/lib/mobile/mobile_connection_manager.dart

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,11 @@ class MobileConnectionManager {
533533
}
534534
}
535535

536+
/// Public broadcast for pipeline execution progress and other subsystems.
537+
void broadcastMessage(Map<String, dynamic> message) {
538+
_broadcastMessage(message);
539+
}
540+
536541
/// Send error message to mobile client
537542
void _sendError(WebSocketChannel channel, String error) {
538543
_sendMessage(channel, {

daemon/lib/mobile/mobile_task_handler.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ class MobileTaskHandler {
1414
final MobileConnectionManager connectionManager;
1515
final Map<String, TaskExecutor> _executors = {};
1616

17+
/// Access to registered executors (for pipeline system).
18+
Map<String, TaskExecutor> get executors => _executors;
19+
1720
/// Capability system components (optional, can be initialized separately)
1821
CapabilityLoader? _capabilityLoader;
1922
CapabilityRegistry? _capabilityRegistry;

0 commit comments

Comments
 (0)