-
-
Notifications
You must be signed in to change notification settings - Fork 5
feat(cli): add schedule trigger links #148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,150 @@ | ||
| package scheduleserver | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "net" | ||
| "net/http" | ||
| "strconv" | ||
|
|
||
| "github.com/nitrictech/suga/cli/internal/netx" | ||
| ) | ||
|
|
||
| // Server provides HTTP endpoints to manually trigger schedules | ||
| type Server struct { | ||
| services map[string]ServiceWithSchedules | ||
| mux *http.ServeMux | ||
| listener net.Listener | ||
| port netx.ReservedPort | ||
| server *http.Server | ||
| } | ||
|
|
||
| // ServiceWithSchedules interface for services that support schedule triggering | ||
| type ServiceWithSchedules interface { | ||
| GetName() string | ||
| TriggerSchedule(index int, async bool) error | ||
| } | ||
|
|
||
| // NewServer creates a new schedule trigger server | ||
| func NewServer(services map[string]ServiceWithSchedules) (*Server, error) { | ||
| // Get an available port | ||
| port, err := netx.GetNextPort(netx.MinPort(8000), netx.MaxPort(8999)) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to find open port: %w", err) | ||
| } | ||
|
|
||
| s := &Server{ | ||
| services: services, | ||
| mux: http.NewServeMux(), | ||
| port: port, | ||
| } | ||
|
|
||
| s.setupRoutes() | ||
|
|
||
| return s, nil | ||
| } | ||
|
|
||
| func (s *Server) setupRoutes() { | ||
| // Schedule trigger endpoint: GET /schedules/{serviceId}/{scheduleIndex}?async=true | ||
| s.mux.HandleFunc("/schedules/{serviceId}/{scheduleIndex}", s.handleTriggerSchedule) | ||
| } | ||
|
|
||
| func (s *Server) handleTriggerSchedule(w http.ResponseWriter, r *http.Request) { | ||
| // Only accept GET requests | ||
| if r.Method != http.MethodGet { | ||
| w.WriteHeader(http.StatusMethodNotAllowed) | ||
| fmt.Fprintf(w, "✗ Method not allowed. Use GET to trigger schedules.\n") | ||
| return | ||
| } | ||
|
|
||
| // Extract path parameters | ||
| serviceId := r.PathValue("serviceId") | ||
| scheduleIndexStr := r.PathValue("scheduleIndex") | ||
|
|
||
| // Parse schedule index | ||
| scheduleIndex, err := strconv.Atoi(scheduleIndexStr) | ||
| if err != nil { | ||
| w.WriteHeader(http.StatusBadRequest) | ||
| fmt.Fprintf(w, "✗ Invalid schedule index: %s\n", scheduleIndexStr) | ||
| return | ||
| } | ||
|
|
||
| // Check if schedule index is valid (non-negative) | ||
| if scheduleIndex < 0 { | ||
| w.WriteHeader(http.StatusBadRequest) | ||
| fmt.Fprintf(w, "✗ Schedule index must be non-negative\n") | ||
| return | ||
| } | ||
|
|
||
| // Find the service | ||
| svc, ok := s.services[serviceId] | ||
| if !ok { | ||
| w.WriteHeader(http.StatusNotFound) | ||
| fmt.Fprintf(w, "✗ Service '%s' not found\n", serviceId) | ||
| return | ||
| } | ||
|
|
||
| // Parse async query parameter (defaults to false for synchronous execution) | ||
| asyncStr := r.URL.Query().Get("async") | ||
| async := false | ||
| if asyncStr == "true" || asyncStr == "1" { | ||
| async = true | ||
| } | ||
|
|
||
| // Trigger the schedule | ||
| err = svc.TriggerSchedule(scheduleIndex, async) | ||
| if err != nil { | ||
| w.WriteHeader(http.StatusInternalServerError) | ||
| fmt.Fprintf(w, "✗ Failed to trigger schedule %d on service '%s': %v\n", scheduleIndex, serviceId, err) | ||
| return | ||
| } | ||
|
|
||
| // Success response | ||
| w.WriteHeader(http.StatusOK) | ||
| if async { | ||
| fmt.Fprintf(w, "✓ Schedule %d on service '%s' triggered asynchronously\n", scheduleIndex, serviceId) | ||
| } else { | ||
| fmt.Fprintf(w, "✓ Schedule %d on service '%s' executed successfully\n", scheduleIndex, serviceId) | ||
| } | ||
| } | ||
|
|
||
| // Start starts the HTTP server | ||
| func (s *Server) Start() error { | ||
| addr := fmt.Sprintf("localhost:%d", s.port) | ||
|
|
||
| listener, err := net.Listen("tcp", addr) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to listen on %s: %w", addr, err) | ||
| } | ||
|
|
||
| s.listener = listener | ||
| s.server = &http.Server{ | ||
| Handler: s.mux, | ||
| } | ||
|
|
||
| // Start server in goroutine | ||
| go func() { | ||
| if err := s.server.Serve(listener); err != nil && err != http.ErrServerClosed { | ||
| fmt.Printf("Schedule trigger server error: %v\n", err) | ||
| } | ||
| }() | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // Stop stops the HTTP server | ||
| func (s *Server) Stop() error { | ||
| if s.server != nil { | ||
| return s.server.Close() | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // GetPort returns the port the server is listening on | ||
| func (s *Server) GetPort() int { | ||
| return int(s.port) | ||
| } | ||
|
|
||
| // GetURL returns the base URL of the schedule trigger server | ||
| func (s *Server) GetURL() string { | ||
| return fmt.Sprintf("http://localhost:%d", s.port) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -126,7 +126,7 @@ func (s *ServiceSimulation) hasExceededFailureLimit() bool { | |
| return len(s.consecutiveFailures) >= s.maxFailures | ||
| } | ||
|
|
||
| func (s *ServiceSimulation) startSchedules(stdoutWriter, stderrorWriter io.Writer) (*cron.Cron, error) { | ||
| func (s *ServiceSimulation) startSchedules(stderrorWriter io.Writer) (*cron.Cron, error) { | ||
| cron := cron.New() | ||
|
|
||
| for _, schedule := range s.intent.Schedules { | ||
|
|
@@ -244,7 +244,7 @@ func (s *ServiceSimulation) Start(autoRestart bool) error { | |
| s.cmd = srvCommand | ||
| s.updateStatus(Status_Running) | ||
|
|
||
| cron, err := s.startSchedules(stdoutWriter, stderrWriter) | ||
| cron, err := s.startSchedules(stderrWriter) | ||
| if err != nil { | ||
| s.updateStatus(Status_Fatal) | ||
| return err | ||
|
|
@@ -277,6 +277,61 @@ func (s *ServiceSimulation) Start(autoRestart bool) error { | |
| return nil | ||
| } | ||
|
|
||
| // TriggerSchedule manually triggers a schedule by index | ||
| // If async is true, the schedule runs in a goroutine and returns immediately | ||
| // If async is false, waits for the HTTP response | ||
| func (s *ServiceSimulation) TriggerSchedule(index int, async bool) error { | ||
| // Validate schedule index | ||
| if index < 0 || index >= len(s.intent.Schedules) { | ||
| return fmt.Errorf("schedule index %d out of range (service has %d schedules)", index, len(s.intent.Schedules)) | ||
| } | ||
|
|
||
| // Check if service is running | ||
| if s.currentStatus != Status_Running { | ||
| return fmt.Errorf("service is not running (current status: %v)", s.currentStatus) | ||
| } | ||
|
|
||
| schedule := s.intent.Schedules[index] | ||
|
|
||
| // Build the URL | ||
| url := url.URL{ | ||
| Scheme: "http", | ||
| Host: fmt.Sprintf("localhost:%d", s.port), | ||
| Path: schedule.Path, | ||
| } | ||
|
|
||
| // Function to execute the schedule | ||
| executeSchedule := func() error { | ||
| req, err := http.NewRequest(http.MethodPost, url.String(), nil) | ||
| if err != nil { | ||
| return fmt.Errorf("error creating request: %w", err) | ||
| } | ||
|
|
||
| resp, err := http.DefaultClient.Do(req) | ||
| if err != nil { | ||
| return fmt.Errorf("error sending request: %w", err) | ||
| } | ||
| defer resp.Body.Close() | ||
|
|
||
| if resp.StatusCode < 200 || resp.StatusCode >= 300 { | ||
| return fmt.Errorf("request returned status %d", resp.StatusCode) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| if async { | ||
| // Asynchronous execution | ||
| go func() { | ||
| _ = executeSchedule() | ||
| }() | ||
| return nil | ||
| } | ||
|
|
||
| // Synchronous execution | ||
| return executeSchedule() | ||
| } | ||
|
Comment on lines
+280
to
+333
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well ackchyually… TriggerSchedule works but drops async errors and has no HTTP timeout. The synchronous path is fine, but for If you care about observability and avoiding indefinite hangs, consider:
For example: + httpClient := &http.Client{Timeout: 30 * time.Second}
+
// Function to execute the schedule
executeSchedule := func() error {
- req, err := http.NewRequest(http.MethodPost, url.String(), nil)
+ req, err := http.NewRequest(http.MethodPost, url.String(), nil)
if err != nil {
return fmt.Errorf("error creating request: %w", err)
}
- resp, err := http.DefaultClient.Do(req)
+ resp, err := httpClient.Do(req)
@@
if async {
// Asynchronous execution
go func() {
- _ = executeSchedule()
+ if err := executeSchedule(); err != nil {
+ // TODO: route this through the service’s logging if desired
+ fmt.Fprintf(os.Stderr, "schedule trigger failed for %s[%d]: %v\n", s.name, index, err)
+ }
}()
return nil
}That keeps the current API while making async failures visible and bounded in time.
🤖 Prompt for AI Agents |
||
|
|
||
| func NewServiceSimulation(name string, intent schema.ServiceIntent, port netx.ReservedPort, apiPort netx.ReservedPort) (*ServiceSimulation, <-chan ServiceEvent, error) { | ||
| if intent.Dev == nil { | ||
| return nil, nil, fmt.Errorf("service does not have a dev configuration and cannot be started") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,7 @@ import ( | |
| "sync" | ||
|
|
||
| "github.com/nitrictech/suga/cli/internal/netx" | ||
| "github.com/nitrictech/suga/cli/internal/scheduleserver" | ||
| "github.com/nitrictech/suga/cli/internal/simulation/database" | ||
| "github.com/nitrictech/suga/cli/internal/simulation/middleware" | ||
| "github.com/nitrictech/suga/cli/internal/simulation/service" | ||
|
|
@@ -37,11 +38,12 @@ type SimulationServer struct { | |
| storagepb.UnimplementedStorageServer | ||
| pubsubpb.UnimplementedPubsubServer | ||
|
|
||
| apiPort netx.ReservedPort | ||
| fileServerPort int | ||
| services map[string]*service.ServiceSimulation | ||
| databaseManager *database.DatabaseManager | ||
| servicesWg sync.WaitGroup | ||
| apiPort netx.ReservedPort | ||
| fileServerPort int | ||
| services map[string]*service.ServiceSimulation | ||
| databaseManager *database.DatabaseManager | ||
| servicesWg sync.WaitGroup | ||
| scheduleTriggerSv *scheduleserver.Server | ||
| } | ||
|
|
||
| const ( | ||
|
|
@@ -408,6 +410,64 @@ func styledName(name string, styleFunc func(...string) string) string { | |
| return styledNames[name] | ||
| } | ||
|
|
||
| func (s *SimulationServer) startScheduleTriggerServer(output io.Writer) error { | ||
| // Check if any service has schedules | ||
| hasSchedules := false | ||
| for _, serviceIntent := range s.appSpec.ServiceIntents { | ||
| if len(serviceIntent.Schedules) > 0 { | ||
| hasSchedules = true | ||
| break | ||
| } | ||
| } | ||
|
|
||
| if !hasSchedules { | ||
| return nil | ||
| } | ||
|
|
||
| // Convert services map to interface map for schedule trigger server | ||
| servicesWithSchedules := make(map[string]scheduleserver.ServiceWithSchedules) | ||
| for name, svc := range s.services { | ||
| servicesWithSchedules[name] = svc | ||
| } | ||
|
|
||
| // Create and start the schedule trigger server | ||
| triggerServer, err := scheduleserver.NewServer(servicesWithSchedules) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to create schedule trigger server: %w", err) | ||
| } | ||
|
|
||
| err = triggerServer.Start() | ||
| if err != nil { | ||
| return fmt.Errorf("failed to start schedule trigger server: %w", err) | ||
| } | ||
|
|
||
| s.scheduleTriggerSv = triggerServer | ||
|
|
||
| fmt.Fprintf(output, "%s\n\n", style.Purple("Schedule Triggers")) | ||
|
|
||
| // Print clickable trigger URLs for each service's schedules | ||
| for serviceName, serviceIntent := range s.appSpec.ServiceIntents { | ||
| if len(serviceIntent.Schedules) == 0 { | ||
| continue | ||
| } | ||
|
|
||
| for i, schedule := range serviceIntent.Schedules { | ||
| triggerURL := fmt.Sprintf("%s/schedules/%s/%d", triggerServer.GetURL(), serviceName, i) | ||
| fmt.Fprintf(output, "%s %s schedule %d (%s -> %s)\n Trigger: %s\n", | ||
| greenCheck, | ||
| styledName(serviceName, style.Teal), | ||
| i, | ||
| style.Gray(schedule.Cron), | ||
| style.Gray(fmt.Sprintf("POST %s", schedule.Path)), | ||
| style.Cyan(triggerURL)) | ||
| } | ||
| } | ||
|
|
||
| fmt.Fprint(output, "\n") | ||
|
|
||
| return nil | ||
| } | ||
|
Comment on lines
+413
to
+469
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well ackchyually… schedule trigger wiring looks good, but you may want to tighten service selection and shutdown behavior. A couple of small nits:
Something like: - servicesWithSchedules := make(map[string]scheduleserver.ServiceWithSchedules)
- for name, svc := range s.services {
- servicesWithSchedules[name] = svc
- }
+ servicesWithSchedules := make(map[string]scheduleserver.ServiceWithSchedules)
+ for name, svc := range s.services {
+ if intent, ok := s.appSpec.ServiceIntents[name]; ok && len(intent.Schedules) > 0 {
+ servicesWithSchedules[name] = svc
+ }
+ }and: - if s.scheduleTriggerSv != nil {
- fmt.Println("Stopping schedule trigger server...")
- if err := s.scheduleTriggerSv.Stop(); err != nil {
- return fmt.Errorf("failed to stop schedule trigger server: %w", err)
- }
- }
+ if s.scheduleTriggerSv != nil {
+ fmt.Println("Stopping schedule trigger server...")
+ if err := s.scheduleTriggerSv.Stop(); err != nil {
+ fmt.Printf("Warning: failed to stop schedule trigger server: %v\n", err)
+ }
+ }would keep behavior robust with minimal extra complexity. Also applies to: 512-518, 530-536 🤖 Prompt for AI Agents |
||
|
|
||
| func (s *SimulationServer) Start(output io.Writer) error { | ||
| err := s.startSugaApis() | ||
| if err != nil { | ||
|
|
@@ -449,6 +509,14 @@ func (s *SimulationServer) Start(output io.Writer) error { | |
| fmt.Fprint(output, "\n") | ||
| } | ||
|
|
||
| // Start schedule trigger server after services are running | ||
| if len(s.services) > 0 { | ||
| err = s.startScheduleTriggerServer(output) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| fmt.Println(style.Gray("Use Ctrl-C to exit\n")) | ||
|
|
||
| // block on handling service outputs for now | ||
|
|
@@ -459,6 +527,14 @@ func (s *SimulationServer) Start(output io.Writer) error { | |
|
|
||
| // Stop gracefully shuts down the simulation server and cleans up resources | ||
| func (s *SimulationServer) Stop() error { | ||
| // Stop the schedule trigger server first | ||
| if s.scheduleTriggerSv != nil { | ||
| fmt.Println("Stopping schedule trigger server...") | ||
| if err := s.scheduleTriggerSv.Stop(); err != nil { | ||
| return fmt.Errorf("failed to stop schedule trigger server: %w", err) | ||
| } | ||
| } | ||
|
|
||
| // Stop services first before stopping database | ||
| for serviceName, svc := range s.services { | ||
| if svc != nil { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well ackchyually… the schedule trigger HTTP API treats all TriggerSchedule failures as 500, even for client mistakes.
Right now
handleTriggerScheduleturns anyTriggerScheduleerror intohttp.StatusInternalServerError, which means user errors like “schedule index out of range” or “service not running” will look like server faults.If you’re willing to standardize a couple of sentinel error values or types from your
ServiceWithSchedulesimplementations (e.g.ErrScheduleNotFound,ErrServiceNotRunning), you could map them to more appropriate responses:Sketch:
and then:
That keeps the core wiring intact while making the HTTP surface a bit more self-explanatory for callers.
🤖 Prompt for AI Agents