Skip to content

Commit b4f70de

Browse files
Major Server Refactor: Centralized Runner Management & Service Architecture (#211)
* feat: create service package with basic lifecycle management - Add Service struct as root lifecycle owner - Implement Run/Shutdown methods with proper context handling - Use atomic.Bool to prevent double shutdown races - Add comprehensive unit tests with synctest and testify - Service blocks in Run() and shutdown is non-blocking - Graceful shutdown with configurable grace period - Thread-safe lifecycle state management using channels * Implement root service package with proper lifecycle management Create a service package as the root lifecycle owner for the cog runtime, replacing the tangled mess of context.TODO and mixed shutdown responsibilities. Key features: - Service struct with proper lifecycle state management using channels - Initialize() method that's idempotent and component-based - Run() method with errgroup coordination and proper shutdown flow - Shutdown() for graceful shutdown with runner grace periods - Context cancellation triggers immediate hard shutdown - Server interface allowing testing with mock servers - Integration with existing handler.Stop() for runner management - Comprehensive tests using synctest and testify The service provides clean separation between: - Graceful shutdown (Shutdown()) - signals runners, waits for grace period - Hard shutdown (context cancel) - immediate Close() with no grace period This creates a solid foundation for incremental refactoring while maintaining compatibility with existing runner/handler code. * Cleanup Cleanup Logging to use correct form of logger method Remove useless comments * Add signal handling for graceful shutdown - Add SIGTERM handling in await-explicit-shutdown mode - Add context parameter to Initialize method * Implement centralized runner management architecture Complete architectural rewrite of the runner system to address fundamental design limitations and enable reliable production operation. This represents the culmination of architectural work following preparatory commits that established service lifecycle management. ## Background Previous commits (b7b0405, 0747310, 2dd89e3, ed1b657) laid groundwork with service lifecycle management and signal handling. This commit completes the transformation by extracting and rebuilding the core runner architecture. ## Architectural Changes ### Core Extraction & Separation of Concerns - Extract all runner logic from `internal/server` to new `internal/runner` package - Restructure 908 lines of server/runner code into focused, testable modules: - `manager.go` (1111 lines): Centralized runner lifecycle and slot management - `runner.go` (992 lines): Individual runner process management - `types.go` (228 lines): Clean type definitions and interfaces - Comprehensive test coverage (2000+ lines across multiple test files) ### Runner Management Transformation - Replace direct slice manipulation in HTTP handlers with centralized Manager - Implement proper slot claiming/releasing for concurrency control - Add atomic operations for safe concurrent access - Improve resource management and cleanup paths ### Webhook Architecture Overhaul - Replace distributed webhook logic with dedicated `internal/webhook` package - Implement per-prediction response watchers to prevent duplicate webhooks - Add atomic CAS pattern for terminal webhook deduplication - Fix webhook timing and log accumulation issues ### Concurrency & Safety Improvements - Implement centralized concurrency management with clear ownership model - Add proper context cancellation throughout runner lifecycle - Enhance graceful shutdown with configurable timeouts - Resolve prediction cancellation and cleanup edge cases ## Test Harness Adaptations (Minimal Changes) The existing functional end-to-end test harness required only interface updates to work with the new architecture, demonstrating API compatibility was maintained: - Update test imports to use `internal/runner` and `internal/config` packages - Adapt `setupCogRuntime()` to use new Manager initialization patterns - Modify assertions to work with new response types (server vs runner response formats) - Remove obsolete `procedure_url_test.go` (154 lines) - functionality moved to integration tests - Skip `TestLogs` pending reimplementation with new log processor architecture - Net change: -24 lines (362 insertions, 386 deletions) across 16 test files All functional end-to-end tests continue to pass, validating that the architectural rewrite maintains behavioral compatibility with the previous implementation. ## Design Rationale The previous architecture concentrated HTTP handling, runner lifecycle, webhook sending, and concurrency control in a single file without clear boundaries. A complete rewrite was chosen to: - Establish clean separation of concerns with testable interfaces - Implement safe concurrency patterns throughout the system - Enable comprehensive test coverage for regression prevention - Create a maintainable foundation for future development ## Production Impact - Maintains full API compatibility - no breaking changes to external interfaces - Comprehensive test coverage prevents regressions during ongoing development - Clean architecture enables future reliability improvements - Resolves several classes of concurrency issues and resource management problems This rewrite establishes a robust foundation for reliable production operation and provides clear architectural boundaries for future development. * Update ARCHITECTURE.md to reflect centralized runner management - Add detailed package structure documentation for internal/server, internal/runner, internal/webhook, and internal/service - Document 6-step request processing flow through the component architecture - Update execution modes to describe current Manager and Runner behavior - Focus on architecture benefits and capabilities - Maintain hybrid Go/Python design documentation with current internal organization * Move PendingPrediction from runner.go to types.go Move PendingPrediction type and methods to types.go since it's used by both Manager and Runner components. This improves code organization by keeping shared types in their designated location. Changes: - Move PendingPrediction struct definition to types.go - Move all PendingPrediction methods (safeSend, safeClose, sendWebhook, sendWebhookSync) - Add comprehensive tests for PendingPrediction methods in types_test.go - Update concurrent operation tests to use new sync.WaitGroup.Go No functional changes - purely organizational refactoring. * Update isolation test to use waitgroup.Go function * Add webhook.Sender interface and refactor webhook handling Introduce Sender interface for webhook delivery to improve testability and enable future alternative implementations. Changes: - Add webhook.Sender interface with Send and SendConditional methods - Rename concrete implementation to DefaultSender with build-time assertion - Update all webhook usage to use interface instead of concrete type - Consolidate webhook event types to use webhook.Event consistently - Update PendingPrediction methods to use consistent parameter passing - Update all tests to use new interface and event types No functional changes - purely architectural improvement for better testability and extensibility. * Refactor webhook system to use io.Reader and fix race conditions Eliminate race conditions in webhook delivery by serializing JSON payloads under mutex protection and refactoring the webhook interface to accept io.Reader instead of any. Changes: - Change webhook.Sender interface to use io.Reader for type safety - Serialize PredictionResponse under mutex before webhook delivery - Update PendingPrediction.sendWebhook methods to handle serialization - Add jsonReader test helper for cleaner test payload creation - Fix all webhook tests to use new io.Reader interface This refactoring ensures thread-safe webhook delivery by performing JSON marshaling atomically under lock protection, preventing partial reads of mutating prediction state. The io.Reader interface provides better type safety and flexibility compared to the previous any type. * Implement forced shutdown on cleanup timeout (lost in refactor) Re-implement the cleanup timeout mechanism that was lost during the major server refactor. This restores the behavior from commit 575d218 where cleanup failures trigger forced process termination. Key changes: - Add ForceShutdownSignal for idempotent shutdown coordination - Implement cleanup token system in ForceKill for procedure mode only - Add background process verification with configurable timeout - Mark runners as DEFUNCT on kill failures to prevent ready status - Wire service to monitor for forced shutdown and call os.Exit(1) Behavior: - Non-procedure mode: Simple kill without cleanup monitoring - Procedure mode: Full cleanup verification with forced shutdown on timeout - Idempotent ForceKill calls prevent multiple shutdown attempts - Failed cleanup in procedure mode causes immediate server exit This ensures that stuck processes in procedure mode cannot leave the server in an unrecoverable state, maintaining system reliability.
1 parent 2854611 commit b4f70de

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+7741
-3041
lines changed

ARCHITECTURE.md

Lines changed: 54 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,33 @@ This hybrid architecture combines Go's performance advantages with Python's mach
2525

2626
### Go HTTP server
2727

28-
The HTTP server component handles all external communication and process management:
28+
The HTTP server component is organized into focused packages with clear separation of concerns:
2929

30+
#### `internal/server` - HTTP API Layer
3031
**HTTP routing**: Implements the Cog prediction API with endpoints for predictions, health checks, cancellation, and shutdown. Routes are dynamically configured based on runtime mode (standard vs procedure).
3132

32-
**Process management**: Manages Python runner processes including startup, shutdown, health monitoring, and crash recovery. In procedure mode, can manage multiple isolated runners with automatic eviction policies.
33+
**Request handling**: Processes incoming HTTP requests, validates payloads, and coordinates with the runner management layer. Handles both synchronous and asynchronous prediction workflows.
3334

34-
**Request coordination**: Handles request queuing, concurrency limits, and response aggregation. Maps HTTP requests to appropriate Python runners and manages the full request lifecycle.
35+
**Response management**: Aggregates results from runner layer and formats responses according to API specifications. Manages streaming responses and webhook notifications.
3536

36-
**File I/O handling**: Manages input file downloads and output file uploads, with support for both local storage and external upload endpoints. Handles path resolution and cleanup automatically.
37+
#### `internal/runner` - Process Management Layer
38+
**Centralized runner management**: The `Manager` component handles all Python process lifecycle including startup, shutdown, health monitoring, and crash recovery. Provides slot-based concurrency control and automatic resource cleanup.
3739

38-
**IPC coordination**: Receives status updates from Python processes via HTTP and manages bidirectional communication through the filesystem.
40+
**Individual runner management**: The `Runner` component manages individual Python processes with proper context cancellation, log accumulation, and per-prediction response tracking.
41+
42+
**Procedure support**: Dynamic runner creation for different source URLs with automatic eviction policies. Handles isolation requirements and resource allocation in multi-tenant scenarios.
43+
44+
**Configuration management**: Handles cog.yaml parsing, environment setup, and runtime configuration for both standard and procedure modes.
45+
46+
#### `internal/webhook` - Webhook Delivery
47+
**Webhook coordination**: Manages webhook delivery with deduplication, retry logic, and proper event filtering. Uses atomic operations to prevent duplicate terminal webhooks.
48+
49+
**Event tracking**: Tracks webhook events per prediction with proper timing and log accumulation to ensure complete notification delivery.
50+
51+
#### `internal/service` - Application Lifecycle
52+
**Service coordination**: Manages overall application lifecycle including graceful shutdown, signal handling, and component initialization.
53+
54+
**Configuration integration**: Bridges CLI configuration with internal component configuration and handles service-level concerns like working directory management.
3955

4056
### Python model runner (coglet)
4157

@@ -53,11 +69,24 @@ The `coglet` component focuses purely on model execution and introspection:
5369

5470
### Request flow architecture
5571

56-
**Standard mode**: Single Python runner handling requests sequentially or with limited concurrency based on predictor capabilities.
72+
The architecture provides clean separation between HTTP handling, runner management, and process execution:
73+
74+
#### Request Processing Flow
75+
76+
1. **HTTP Request**: `internal/server` receives and validates incoming requests
77+
2. **Runner Assignment**: `internal/runner/Manager` assigns requests to available runners using slot-based concurrency control
78+
3. **Process Execution**: `internal/runner/Runner` manages individual Python process interaction via file-based IPC
79+
4. **Response Tracking**: Per-prediction watchers monitor Python process responses and handle log accumulation
80+
5. **Webhook Delivery**: `internal/webhook` manages asynchronous webhook notifications with deduplication
81+
6. **HTTP Response**: `internal/server` formats and returns final responses to clients
82+
83+
#### Execution Modes
84+
85+
**Standard mode**: Single Python runner managed by the system with configurable concurrency based on predictor capabilities. The Manager creates and maintains one long-lived runner process.
5786

58-
**Procedure mode**: Dynamic runner management where the Go server creates Python processes on-demand for different source URLs, with automatic scaling and eviction based on usage patterns.
87+
**Procedure mode**: Dynamic runner management where the Manager creates Python processes on-demand for different source URLs. Implements LRU eviction, automatic scaling, and resource isolation between procedures.
5988

60-
**Concurrency handling**: The Go server aggregates concurrency limits across all runners and provides global throttling while individual Python processes handle their own internal concurrency based on predictor type (sync vs async).
89+
**Concurrency handling**: The Manager provides global slot-based concurrency control while individual Runners handle per-process concurrency limits. Atomic operations ensure safe concurrent access to shared state.
6190

6291
## Communication patterns
6392

@@ -87,38 +116,38 @@ The server exposes a RESTful API compatible with the original Cog specification:
87116

88117
**Output processing**: Python runners write outputs to files when needed, and the Go server handles upload/base64 encoding based on client preferences.
89118

90-
## Contrast with old Cog server
119+
## Architecture benefits
91120

92-
The new architecture addresses several limitations of the original FastAPI-based implementation:
121+
The hybrid Go/Python architecture provides several key advantages:
93122

94-
### Performance improvements
123+
### Performance characteristics
95124

96-
**Go HTTP handling**: The Go server can handle much higher request throughput and lower latency compared to Python's uvicorn, especially for health checks and simple requests.
125+
**Go HTTP handling**: The Go server provides high request throughput and low latency, especially for health checks and management requests.
97126

98-
**Process isolation**: Model crashes or hangs no longer affect the HTTP server, providing better availability and faster recovery.
127+
**Process isolation**: Model crashes or hangs do not affect the HTTP server, providing better availability and faster recovery.
99128

100-
**Concurrent processing**: Better support for concurrent predictions with proper resource accounting and backpressure management.
129+
**Concurrent processing**: Supports concurrent predictions with proper resource accounting and backpressure management through slot-based concurrency control.
101130

102-
### Reliability improvements
131+
### Reliability features
103132

104-
**Fault tolerance**: Python process crashes are isolated and can be recovered without restarting the entire server.
133+
**Fault tolerance**: Python process crashes are isolated and recovered without affecting other operations or requiring server restart.
105134

106-
**Resource management**: Better control over memory usage, file descriptor limits, and process lifecycle.
135+
**Resource management**: Provides precise control over memory usage, file descriptor limits, and process lifecycle with automatic cleanup.
107136

108137
**Dependency isolation**: Zero Python dependencies in the runtime layer eliminates version conflicts with model requirements.
109138

110-
### Operational improvements
139+
### Operational capabilities
111140

112-
**Multi-tenancy**: Procedure mode allows serving multiple models/procedures from a single server instance with proper isolation.
141+
**Multi-tenancy**: Procedure mode serves multiple models/procedures from a single server instance with proper isolation and resource allocation.
113142

114-
**Debuggability**: File-based IPC makes it easy to inspect request/response payloads and trace execution flow.
143+
**Debuggability**: File-based IPC enables easy inspection of request/response payloads and execution flow tracing.
115144

116-
**Resource cleanup**: Automatic cleanup of temporary files, processes, and other resources with proper error handling.
145+
**Resource cleanup**: Automatic cleanup of temporary files, processes, and other resources with comprehensive error handling.
117146

118-
### API compatibility
147+
### API design
119148

120-
**Backward compatibility**: Maintains full API compatibility with existing Cog clients while providing performance and reliability improvements.
149+
**Client compatibility**: Maintains full API compatibility with existing Cog clients while providing enhanced performance and reliability.
121150

122-
**Extended features**: Adds procedure mode capabilities while preserving the original single-model deployment pattern.
151+
**Flexible deployment**: Supports both single-model deployment patterns and multi-tenant procedure mode from the same codebase.
123152

124-
The old server's single-process architecture required careful management of async/await patterns and could suffer from blocking operations affecting the entire service. The new architecture's process separation eliminates these concerns while providing better scalability and fault isolation.
153+
The process separation architecture eliminates concerns around blocking operations affecting the entire service while providing better scalability and fault isolation through clear component boundaries.

cmd/cog/main.go

Lines changed: 84 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,20 @@ package main
22

33
import (
44
"context"
5-
"errors"
65
"fmt"
7-
"net/http"
86
"os"
97
"os/exec"
10-
"os/signal"
11-
"strconv"
128
"syscall"
139
"time"
1410

1511
"github.com/alecthomas/kong"
12+
"github.com/replicate/go/logging"
13+
"go.uber.org/zap"
14+
"go.uber.org/zap/zapcore"
1615

17-
"github.com/replicate/cog-runtime/internal/server"
16+
"github.com/replicate/cog-runtime/internal/config"
17+
"github.com/replicate/cog-runtime/internal/runner"
18+
"github.com/replicate/cog-runtime/internal/service"
1819
"github.com/replicate/cog-runtime/internal/util"
1920
)
2021

@@ -28,6 +29,7 @@ type ServerCmd struct {
2829
WorkingDirectory string `help:"Override the working directory for predictions" name:"working-directory" env:"COG_WORKING_DIRECTORY"`
2930
RunnerShutdownGracePeriod time.Duration `help:"Grace period before force-killing prediction runners" name:"runner-shutdown-grace-period" default:"600s" env:"COG_RUNNER_SHUTDOWN_GRACE_PERIOD"`
3031
CleanupTimeout time.Duration `help:"Maximum time to wait for process cleanup before hard exit" name:"cleanup-timeout" default:"10s" env:"COG_CLEANUP_TIMEOUT"`
32+
MaxRunners int `help:"Maximum number of runners to allow (0 for auto-detect)" name:"max-runners" env:"COG_MAX_RUNNERS" default:"0"`
3133
}
3234

3335
type SchemaCmd struct{}
@@ -40,125 +42,114 @@ type CLI struct {
4042
Test TestCmd `cmd:"" help:"Run model tests to verify functionality"`
4143
}
4244

43-
var logger = util.CreateLogger("cog")
45+
// createBaseLogger creates a base logger with configurable level
46+
func createBaseLogger(name string) *zap.Logger {
47+
logLevel := os.Getenv("COG_LOG_LEVEL")
48+
if logLevel == "" {
49+
logLevel = "info"
50+
}
51+
_, err := zapcore.ParseLevel(logLevel)
52+
if err != nil {
53+
fmt.Printf("Failed to parse log level \"%s\": %s\n", logLevel, err) //nolint:forbidigo // logger setup error reporting
54+
}
4455

45-
func (s *ServerCmd) Run() error {
46-
log := logger.Sugar()
56+
return logging.New(name).WithOptions(zap.IncreaseLevel(zapcore.DebugLevel))
57+
}
4758

59+
// buildServiceConfig converts CLI ServerCmd to service configuration
60+
func buildServiceConfig(s *ServerCmd) (config.Config, error) {
61+
log := createBaseLogger("cog-config").Sugar()
62+
63+
logLevel := log.Level()
64+
log.Infow("log level", "level", logLevel)
65+
log.Infow("env log level", "level", os.Getenv("COG_LOG_LEVEL"))
4866
// One-shot mode requires procedure mode
4967
if s.OneShot && !s.UseProcedureMode {
50-
log.Errorw("one-shot mode requires procedure mode")
51-
return fmt.Errorf("one-shot mode requires procedure mode, use --use-procedure-mode")
68+
log.Error("one-shot mode requires procedure mode")
69+
return config.Config{}, fmt.Errorf("one-shot mode requires procedure mode, use --use-procedure-mode")
5270
}
5371

5472
// Procedure mode implies await explicit shutdown
55-
// i.e. Python process exit should not trigger shutdown
73+
awaitExplicitShutdown := s.AwaitExplicitShutdown
5674
if s.UseProcedureMode {
57-
s.AwaitExplicitShutdown = true
75+
awaitExplicitShutdown = true
5876
}
59-
log.Infow("configuration",
60-
"use-procedure-mode", s.UseProcedureMode,
61-
"await-explicit-shutdown", s.AwaitExplicitShutdown,
62-
"one-shot", s.OneShot,
63-
"upload-url", s.UploadURL,
64-
)
65-
66-
addr := fmt.Sprintf("%s:%d", s.Host, s.Port)
67-
log.Infow("starting Cog HTTP server", "addr", addr, "version", util.Version(), "pid", os.Getpid())
6877

69-
var err error
70-
currentWorkingDirectory := s.WorkingDirectory
71-
if currentWorkingDirectory == "" {
72-
currentWorkingDirectory, err = os.Getwd()
78+
// Resolve working directory
79+
workingDir := s.WorkingDirectory
80+
if workingDir == "" {
81+
var err error
82+
workingDir, err = os.Getwd()
7383
if err != nil {
7484
log.Errorw("failed to get current working directory", "error", err)
75-
return err
85+
return config.Config{}, fmt.Errorf("failed to get current working directory: %w", err)
7686
}
7787
}
7888

79-
forceShutdown := make(chan struct{}, 1)
80-
81-
serverCfg := server.Config{
89+
cfg := config.Config{
90+
Host: s.Host,
91+
Port: s.Port,
8292
UseProcedureMode: s.UseProcedureMode,
83-
AwaitExplicitShutdown: s.AwaitExplicitShutdown,
93+
AwaitExplicitShutdown: awaitExplicitShutdown,
8494
OneShot: s.OneShot,
85-
IPCUrl: fmt.Sprintf("http://localhost:%d/_ipc", s.Port),
95+
WorkingDirectory: workingDir,
8696
UploadURL: s.UploadURL,
87-
WorkingDirectory: currentWorkingDirectory,
97+
IPCUrl: fmt.Sprintf("http://localhost:%d/_ipc", s.Port),
98+
MaxRunners: s.MaxRunners,
8899
RunnerShutdownGracePeriod: s.RunnerShutdownGracePeriod,
89100
CleanupTimeout: s.CleanupTimeout,
90-
ForceShutdown: forceShutdown,
91-
}
92-
// FIXME: in non-procedure mode we do not support concurrency in a meaningful way, we
93-
// statically create the runner list sized at 1.
94-
if maxRunners, ok := os.LookupEnv("COG_MAX_RUNNERS"); ok && s.UseProcedureMode {
95-
if i, err := strconv.Atoi(maxRunners); err == nil {
96-
serverCfg.MaxRunners = i
97-
} else {
98-
log.Errorw("failed to parse COG_MAX_RUNNERS", "value", maxRunners)
99-
}
100101
}
101-
ctx, cancel := context.WithCancel(context.Background())
102-
h, err := server.NewHandler(serverCfg, cancel) //nolint:contextcheck // context passing not viable in current architecture
102+
103+
log.Infow("service configuration",
104+
"use_procedure_mode", cfg.UseProcedureMode,
105+
"await_explicit_shutdown", cfg.AwaitExplicitShutdown,
106+
"one_shot", cfg.OneShot,
107+
"upload_url", cfg.UploadURL,
108+
"working_directory", cfg.WorkingDirectory,
109+
"max_runners", cfg.MaxRunners,
110+
)
111+
112+
return cfg, nil
113+
}
114+
115+
func (s *ServerCmd) Run() error {
116+
// Create base logger
117+
baseLogger := createBaseLogger("cog")
118+
log := baseLogger.Sugar()
119+
120+
// Build service configuration
121+
cfg, err := buildServiceConfig(s)
103122
if err != nil {
104-
log.Errorw("failed to create server handler", "error", err)
105123
return err
106124
}
107-
mux := server.NewServeMux(h, s.UseProcedureMode)
108-
httpServer := &http.Server{
109-
Addr: addr,
110-
Handler: mux,
111-
ReadHeaderTimeout: 5 * time.Second, // TODO: is 5s too long? likely
112-
}
113-
go func() {
114-
<-ctx.Done()
115-
if err := httpServer.Shutdown(ctx); err != nil {
116-
log.Errorw("failed to shutdown server", "error", err)
117-
os.Exit(1)
118-
}
119-
}()
120-
go func() {
121-
ch := make(chan os.Signal, 1)
122-
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
123-
for {
124-
select {
125-
case sig := <-ch:
126-
if sig == syscall.SIGTERM && s.AwaitExplicitShutdown {
127-
log.Warnw("ignoring signal to stop", "signal", sig)
128-
} else {
129-
log.Infow("stopping Cog HTTP server", "signal", sig)
130-
if err := h.Stop(); err != nil {
131-
log.Errorw("failed to stop server handler", "error", err)
132-
os.Exit(1)
133-
}
134-
}
135-
case <-forceShutdown:
136-
log.Errorw("cleanup timeout reached, forcing ungraceful shutdown")
137-
os.Exit(1)
138-
}
139-
}
140-
}()
141-
if err := httpServer.ListenAndServe(); errors.Is(err, http.ErrServerClosed) {
142-
exitCode := h.ExitCode()
143-
if exitCode == 0 {
144-
log.Infow("shutdown completed normally")
145-
} else {
146-
log.Errorw("python runner exited with code", "code", exitCode)
147-
}
148-
return nil
125+
126+
addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
127+
log.Infow("starting Cog HTTP server", "addr", addr, "version", util.Version(), "pid", os.Getpid())
128+
129+
// Create service with base logger
130+
svc := service.New(cfg, baseLogger)
131+
132+
// Create root context for the entire service
133+
ctx, cancel := context.WithCancel(context.Background())
134+
defer cancel()
135+
136+
// Initialize service components
137+
if err := svc.Initialize(ctx); err != nil {
138+
return err
149139
}
150-
return err
140+
141+
return svc.Run(ctx)
151142
}
152143

153144
func (s *SchemaCmd) Run() error {
154-
log := logger.Sugar()
145+
log := createBaseLogger("cog-schema").Sugar()
155146

156147
wd, err := os.Getwd()
157148
if err != nil {
158149
log.Errorw("failed to get working directory", "error", err)
159150
return err
160151
}
161-
y, err := util.ReadCogYaml(wd)
152+
y, err := runner.ReadCogYaml(wd)
162153
if err != nil {
163154
log.Errorw("failed to read cog.yaml", "error", err)
164155
return err
@@ -177,14 +168,14 @@ func (s *SchemaCmd) Run() error {
177168
}
178169

179170
func (t *TestCmd) Run() error {
180-
log := logger.Sugar()
171+
log := createBaseLogger("cog-test").Sugar()
181172

182173
wd, err := os.Getwd()
183174
if err != nil {
184175
log.Errorw("failed to get working directory", "error", err)
185176
return err
186177
}
187-
y, err := util.ReadCogYaml(wd)
178+
y, err := runner.ReadCogYaml(wd)
188179
if err != nil {
189180
log.Errorw("failed to read cog.yaml", "error", err)
190181
return err
@@ -203,8 +194,6 @@ func (t *TestCmd) Run() error {
203194
}
204195

205196
func main() {
206-
log := logger.Sugar()
207-
208197
var cli CLI
209198
ctx := kong.Parse(&cli,
210199
kong.Name("cog"),
@@ -214,7 +203,7 @@ func main() {
214203

215204
err := ctx.Run()
216205
if err != nil {
217-
log.Error(err)
206+
fmt.Fprintf(os.Stderr, "Error: %v\n", err) //nolint:forbidigo // main function error handling
218207
os.Exit(1)
219208
}
220209
}

0 commit comments

Comments
 (0)