diff --git a/src/pkg/maestro/deploy.go b/src/pkg/maestro/deploy.go index 1b5344d..6bac31e 100644 --- a/src/pkg/maestro/deploy.go +++ b/src/pkg/maestro/deploy.go @@ -4,11 +4,14 @@ package maestro import ( + "encoding/json" "fmt" + "net/http" "os" "os/exec" "path/filepath" "strings" + "time" "go.uber.org/zap" "gopkg.in/yaml.v3" @@ -440,4 +443,210 @@ func CreateConfigMap(agentsYAML string, workflowYAML string) error { return nil } +// DeployDefault deploys the workflow with API server and UI (default deployment method). +// This is similar to the Python __deploy_agents_workflow_node implementation. +func (d *Deploy) DeployDefault() error { + // Create temporary directory for deployment + d.TmpDir = filepath.Join(os.TempDir(), "maestro") + if err := os.MkdirAll(d.TmpDir, 0755); err != nil { + return fmt.Errorf("failed to create temporary directory: %w", err) + } + + // Create src directory in the temporary directory + srcDir := filepath.Join(d.TmpDir, "src") + if err := os.MkdirAll(srcDir, 0755); err != nil { + return fmt.Errorf("failed to create src directory: %w", err) + } + + // Write agent contents to file + agentsFile := filepath.Join(srcDir, "agents.yaml") + if err := os.WriteFile(agentsFile, []byte(d.Agent), 0644); err != nil { + return fmt.Errorf("failed to write agents file: %w", err) + } + + // Write workflow contents to file + workflowFile := filepath.Join(srcDir, "workflow.yaml") + if err := os.WriteFile(workflowFile, []byte(d.Workflow), 0644); err != nil { + return fmt.Errorf("failed to write workflow file: %w", err) + } + + // Get API host and port from environment or use defaults + apiHost := os.Getenv("MAESTRO_API_HOST") + if apiHost == "" { + apiHost = "127.0.0.1" + } + + apiPortStr := os.Getenv("MAESTRO_API_PORT") + if apiPortStr == "" { + apiPortStr = "8000" + } + + uiPort := os.Getenv("MAESTRO_UI_PORT") + if uiPort == "" { + uiPort = "5173" + } + + // Convert port string to int + apiPort := 8000 + if port, err := fmt.Sscanf(apiPortStr, "%d", &apiPort); err != nil || port != 1 { + d.Logger.Warn("Invalid API port, using default 8000", + zap.String("provided_port", apiPortStr)) + apiPort = 8000 + } + + // Set CORS environment variable + corsOrigins := os.Getenv("CORS_ALLOW_ORIGINS") + if corsOrigins == "" { + corsOrigins = fmt.Sprintf("http://%s:%s", apiHost, uiPort) + os.Setenv("CORS_ALLOW_ORIGINS", corsOrigins) + } + + d.Logger.Info("Starting default deployment", + zap.String("api_host", apiHost), + zap.Int("api_port", apiPort), + zap.String("ui_port", uiPort), + zap.String("agents_file", agentsFile), + zap.String("workflow_file", workflowFile)) + + // Start the API server using ServeWorkflow in a goroutine + apiErrChan := make(chan error, 1) + go func() { + if err := ServeWorkflow(agentsFile, workflowFile, apiHost, apiPort); err != nil { + d.Logger.Error("Failed to start API server", + zap.Error(err)) + apiErrChan <- err + } + }() + + // Wait for API server to be healthy + if err := waitForAPIHealth(apiHost, apiPort, 60, 1, apiErrChan, d.Logger); err != nil { + d.Logger.Error("Failed to start API server - health check failed", + zap.Error(err)) + return fmt.Errorf("API server failed to become healthy: %w", err) + } + + // Start the UI server + if err := startUIServer(uiPort, d.Logger); err != nil { + d.Logger.Error("Failed to start UI server", + zap.Error(err)) + return fmt.Errorf("failed to start UI server: %w", err) + } + + d.Logger.Info("Default deployment started successfully", + zap.String("api_url", fmt.Sprintf("http://%s:%d", apiHost, apiPort)), + zap.String("ui_url", fmt.Sprintf("http://localhost:%s", uiPort))) + + return nil +} + +// waitForAPIHealth waits for the API server to be healthy by polling the /health endpoint. +// Parameters: +// - host: API server host +// - port: API server port +// - timeout: Maximum time to wait in seconds +// - checkInterval: Time between health checks in seconds +// - apiErrChan: Channel to receive errors from the API server goroutine +// - logger: Logger instance +// +// Returns: +// - error: nil if API is healthy, error if startup failed or timeout reached +func waitForAPIHealth(host string, port int, timeout int, checkInterval int, apiErrChan <-chan error, logger *zap.Logger) error { + url := fmt.Sprintf("http://%s:%d/health", host, port) + startTime := time.Now() + timeoutDuration := time.Duration(timeout) * time.Second + ticker := time.NewTicker(time.Duration(checkInterval) * time.Second) + defer ticker.Stop() + + logger.Info("Waiting for API server to be ready", zap.String("url", url)) + + for { + select { + case err := <-apiErrChan: + // API server failed to start + logger.Error("API server startup failed", zap.Error(err)) + return fmt.Errorf("API server startup failed: %w", err) + + case <-ticker.C: + // Check if timeout has been reached + if time.Since(startTime) >= timeoutDuration { + logger.Error("API server failed to become ready", zap.Int("timeout_seconds", timeout)) + return fmt.Errorf("API server health check timeout after %d seconds", timeout) + } + + // Try to check health endpoint + resp, err := http.Get(url) + if err == nil { + defer resp.Body.Close() + if resp.StatusCode == 200 { + var healthData map[string]interface{} + if err := json.NewDecoder(resp.Body).Decode(&healthData); err == nil { + if status, ok := healthData["status"].(string); ok && status == "healthy" { + logger.Info("API server is ready!") + return nil + } + } + } + } + } + } +} + +// startUIServer starts the UI server using npm. +// Parameters: +// - uiPort: Port for the UI server +// - logger: Logger instance +// +// Returns: +// - error if any +func startUIServer(uiPort string, logger *zap.Logger) error { + // Get the project root directory + // Assuming the current working directory structure + cwd, err := os.Getwd() + if err != nil { + return fmt.Errorf("failed to get current working directory: %w", err) + } + + // Try to find the UI directory + // Look for src/maestro/ui or ui directory + uiPaths := []string{ + filepath.Join(cwd, "src", "maestro", "ui"), + filepath.Join(cwd, "ui"), + filepath.Join(cwd, "..", "maestro", "src", "maestro", "ui"), + } + + var uiCwd string + for _, path := range uiPaths { + if _, err := os.Stat(path); err == nil { + uiCwd = path + break + } + } + + if uiCwd == "" { + logger.Warn("UI directory not found, skipping UI server startup") + return nil + } + + // Set up environment for UI server + uiEnv := os.Environ() + uiEnv = append(uiEnv, fmt.Sprintf("PORT=%s", uiPort)) + + // Start the UI server + npmCmd := exec.Command("npm", "run", "dev") + npmCmd.Dir = uiCwd + npmCmd.Env = uiEnv + // Suppress stderr to avoid cluttering logs + npmCmd.Stderr = nil + + if err := npmCmd.Start(); err != nil { + return fmt.Errorf("failed to start UI server: %w", err) + } + + logger.Info("UI server started", + zap.String("ui_cwd", uiCwd), + zap.String("port", uiPort)) + + return nil +} + // Made with Bob diff --git a/src/pkg/mcp/handlers.go b/src/pkg/mcp/handlers.go index 37fce51..217bf0b 100644 --- a/src/pkg/mcp/handlers.go +++ b/src/pkg/mcp/handlers.go @@ -830,8 +830,19 @@ func handleDeployWorkflow(ctx context.Context, request mcp.CallToolRequest) (*mc } }() GlobalServerState.Logger.Info("Started asynchronous Kubernetes deployment") + } else { + // Default deployment: serve workflow with API and UI + go func() { + err := deploy.DeployDefault() + if err != nil { + GlobalServerState.Logger.Error("Failed to deploy workflow", + zap.Error(err)) + } else { + GlobalServerState.Logger.Info("Default deployment completed successfully") + } + }() + GlobalServerState.Logger.Info("Started asynchronous default deployment") } - // TODO: Implement workflow deployment logic // This would involve deploying the workflow to the specified target GlobalServerState.Logger.Info("Deploying workflow",