Skip to content

Commit 17981ad

Browse files
committed
feat: support exec
Signed-off-by: thxCode <[email protected]>
1 parent 43a7697 commit 17981ad

File tree

9 files changed

+307
-8
lines changed

9 files changed

+307
-8
lines changed

gpustack_runtime/__main__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
CreateWorkloadSubCommand,
1010
DeleteWorkloadSubCommand,
1111
DetectDevicesSubCommand,
12+
ExecWorkloadSubCommand,
1213
GetWorkloadSubCommand,
1314
ListWorkloadsSubCommand,
1415
LogsWorkloadSubCommand,
@@ -38,6 +39,7 @@ def main():
3839
GetWorkloadSubCommand.register(subcommand_parser)
3940
ListWorkloadsSubCommand.register(subcommand_parser)
4041
LogsWorkloadSubCommand.register(subcommand_parser)
42+
ExecWorkloadSubCommand.register(subcommand_parser)
4143
DetectDevicesSubCommand.register(subcommand_parser)
4244

4345
# Parse

gpustack_runtime/cmds/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
CreateRunnerWorkloadSubCommand,
55
CreateWorkloadSubCommand,
66
DeleteWorkloadSubCommand,
7+
ExecWorkloadSubCommand,
78
GetWorkloadSubCommand,
89
ListWorkloadsSubCommand,
910
LogsWorkloadSubCommand,
@@ -15,6 +16,7 @@
1516
"CreateWorkloadSubCommand",
1617
"DeleteWorkloadSubCommand",
1718
"DetectDevicesSubCommand",
19+
"ExecWorkloadSubCommand",
1820
"GetWorkloadSubCommand",
1921
"ListWorkloadsSubCommand",
2022
"LogsWorkloadSubCommand",

gpustack_runtime/cmds/deployer.py

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import json
55
import os
66
import platform
7+
import sys
78
import time
89
from argparse import REMAINDER
910
from typing import TYPE_CHECKING
@@ -21,6 +22,7 @@
2122
WorkloadStatus,
2223
create_workload,
2324
delete_workload,
25+
exec_workload,
2426
get_workload,
2527
list_workloads,
2628
logs_workload,
@@ -567,22 +569,132 @@ def __init__(self, args: Namespace):
567569
def run(self):
568570
try:
569571
print("\033[2J\033[H", end="")
570-
logs_stream = logs_workload(self.name, tail=self.tail, follow=self.follow)
572+
logs_stream = logs_workload(
573+
self.name,
574+
tail=self.tail,
575+
follow=self.follow,
576+
)
571577
with contextlib.closing(logs_stream) as logs:
572578
for line in logs:
573579
print(line.decode("utf-8").rstrip())
574580
except KeyboardInterrupt:
575581
print("\033[2J\033[H", end="")
576582

577583

