Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 209 additions & 0 deletions src/pkg/maestro/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@
package maestro

import (
"encoding/json"
"fmt"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"time"

"go.uber.org/zap"
"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -440,4 +443,210 @@ func CreateConfigMap(agentsYAML string, workflowYAML string) error {
return nil
}

// DeployDefault deploys the workflow with API server and UI (default deployment method).
// This is similar to the Python __deploy_agents_workflow_node implementation.
func (d *Deploy) DeployDefault() error {
// Create temporary directory for deployment
d.TmpDir = filepath.Join(os.TempDir(), "maestro")
if err := os.MkdirAll(d.TmpDir, 0755); err != nil {
return fmt.Errorf("failed to create temporary directory: %w", err)
}

// Create src directory in the temporary directory
srcDir := filepath.Join(d.TmpDir, "src")
if err := os.MkdirAll(srcDir, 0755); err != nil {
return fmt.Errorf("failed to create src directory: %w", err)
}

// Write agent contents to file
agentsFile := filepath.Join(srcDir, "agents.yaml")
if err := os.WriteFile(agentsFile, []byte(d.Agent), 0644); err != nil {
return fmt.Errorf("failed to write agents file: %w", err)
}

// Write workflow contents to file
workflowFile := filepath.Join(srcDir, "workflow.yaml")
if err := os.WriteFile(workflowFile, []byte(d.Workflow), 0644); err != nil {
return fmt.Errorf("failed to write workflow file: %w", err)
}

// Get API host and port from environment or use defaults
apiHost := os.Getenv("MAESTRO_API_HOST")
if apiHost == "" {
apiHost = "127.0.0.1"
}

apiPortStr := os.Getenv("MAESTRO_API_PORT")
if apiPortStr == "" {
apiPortStr = "8000"
}

uiPort := os.Getenv("MAESTRO_UI_PORT")
if uiPort == "" {
uiPort = "5173"
}

// Convert port string to int
apiPort := 8000
if port, err := fmt.Sscanf(apiPortStr, "%d", &apiPort); err != nil || port != 1 {
d.Logger.Warn("Invalid API port, using default 8000",
zap.String("provided_port", apiPortStr))
apiPort = 8000
}

// Set CORS environment variable
corsOrigins := os.Getenv("CORS_ALLOW_ORIGINS")
if corsOrigins == "" {
corsOrigins = fmt.Sprintf("http://%s:%s", apiHost, uiPort)
os.Setenv("CORS_ALLOW_ORIGINS", corsOrigins)
}

d.Logger.Info("Starting default deployment",
zap.String("api_host", apiHost),
zap.Int("api_port", apiPort),
zap.String("ui_port", uiPort),
zap.String("agents_file", agentsFile),
zap.String("workflow_file", workflowFile))

// Start the API server using ServeWorkflow in a goroutine
apiErrChan := make(chan error, 1)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not seeing where we read from this channel, if the API server fails immediately, the error goes into the channel but we only get the generic "failed to become healthy" error at line 524, If the server crashes after we return, the channel buffer fills up and the goroutine might block I think.

Is the intent to check this channel during waitForAPIHealth to catch early failures? In any case, it doesn't seem like apiErrChan is read from anywhere afaik? Not sure

go func() {
if err := ServeWorkflow(agentsFile, workflowFile, apiHost, apiPort); err != nil {
d.Logger.Error("Failed to start API server",
zap.Error(err))
apiErrChan <- err
}
}()

// Wait for API server to be healthy
if err := waitForAPIHealth(apiHost, apiPort, 60, 1, apiErrChan, d.Logger); err != nil {
d.Logger.Error("Failed to start API server - health check failed",
zap.Error(err))
return fmt.Errorf("API server failed to become healthy: %w", err)
}

// Start the UI server
if err := startUIServer(uiPort, d.Logger); err != nil {
d.Logger.Error("Failed to start UI server",
zap.Error(err))
return fmt.Errorf("failed to start UI server: %w", err)
}

d.Logger.Info("Default deployment started successfully",
zap.String("api_url", fmt.Sprintf("http://%s:%d", apiHost, apiPort)),
zap.String("ui_url", fmt.Sprintf("http://localhost:%s", uiPort)))

return nil
}

