99import platform
1010import shutil
1111import socket
12+ import sys
1213import tempfile
1314import typing as t
1415
1718
1819from _bentoml_sdk import Service
1920from bentoml ._internal .container import BentoMLContainer
21+ from bentoml ._internal .utils import expand_envs
22+ from bentoml ._internal .utils import reserve_free_port
2023from bentoml ._internal .utils .circus import Server
2124from bentoml .exceptions import BentoMLConfigException
2225
@@ -64,8 +67,6 @@ def _get_server_socket(
6467 ) -> tuple [str , CircusSocket ]:
6568 from circus .sockets import CircusSocket
6669
67- from bentoml ._internal .utils import reserve_free_port
68-
6970 runner_port = port_stack .enter_context (reserve_free_port ())
7071 runner_host = "127.0.0.1"
7172
@@ -103,30 +104,49 @@ def create_dependency_watcher(
103104 working_dir : str | None = None ,
104105 env : dict [str , str ] | None = None ,
105106 bento_args : dict [str , t .Any ] = Provide [BentoMLContainer .bento_arguments ],
106- ) -> tuple [Watcher , CircusSocket , str ]:
107+ ) -> tuple [Watcher , CircusSocket | None , str ]:
107108 from bentoml .serving import create_watcher
108109
109- num_workers , worker_envs = scheduler .get_worker_env (svc )
110- uri , socket = _get_server_socket (svc , uds_path , port_stack , backlog )
111- args = [
112- "-m" ,
113- _SERVICE_WORKER_SCRIPT ,
114- bento_identifier ,
115- "--service-name" ,
116- svc .name ,
117- "--fd" ,
118- f"$(circus.sockets.{ svc .name } )" ,
119- "--worker-id" ,
120- "$(CIRCUS.WID)" ,
121- "--args" ,
122- json .dumps (bento_args ),
123- ]
124-
125- if worker_envs :
126- args .extend (["--worker-env" , json .dumps (worker_envs )])
110+ if svc .cmd is not None :
111+ num_workers = 1 # Custom command only runs a single worker
112+ svc_port = port_stack .enter_context (reserve_free_port ())
113+ if env is None :
114+ env = {** os .environ , "PORT" : str (svc_port )}
115+ else :
116+ env = {** os .environ , ** env , "PORT" : str (svc_port )}
117+ uri = f"http://127.0.0.1:{ svc_port } "
118+ socket = None
119+ working_dir = svc .working_dir
120+ logger .info (
121+ "Starting with custom command for dependency service(%s): %s" ,
122+ svc .name ,
123+ svc .cmd ,
124+ )
125+ cmd , * args = [expand_envs (p , env ) for p in svc .cmd ]
126+ else :
127+ num_workers , worker_envs = scheduler .get_worker_env (svc )
128+ uri , socket = _get_server_socket (svc , uds_path , port_stack , backlog )
129+ args = [
130+ "-m" ,
131+ _SERVICE_WORKER_SCRIPT ,
132+ bento_identifier ,
133+ "--service-name" ,
134+ svc .name ,
135+ "--fd" ,
136+ f"$(circus.sockets.{ svc .name } )" ,
137+ "--worker-id" ,
138+ "$(CIRCUS.WID)" ,
139+ "--args" ,
140+ json .dumps (bento_args ),
141+ ]
142+ cmd = sys .executable
143+
144+ if worker_envs :
145+ args .extend (["--worker-env" , json .dumps (worker_envs )])
127146
128147 watcher = create_watcher (
129148 name = f"service_{ svc .name } " ,
149+ cmd = cmd ,
130150 args = args ,
131151 numprocesses = num_workers ,
132152 working_dir = working_dir ,
@@ -269,7 +289,8 @@ def serve_http(
269289 env = {k : str (v ) for k , v in dependency_env .items ()},
270290 )
271291 watchers .append (new_watcher )
272- sockets .append (new_socket )
292+ if new_socket :
293+ sockets .append (new_socket )
273294 dependency_map [name ] = uri
274295 server_on_deployment (dep_svc )
275296 # reserve one more to avoid conflicts
@@ -287,62 +308,78 @@ def serve_http(
287308 )
288309 except ValueError as e :
289310 raise BentoMLConfigException (f"Invalid host IP address: { host } " ) from e
290-
291- sockets .append (
292- CircusSocket (
293- name = API_SERVER_NAME ,
294- host = host ,
295- port = port ,
296- family = family ,
297- backlog = backlog ,
311+ if svc .cmd is not None :
312+ logger .info ("Starting with custom command for entry service: %s" , svc .cmd )
313+ num_workers = 1 # Custom command only runs a single worker
314+ env .update (os .environ )
315+ env .update (
316+ {
317+ "PORT" : str (port ),
318+ "BENTOML_HOST" : host ,
319+ "BENTOML_PORT" : str (port ),
320+ }
298321 )
299- )
300- if BentoMLContainer .ssl .enabled .get () and not ssl_certfile :
301- raise BentoMLConfigException ("ssl_certfile is required when ssl is enabled" )
302-
303- ssl_args = construct_ssl_args (
304- ssl_certfile = ssl_certfile ,
305- ssl_keyfile = ssl_keyfile ,
306- ssl_keyfile_password = ssl_keyfile_password ,
307- ssl_version = ssl_version ,
308- ssl_cert_reqs = ssl_cert_reqs ,
309- ssl_ca_certs = ssl_ca_certs ,
310- ssl_ciphers = ssl_ciphers ,
311- )
312- timeouts_args = construct_timeouts_args (
313- timeout_keep_alive = timeout_keep_alive ,
314- timeout_graceful_shutdown = timeout_graceful_shutdown ,
315- )
316- timeout_args = ["--timeout" , str (timeout )] if timeout else []
317- bento_args = BentoMLContainer .bento_arguments .get ()
322+ server_cmd , * server_args = [expand_envs (p , env ) for p in svc .cmd ]
323+ bento_path = pathlib .Path (svc .working_dir )
324+ else :
325+ sockets .append (
326+ CircusSocket (
327+ name = API_SERVER_NAME ,
328+ host = host ,
329+ port = port ,
330+ family = family ,
331+ backlog = backlog ,
332+ )
333+ )
334+ if BentoMLContainer .ssl .enabled .get () and not ssl_certfile :
335+ raise BentoMLConfigException (
336+ "ssl_certfile is required when ssl is enabled"
337+ )
318338
319- server_args = [
320- "-m" ,
321- _SERVICE_WORKER_SCRIPT ,
322- bento_identifier ,
323- "--fd" ,
324- f"$(circus.sockets.{ API_SERVER_NAME } )" ,
325- "--service-name" ,
326- svc .name ,
327- "--backlog" ,
328- str (backlog ),
329- "--worker-id" ,
330- "$(CIRCUS.WID)" ,
331- "--args" ,
332- json .dumps (bento_args ),
333- * ssl_args ,
334- * timeouts_args ,
335- * timeout_args ,
336- ]
337- if worker_envs :
338- server_args .extend (["--worker-env" , json .dumps (worker_envs )])
339- if development_mode :
340- server_args .append ("--development-mode" )
339+ ssl_args = construct_ssl_args (
340+ ssl_certfile = ssl_certfile ,
341+ ssl_keyfile = ssl_keyfile ,
342+ ssl_keyfile_password = ssl_keyfile_password ,
343+ ssl_version = ssl_version ,
344+ ssl_cert_reqs = ssl_cert_reqs ,
345+ ssl_ca_certs = ssl_ca_certs ,
346+ ssl_ciphers = ssl_ciphers ,
347+ )
348+ timeouts_args = construct_timeouts_args (
349+ timeout_keep_alive = timeout_keep_alive ,
350+ timeout_graceful_shutdown = timeout_graceful_shutdown ,
351+ )
352+ timeout_args = ["--timeout" , str (timeout )] if timeout else []
353+ bento_args = BentoMLContainer .bento_arguments .get ()
354+ server_cmd = sys .executable
355+ server_args = [
356+ "-m" ,
357+ _SERVICE_WORKER_SCRIPT ,
358+ bento_identifier ,
359+ "--fd" ,
360+ f"$(circus.sockets.{ API_SERVER_NAME } )" ,
361+ "--service-name" ,
362+ svc .name ,
363+ "--backlog" ,
364+ str (backlog ),
365+ "--worker-id" ,
366+ "$(CIRCUS.WID)" ,
367+ "--args" ,
368+ json .dumps (bento_args ),
369+ * ssl_args ,
370+ * timeouts_args ,
371+ * timeout_args ,
372+ ]
373+ if worker_envs :
374+ server_args .extend (["--worker-env" , json .dumps (worker_envs )])
375+ if development_mode :
376+ server_args .append ("--development-mode" )
341377
342378 scheme = "https" if BentoMLContainer .ssl .enabled .get () else "http"
343379 watchers .append (
344380 create_watcher (
345381 name = "service" ,
382+ cmd = server_cmd ,
346383 args = server_args ,
347384 working_dir = str (bento_path .absolute ()),
348385 numprocesses = num_workers ,
0 commit comments