584+
class ExecWorkloadSubCommand(SubCommand):
585+
"""
586+
Command to execute a command in a workload deployment.
587+
"""
588+
589+
name: str
590+
command: list[str]
591+
interactive: bool = False
592+
593+
@staticmethod
594+
def register(parser: _SubParsersAction):
595+
exec_parser = parser.add_parser(
596+
"exec",
597+
help="execute a command in a workload deployment",
598+
)
599+
600+
exec_parser.add_argument(
601+
"name",
602+
type=str,
603+
help="name of the workload",
604+
)
605+
606+
exec_parser.add_argument(
607+
"command",
608+
nargs=REMAINDER,
609+
help="command to execute in the workload",
610+
)
611+
612+
exec_parser.add_argument(
613+
"--interactive",
614+
"-i",
615+
action="store_true",
616+
help="interactive mode",
617+
)
618+
619+
exec_parser.set_defaults(func=ExecWorkloadSubCommand)
620+
621+
def __init__(self, args: Namespace):
622+
self.name = args.name
623+
self.command = args.command
624+
self.interactive = args.interactive
625+
626+
if not self.name:
627+
msg = "The name argument is required."
628+
raise ValueError(msg)
629+
630+
def run(self):
631+
try:
632+
if self.interactive:
633+
from dockerpty import io, pty # noqa: PLC0415
634+
except ImportError:
635+
print(
636+
"dockerpty is required for interactive mode. "
637+
"Please install it via 'pip install dockerpty'.",
638+
)
639+
sys.exit(1)
640+
641+
try:
642+
print("\033[2J\033[H", end="")
643+
exec_result = exec_workload(
644+
self.name,
645+
detach=not self.interactive,
646+
command=self.command,
647+
)
648+
649+
# Non-interactive mode: print output and exit with the command's exit code
650+
651+
if not self.interactive:
652+
if isinstance(exec_result.output, bytes):
653+
print(exec_result.output.decode("utf-8").rstrip())
654+
else:
655+
print(exec_result.output)
656+
sys.exit(exec_result.exit_code)
657+
658+
# Interactive mode: use dockerpty to attach to the exec session
659+
660+
class ExecOperation(pty.Operation):
661+
def __init__(self, socket):
662+
self.stdin = sys.stdin
663+
self.stdout = sys.stdout
664+
self.socket = io.Stream(socket)
665+
666+
def israw(self, **_):
667+
return self.stdout.isatty()
668+
669+
def start(self, **_):
670+
stream = self.sockets()
671+
return [
672+
io.Pump(io.Stream(self.stdin), stream, wait_for_output=False),
673+
io.Pump(stream, io.Stream(self.stdout), propagate_close=False),
674+
]
675+
676+
def resize(self, height, width, **_):
677+
pass
678+
679+
def sockets(self):
680+
return self.socket
681+
682+
exec_op = ExecOperation(exec_result.output)
683+
pty.PseudoTerminal(None, exec_op).start()
684+
except KeyboardInterrupt:
685+
print("\033[2J\033[H", end="")
686+
687+
578688
def format_workloads_json(sts: list[WorkloadStatus]) -> str:
579689
return json.dumps([st.__dict__ for st in sts], indent=2)
580690

581691

582-
def format_workloads_table(sts: list[WorkloadStatus], width: int = 100) -> str:
692+
def format_workloads_table(sts: list[WorkloadStatus]) -> str:
583693
if not sts:
584694
return "No workloads found."
585695