// waitForAPIHealth waits for the API server to be healthy by polling the /health endpoint.
// Parameters:
// - host: API server host
// - port: API server port
// - timeout: Maximum time to wait in seconds
// - checkInterval: Time between health checks in seconds
// - apiErrChan: Channel to receive errors from the API server goroutine
// - logger: Logger instance
//
// Returns:
// - error: nil if API is healthy, error if startup failed or timeout reached
func waitForAPIHealth(host string, port int, timeout int, checkInterval int, apiErrChan <-chan error, logger *zap.Logger) error {
url := fmt.Sprintf("http://%s:%d/health", host, port)
startTime := time.Now()
timeoutDuration := time.Duration(timeout) * time.Second
ticker := time.NewTicker(time.Duration(checkInterval) * time.Second)
defer ticker.Stop()

logger.Info("Waiting for API server to be ready", zap.String("url", url))

for {
select {
case err := <-apiErrChan:
// API server failed to start
logger.Error("API server startup failed", zap.Error(err))
return fmt.Errorf("API server startup failed: %w", err)

case <-ticker.C:
// Check if timeout has been reached
if time.Since(startTime) >= timeoutDuration {
logger.Error("API server failed to become ready", zap.Int("timeout_seconds", timeout))
return fmt.Errorf("API server health check timeout after %d seconds", timeout)
}

// Try to check health endpoint
resp, err := http.Get(url)
if err == nil {
defer resp.Body.Close()
if resp.StatusCode == 200 {
var healthData map[string]interface{}
if err := json.NewDecoder(resp.Body).Decode(&healthData); err == nil {
if status, ok := healthData["status"].(string); ok && status == "healthy" {
logger.Info("API server is ready!")
return nil
}
}
}
}
}
}
}

// startUIServer starts the UI server using npm.
// Parameters:
// - uiPort: Port for the UI server
// - logger: Logger instance
//
// Returns:
// - error if any
func startUIServer(uiPort string, logger *zap.Logger) error {
// Get the project root directory
// Assuming the current working directory structure
cwd, err := os.Getwd()
if err != nil {
return fmt.Errorf("failed to get current working directory: %w", err)
}

// Try to find the UI directory
// Look for src/maestro/ui or ui directory
uiPaths := []string{
filepath.Join(cwd, "src", "maestro", "ui"),
filepath.Join(cwd, "ui"),
filepath.Join(cwd, "..", "maestro", "src", "maestro", "ui"),
}

var uiCwd string
for _, path := range uiPaths {
if _, err := os.Stat(path); err == nil {
uiCwd = path
break
}
}

if uiCwd == "" {
logger.Warn("UI directory not found, skipping UI server startup")
return nil
}

// Set up environment for UI server
uiEnv := os.Environ()
uiEnv = append(uiEnv, fmt.Sprintf("PORT=%s", uiPort))

// Start the UI server
npmCmd := exec.Command("npm", "run", "dev")
npmCmd.Dir = uiCwd
npmCmd.Env = uiEnv
// Suppress stderr to avoid cluttering logs
npmCmd.Stderr = nil

if err := npmCmd.Start(); err != nil {
return fmt.Errorf("failed to start UI server: %w", err)
}

logger.Info("UI server started",
zap.String("ui_cwd", uiCwd),
zap.String("port", uiPort))

return nil
}

// Made with Bob
13 changes: 12 additions & 1 deletion src/pkg/mcp/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,8 +830,19 @@ func handleDeployWorkflow(ctx context.Context, request mcp.CallToolRequest) (*mc
}
}()
GlobalServerState.Logger.Info("Started asynchronous Kubernetes deployment")
} else {
// Default deployment: serve workflow with API and UI
go func() {
err := deploy.DeployDefault()
if err != nil {
GlobalServerState.Logger.Error("Failed to deploy workflow",
zap.Error(err))
} else {
GlobalServerState.Logger.Info("Default deployment completed successfully")
}
}()
GlobalServerState.Logger.Info("Started asynchronous default deployment")
}
// TODO: Implement workflow deployment logic
// This would involve deploying the workflow to the specified target

GlobalServerState.Logger.Info("Deploying workflow",
Expand Down
Loading