@@ -12,231 +12,189 @@ import (
1212 "syscall"
1313 "time"
1414
15- "github.com/peterbourgon/ff/v4"
16- "github.com/peterbourgon/ff/v4/ffhelp"
17- _ "go.uber.org/automaxprocs"
15+ "github.com/alecthomas/kong"
1816
1917 "github.com/replicate/cog-runtime/internal/server"
2018 "github.com/replicate/cog-runtime/internal/util"
2119)
2220
23- type ServerConfig struct {
24- Host string `ff:"long: host, default: 0.0.0.0, usage: HTTP server host "`
25- Port int `ff:"long: port, default: 5000, usage: HTTP server port "`
26- UseProcedureMode bool `ff:"long: use- procedure- mode, default: false, usage: use-procedure mode"`
27- AwaitExplicitShutdown bool `ff:"long: await- explicit- shutdown, default: false, usage: await explicit shutdown"`
28- UploadURL string `ff:"long: upload-url, nodefault, usage: output file upload URL "`
29- WorkingDirectory string `ff:"long: working- directory, nodefault, usage: explicit working directory override "`
30- RunnerShutdownGracePeriod time.Duration `ff:"long: runner-shutdown-grace- period, default: 600s, usage: how long to wait before force-killing runners after Stop() "`
21+ type ServerCmd struct {
22+ Host string `help:"Host address to bind the HTTP server to" default:" 0.0.0.0"`
23+ Port int `help:"Port number for the HTTP server" default:"5000 "`
24+ UseProcedureMode bool `help:"Enable procedure mode for concurrent predictions" name:" use-procedure- mode"`
25+ AwaitExplicitShutdown bool `help:"Wait for explicit shutdown signal instead of auto-shutdown" name:"await- explicit- shutdown"`
26+ UploadURL string `help:"Base URL for uploading prediction output files" name:" upload-url "`
27+ WorkingDirectory string `help:"Override the working directory for predictions" name:" working- directory"`
28+ RunnerShutdownGracePeriod time.Duration `help:"Grace period before force-killing prediction runners" name:"runner-shutdown-grace-period" default:"600s "`
3129}
3230
33- var logger = util . CreateLogger ( "cog" )
31+ type SchemaCmd struct {}
3432
35- func schemaCommand () * ff.Command {
36- log := logger .Sugar ()
33+ type TestCmd struct {}
3734
38- flags := ff .NewFlagSet ("schema" )
39-
40- return & ff.Command {
41- Name : "schema" ,
42- Usage : "schema [FLAGS]" ,
43- Flags : flags ,
44- Exec : func (ctx context.Context , args []string ) error {
45- wd , err := os .Getwd ()
46- if err != nil {
47- log .Errorw ("failed to get working directory" , "error" , err )
48- return err
49- }
50- y , err := util .ReadCogYaml (wd )
51- if err != nil {
52- log .Errorw ("failed to read cog.yaml" , "error" , err )
53- return err
54- }
55- m , c , err := y .PredictModuleAndPredictor ()
56- if err != nil {
57- log .Errorw ("failed to parse predict" , "error" , err )
58- return err
59- }
60- bin , err := exec .LookPath ("python3" )
61- if err != nil {
62- log .Errorw ("failed to find python3" , "error" , err )
63- return err
64- }
65- return syscall .Exec (bin , []string {bin , "-m" , "coglet.schema" , m , c }, os .Environ ()) //nolint:gosec // expected subprocess launched with variable
66- },
67- }
35+ type CLI struct {
36+ Server ServerCmd `cmd:"" help:"Start the Cog HTTP server for serving predictions"`
37+ Schema SchemaCmd `cmd:"" help:"Generate OpenAPI schema from model definition"`
38+ Test TestCmd `cmd:"" help:"Run model tests to verify functionality"`
6839}
6940
70- func serverCommand () (* ff.Command , error ) {
71- log := logger .Sugar ()
41+ var logger = util .CreateLogger ("cog" )
7242
73- var cfg ServerConfig
74- flags := ff . NewFlagSet ( "server" )
43+ func ( s * ServerCmd ) Run () error {
44+ log := logger . Sugar ( )
7545
76- if err := flags .AddStruct (& cfg ); err != nil {
77- return nil , err
46+ // Procedure mode implies await explicit shutdown
47+ // i.e. Python process exit should not trigger shutdown
48+ if s .UseProcedureMode {
49+ s .AwaitExplicitShutdown = true
50+ }
51+ log .Infow ("configuration" ,
52+ "use-procedure-mode" , s .UseProcedureMode ,
53+ "await-explicit-shutdown" , s .AwaitExplicitShutdown ,
54+ "upload-url" , s .UploadURL ,
55+ )
56+
57+ addr := fmt .Sprintf ("%s:%d" , s .Host , s .Port )
58+ log .Infow ("starting Cog HTTP server" , "addr" , addr , "version" , util .Version (), "pid" , os .Getpid ())
59+
60+ var err error
61+ currentWorkingDirectory := s .WorkingDirectory
62+ if currentWorkingDirectory == "" {
63+ currentWorkingDirectory , err = os .Getwd ()
64+ if err != nil {
65+ log .Errorw ("failed to get current working directory" , "error" , err )
66+ return err
67+ }
7868 }
7969
80- return & ff.Command {
81- Name : "server" ,
82- Usage : "server [FLAGS]" ,
83- Flags : flags ,
84- Exec : func (ctx context.Context , args []string ) error {
85- // Procedure mode implies await explicit shutdown
86- // i.e. Python process exit should not trigger shutdown
87- if cfg .UseProcedureMode {
88- cfg .AwaitExplicitShutdown = true
89- }
90- log .Infow ("configuration" ,
91- "use-procedure-mode" , cfg .UseProcedureMode ,
92- "await-explicit-shutdown" , cfg .AwaitExplicitShutdown ,
93- "upload-url" , cfg .UploadURL ,
94- )
95-
96- addr := fmt .Sprintf ("%s:%d" , cfg .Host , cfg .Port )
97- log .Infow ("starting Cog HTTP server" , "addr" , addr , "version" , util .Version (), "pid" , os .Getpid ())
98-
99- var err error
100- currentWorkingDirectory := cfg .WorkingDirectory
101- if currentWorkingDirectory == "" {
102- currentWorkingDirectory , err = os .Getwd ()
103- if err != nil {
104- log .Errorw ("failed to get current working directory" , "error" , err )
105- return err
106- }
107- }
108-
109- serverCfg := server.Config {
110- UseProcedureMode : cfg .UseProcedureMode ,
111- AwaitExplicitShutdown : cfg .AwaitExplicitShutdown ,
112- IPCUrl : fmt .Sprintf ("http://localhost:%d/_ipc" , cfg .Port ),
113- UploadURL : cfg .UploadURL ,
114- WorkingDirectory : currentWorkingDirectory ,
115- RunnerShutdownGracePeriod : cfg .RunnerShutdownGracePeriod ,
116- }
117- // FIXME: in non-procedure mode we do not support concurrency in a meaningful way, we
118- // statically create the runner list sized at 1.
119- if s , ok := os .LookupEnv ("COG_MAX_RUNNERS" ); ok && cfg .UseProcedureMode {
120- if i , err := strconv .Atoi (s ); err == nil {
121- serverCfg .MaxRunners = i
122- } else {
123- log .Errorw ("failed to parse COG_MAX_RUNNERS" , "value" , s )
124- }
125- }
126- ctx , cancel := context .WithCancel (ctx )
127- h , err := server .NewHandler (serverCfg , cancel ) //nolint:contextcheck // context passing not viable in current architecture
128- if err != nil {
129- log .Errorw ("failed to create server handler" , "error" , err )
130- return err
131- }
132- mux := server .NewServeMux (h , cfg .UseProcedureMode )
133- s := & http.Server {
134- Addr : addr ,
135- Handler : mux ,
136- ReadHeaderTimeout : 5 * time .Second , // TODO: is 5s too long? likely
137- }
138- go func () {
139- <- ctx .Done ()
140- if err := s .Shutdown (ctx ); err != nil {
141- log .Errorw ("failed to shutdown server" , "error" , err )
70+ serverCfg := server.Config {
71+ UseProcedureMode : s .UseProcedureMode ,
72+ AwaitExplicitShutdown : s .AwaitExplicitShutdown ,
73+ IPCUrl : fmt .Sprintf ("http://localhost:%d/_ipc" , s .Port ),
74+ UploadURL : s .UploadURL ,
75+ WorkingDirectory : currentWorkingDirectory ,
76+ RunnerShutdownGracePeriod : s .RunnerShutdownGracePeriod ,
77+ }
78+ // FIXME: in non-procedure mode we do not support concurrency in a meaningful way, we
79+ // statically create the runner list sized at 1.
80+ if maxRunners , ok := os .LookupEnv ("COG_MAX_RUNNERS" ); ok && s .UseProcedureMode {
81+ if i , err := strconv .Atoi (maxRunners ); err == nil {
82+ serverCfg .MaxRunners = i
83+ } else {
84+ log .Errorw ("failed to parse COG_MAX_RUNNERS" , "value" , maxRunners )
85+ }
86+ }
87+ ctx , cancel := context .WithCancel (context .Background ())
88+ h , err := server .NewHandler (serverCfg , cancel ) //nolint:contextcheck // context passing not viable in current architecture
89+ if err != nil {
90+ log .Errorw ("failed to create server handler" , "error" , err )
91+ return err
92+ }
93+ mux := server .NewServeMux (h , s .UseProcedureMode )
94+ httpServer := & http.Server {
95+ Addr : addr ,
96+ Handler : mux ,
97+ ReadHeaderTimeout : 5 * time .Second , // TODO: is 5s too long? likely
98+ }
99+ go func () {
100+ <- ctx .Done ()
101+ if err := httpServer .Shutdown (ctx ); err != nil {
102+ log .Errorw ("failed to shutdown server" , "error" , err )
103+ os .Exit (1 )
104+ }
105+ }()
106+ go func () {
107+ ch := make (chan os.Signal , 1 )
108+ signal .Notify (ch , os .Interrupt , syscall .SIGTERM )
109+ for {
110+ sig := <- ch
111+ if sig == syscall .SIGTERM && s .AwaitExplicitShutdown {
112+ log .Warnw ("ignoring signal to stop" , "signal" , sig )
113+ } else {
114+ log .Infow ("stopping Cog HTTP server" , "signal" , sig )
115+ if err := h .Stop (); err != nil {
116+ log .Errorw ("failed to stop server handler" , "error" , err )
142117 os .Exit (1 )
143118 }
144- }()
145- go func () {
146- ch := make (chan os.Signal , 1 )
147- signal .Notify (ch , os .Interrupt , syscall .SIGTERM )
148- for {
149- sig := <- ch
150- if sig == syscall .SIGTERM && cfg .AwaitExplicitShutdown {
151- log .Warnw ("ignoring signal to stop" , "signal" , sig )
152- } else {
153- log .Infow ("stopping Cog HTTP server" , "signal" , sig )
154- if err := h .Stop (); err != nil {
155- log .Errorw ("failed to stop server handler" , "error" , err )
156- os .Exit (1 )
157- }
158- }
159- }
160- }()
161- if err := s .ListenAndServe (); errors .Is (err , http .ErrServerClosed ) {
162- exitCode := h .ExitCode ()
163- if exitCode == 0 {
164- log .Infow ("shutdown completed normally" )
165- } else {
166- log .Errorw ("python runner exited with code" , "code" , exitCode )
167- }
168- return nil
169119 }
170- return err
171- },
172- }, nil
120+ }
121+ }()
122+ if err := httpServer .ListenAndServe (); errors .Is (err , http .ErrServerClosed ) {
123+ exitCode := h .ExitCode ()
124+ if exitCode == 0 {
125+ log .Infow ("shutdown completed normally" )
126+ } else {
127+ log .Errorw ("python runner exited with code" , "code" , exitCode )
128+ }
129+ return nil
130+ }
131+ return err
173132}
174133
175- func testCommand () * ff. Command {
134+ func ( s * SchemaCmd ) Run () error {
176135 log := logger .Sugar ()
177136
178- flags := ff .NewFlagSet ("test" )
179-
180- return & ff.Command {
181- Name : "test" ,
182- Usage : "test [FLAGS]" ,
183- Flags : flags ,
184- Exec : func (ctx context.Context , args []string ) error {
185- wd , err := os .Getwd ()
186- if err != nil {
187- log .Errorw ("failed to get working directory" , "error" , err )
188- return err
189- }
190- y , err := util .ReadCogYaml (wd )
191- if err != nil {
192- log .Errorw ("failed to read cog.yaml" , "error" , err )
193- return err
194- }
195- m , c , err := y .PredictModuleAndPredictor ()
196- if err != nil {
197- log .Errorw ("failed to parse predict" , "error" , err )
198- return err
199- }
200- bin , err := exec .LookPath ("python3" )
201- if err != nil {
202- log .Errorw ("failed to find python3" , "error" , err )
203- return err
204- }
205- return syscall .Exec (bin , []string {bin , "-m" , "coglet.test" , m , c }, os .Environ ()) //nolint:gosec // expected subprocess launched with variable
206- },
137+ wd , err := os .Getwd ()
138+ if err != nil {
139+ log .Errorw ("failed to get working directory" , "error" , err )
140+ return err
207141 }
142+ y , err := util .ReadCogYaml (wd )
143+ if err != nil {
144+ log .Errorw ("failed to read cog.yaml" , "error" , err )
145+ return err
146+ }
147+ m , c , err := y .PredictModuleAndPredictor ()
148+ if err != nil {
149+ log .Errorw ("failed to parse predict" , "error" , err )
150+ return err
151+ }
152+ bin , err := exec .LookPath ("python3" )
153+ if err != nil {
154+ log .Errorw ("failed to find python3" , "error" , err )
155+ return err
156+ }
157+ return syscall .Exec (bin , []string {bin , "-m" , "coglet.schema" , m , c }, os .Environ ()) //nolint:gosec // expected subprocess launched with variable
208158}
209159
210- func main () {
160+ func ( t * TestCmd ) Run () error {
211161 log := logger .Sugar ()
212- flags := ff . NewFlagSet ( "cog" )
213- serverCommand , err := serverCommand ()
162+
163+ wd , err := os . Getwd ()
214164 if err != nil {
215- log .Errorw ("failed to create server command " , "error" , err )
216- os . Exit ( 1 )
165+ log .Errorw ("failed to get working directory " , "error" , err )
166+ return err
217167 }
218- cmd := & ff.Command {
219- Name : "cog" ,
220- Usage : "cog <COMMAND> [FLAGS]" ,
221- Flags : flags ,
222- Exec : func (ctx context.Context , args []string ) error {
223- return ff .ErrHelp
224- },
225- Subcommands : []* ff.Command {
226- schemaCommand (),
227- serverCommand ,
228- testCommand (),
229- },
230- }
231- err = cmd .ParseAndRun (context .Background (), os .Args [1 :])
232- switch {
233- case errors .Is (err , ff .ErrHelp ):
234- _ , err := fmt .Fprintln (os .Stderr , ffhelp .Command (cmd ))
235- if err != nil {
236- log .Errorw ("failed to print help" , "error" , err )
237- }
238- os .Exit (1 )
239- case err != nil :
168+ y , err := util .ReadCogYaml (wd )
169+ if err != nil {
170+ log .Errorw ("failed to read cog.yaml" , "error" , err )
171+ return err
172+ }
173+ m , c , err := y .PredictModuleAndPredictor ()
174+ if err != nil {
175+ log .Errorw ("failed to parse predict" , "error" , err )
176+ return err
177+ }
178+ bin , err := exec .LookPath ("python3" )
179+ if err != nil {
180+ log .Errorw ("failed to find python3" , "error" , err )
181+ return err
182+ }
183+ return syscall .Exec (bin , []string {bin , "-m" , "coglet.test" , m , c }, os .Environ ()) //nolint:gosec // expected subprocess launched with variable
184+ }
185+
186+ func main () {
187+ log := logger .Sugar ()
188+
189+ var cli CLI
190+ ctx := kong .Parse (& cli ,
191+ kong .Name ("cog" ),
192+ kong .Description ("Cog runtime for serving machine learning models via HTTP API" ),
193+ kong .UsageOnError (),
194+ )
195+
196+ err := ctx .Run ()
197+ if err != nil {
240198 log .Error (err )
241199 os .Exit (1 )
242200 }
0 commit comments