696+
width = 100
697+
586698
headers = ["Name", "State", "Created At"]
587699
col_widths = [
588700
len(str(getattr(st, attr.lower().replace(" ", "_"))))

gpustack_runtime/cmds/detector.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,12 @@ def format_devices_json(devs: Devices) -> str:
6969
return json.dumps([dev.__dict__ for dev in devs], indent=2)
7070

7171

72-
def format_devices_table(devs: Devices, width: int = 100) -> str:
72+
def format_devices_table(devs: Devices) -> str:
7373
if not devs:
7474
return "No GPUs detected."
7575

76+
width = 100
77+
7678
# Header section
7779
dev = devs[0]
7880
header_content = f"{dev.manufacturer.upper()} "

gpustack_runtime/deployer/__init__.py

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
ContainerSecurity,
2323
OperationError,
2424
UnsupportedError,
25+
WorkloadExecResult,
2526
WorkloadOperationToken,
2627
WorkloadPlan,
2728
WorkloadSecurity,
@@ -32,6 +33,8 @@
3233
from .docker import DockerDeployer, DockerWorkloadPlan, DockerWorkloadStatus
3334

3435
if TYPE_CHECKING:
36+
from collections.abc import Generator
37+
3538
from .__types__ import Deployer, WorkloadName
3639

3740
deployers: list[Deployer] = [
@@ -156,15 +159,15 @@ def logs_workload(
156159
tail: int | None = None,
157160
since: int | None = None,
158161
follow: bool = False,
159-
):
162+
) -> Generator[bytes, None, None] | bytes:
160163
"""
161164
Get the logs of a workload.
162165
163166
Args:
164167
name:
165168
The name of the workload to get logs.
166169
token:
167-
The operation token for authentication.
170+
The token for operation.
168171
timestamps:
169172
Whether to include timestamps in the logs.
170173
tail:
@@ -194,6 +197,49 @@ def logs_workload(
194197
raise UnsupportedError(msg)
195198

196199

200+
def exec_workload(
201+
name: WorkloadName,
202+
token: WorkloadOperationToken | None = None,
203+
detach: bool = True,
204+
command: list[str] | None = None,
205+
args: list[str] | None = None,
206+
) -> WorkloadExecResult:
207+
"""
208+
Execute a command in a running workload.
209+
210+
Args:
211+
name:
212+
The name of the workload to execute the command in.
213+
token:
214+
The token for operation.
215+
detach:
216+
Whether to detach from the command execution.
217+
command:
218+
The command to execute.
219+
args:
220+
The arguments to pass to the command.
221+
222+
Returns:
223+
If detach is False, return a socket object in the output of WorkloadExecResult.
224+
otherwise, return the exit code and output of the command in WorkloadExecResult.
225+
226+
Raises:
227+
UnsupportedError:
228+
If no deployer supports the given workload.
229+
OperationError:
230+
If the deployer fails to execute the command in the workload.
231+
232+
"""
233+
for dep in deployers:
234+
if not dep.is_supported():
235+
continue
236+
237+
return dep.exec(name, token, detach, command, args)
238+
239+
msg = "No deployer supports"
240+
raise UnsupportedError(msg)
241+
242+
197243
__all__ = [
198244
"Container",
199245
"ContainerCapabilities",
@@ -216,6 +262,7 @@ def logs_workload(
216262
"DockerWorkloadStatus",
217263
"OperationError",
218264
"UnsupportedError",
265+
"WorkloadExecResult",
219266
"WorkloadOperationToken",
220267
"WorkloadPlan",
221268
"WorkloadPlan",
@@ -225,6 +272,7 @@ def logs_workload(
225272
"WorkloadStatusStateEnum",
226273
"create_workload",
227274
"delete_workload",
275+
"exec_workload",
228276
"get_workload",
229277
"list_workloads",
230278
"logs_workload",

gpustack_runtime/deployer/__types__.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@
33
from abc import ABC, abstractmethod
44
from dataclasses import dataclass, field
55
from enum import Enum
6+
from typing import TYPE_CHECKING, NamedTuple
67

78
from dataclasses_json import dataclass_json
89

10+
if TYPE_CHECKING:
11+
from collections.abc import Generator
12+
913

1014
class UnsupportedError(Exception):
1115
"""
@@ -788,6 +792,16 @@ class WorkloadStatus:
788792
"""
789793

790794

795+
class WorkloadExecResult(NamedTuple):
796+
"""
797+
Result of an exec command.
798+
799+
"""
800+
801+
exit_code: int | None
802+
output: str | bytes | object | None
803+
804+
791805
class Deployer(ABC):
792806
"""
793807
Base class for all deployers.
@@ -895,7 +909,7 @@ def logs(
895909
tail: int | None = None,
896910
since: int | None = None,
897911
follow: bool = False,
898-
):
912+
) -> Generator[bytes, None, None] | bytes:
899913
"""
900914
Get the logs of a workload.
901915
@@ -925,3 +939,42 @@ def logs(
925939
926940
"""
927941
raise NotImplementedError
942+
943+
@abstractmethod
944+
def exec(
945+
self,
946+
name: WorkloadName,
947+
token: WorkloadOperationToken | None = None,
948+
detach: bool = True,
949+
command: list[str] | None = None,
950+
args: list[str] | None = None,
951+
) -> WorkloadExecResult:
952+
"""
953+
Execute a command in a workload.
954+
955+
Args:
956+
name:
957+
The name of the workload.
958+
token:
959+
The operation token of the workload.
960+
If not specified, execute in the first executable container.
961+
detach:
962+
Whether to detach from the command.
963+
command:
964+
The command to execute.
965+
If not specified, use the command defined in the container.
966+
args:
967+
The arguments to pass to the command.
968+
969+
Returns:
970+
If detach is False, return a socket object in the output of WorkloadExecResult.
971+
otherwise, return the exit code and output of the command in WorkloadExecResult.
972+
973+
Raises:
974+
UnsupportedError:
975+
If the deployer is not supported in the current environment.
976+
OperationError:
977+
If the workload fails to execute the command.
978+
979+
"""
980+
raise NotImplementedError

0 commit comments

Comments
 (0)