-
Notifications
You must be signed in to change notification settings - Fork 218
Expand file tree
/
Copy pathworkspace.py
More file actions
442 lines (378 loc) · 15.6 KB
/
workspace.py
File metadata and controls
442 lines (378 loc) · 15.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
"""Docker-based remote workspace implementation."""
import os
import subprocess
import sys
import threading
import time
import uuid
from typing import Any
from urllib.request import urlopen
from pydantic import Field, PrivateAttr, model_validator
from openhands.sdk.logger import get_logger
from openhands.sdk.utils.command import execute_command
from openhands.sdk.utils.deprecation import warn_deprecated
from openhands.sdk.workspace import PlatformType, RemoteWorkspace
logger = get_logger(__name__)
def check_port_available(port: int) -> bool:
    """Return True if ``port`` can be bound on all interfaces, else False.

    A successful bind (immediately released when the socket closes) means the
    port is free. On failure a short 0.1s pause is taken before reporting
    unavailability, giving transient TIME_WAIT-style conditions a beat to clear.
    """
    import socket

    # Socket objects are context managers: closing happens on scope exit,
    # mirroring the explicit try/finally-close pattern.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind(("0.0.0.0", port))
            return True
        except OSError:
            time.sleep(0.1)
            return False
def find_available_tcp_port(
    min_port: int = 30000, max_port: int = 39999, max_attempts: int = 50
) -> int:
    """Find an available TCP port in a specified range.

    Candidate ports are shuffled with a cryptographically seeded RNG so that
    concurrent callers are unlikely to race for the same port. At most
    ``max_attempts`` candidates are probed.

    Returns:
        A free port number, or -1 if none of the probed candidates was free.
    """
    import random

    candidates = list(range(min_port, max_port + 1))
    random.SystemRandom().shuffle(candidates)
    return next(
        (port for port in candidates[:max_attempts] if check_port_available(port)),
        -1,
    )
class DockerWorkspace(RemoteWorkspace):
    """Remote workspace that sets up and manages a Docker container.

    This workspace creates a Docker container running a pre-built OpenHands agent
    server image, waits for it to become healthy, and then provides remote workspace
    operations through the container's HTTP API.

    Note: This class only works with pre-built images. To build images on-the-fly
    from a base image, use DockerDevWorkspace instead.

    Example:
        with DockerWorkspace(
            server_image="ghcr.io/openhands/agent-server:latest"
        ) as workspace:
            result = workspace.execute_command("ls -la")
    """

    # Override parent fields with defaults
    working_dir: str = Field(
        default="/workspace",
        description="Working directory inside the container.",
    )
    host: str = Field(
        default="",
        description=("Remote host URL (set automatically during container startup)."),
    )
    # Docker-specific configuration
    server_image: str | None = Field(
        default="ghcr.io/openhands/agent-server:latest-python",
        description="Pre-built agent server image to use.",
    )
    host_port: int | None = Field(
        default=None,
        description="Port to bind the container to. If None, finds available port.",
    )
    forward_env: list[str] = Field(
        default_factory=lambda: ["DEBUG"],
        description="Environment variables to forward to the container.",
    )
    laminar_api_key: str | None = Field(
        default=None,
        description="Laminar API key for observability tracing. "
        "When provided, injected as LMNR_PROJECT_API_KEY in the container. "
        "Should NOT be included in forward_env to avoid logging leaks.",
    )
    mount_dir: str | None = Field(
        default=None,
        description="Optional host directory to mount into the container.",
    )
    volumes: list[str] = Field(
        default_factory=list,
        description="Additional volume mounts for the Docker container.",
    )
    detach_logs: bool = Field(
        default=True, description="Whether to stream Docker logs in background."
    )
    platform: PlatformType = Field(
        default="linux/amd64", description="Platform for the Docker image."
    )
    extra_ports: bool = Field(
        default=False,
        description="Whether to expose additional ports (VSCode, VNC).",
    )
    enable_gpu: bool = Field(
        default=False,
        description="Whether to enable GPU support with --gpus all.",
    )
    cleanup_image: bool = Field(
        default=False,
        description="Whether to delete the Docker image when cleaning up workspace.",
    )
    network: str | None = Field(
        default=None,
        description="Connect a container to the specified Docker network.",
    )

    # Runtime state (not part of the pydantic model's public fields):
    # _container_id: id returned by `docker run -d`; None once cleaned up.
    # _image_name: image tag actually used, kept for optional deletion in cleanup().
    # _logs_thread: background thread streaming `docker logs -f`, if detach_logs.
    # _stop_logs: event used to stop the log-streaming thread.
    _container_id: str | None = PrivateAttr(default=None)
    _image_name: str | None = PrivateAttr(default=None)
    _logs_thread: threading.Thread | None = PrivateAttr(default=None)
    _stop_logs: threading.Event = PrivateAttr(default_factory=threading.Event)

    @model_validator(mode="after")
    def _validate_server_image(self):
        """Ensure server_image is set when using DockerWorkspace directly.

        Subclasses (e.g. ones that build images themselves) are allowed to
        leave server_image as None, hence the exact-class check.
        """
        if self.__class__ is DockerWorkspace and self.server_image is None:
            raise ValueError("server_image must be provided")
        return self

    @model_validator(mode="after")
    def _validate_mount_dir(self):
        """Translate the deprecated mount_dir field into a /workspace volume."""
        if self.mount_dir:
            warn_deprecated(
                "DockerWorkspace.mount_dir",
                deprecated_in="1.10.0",
                removed_in=None,
                details="Use DockerWorkspace.volumes instead",
            )
            self.volumes.append(f"{self.mount_dir}:/workspace")
        return self

    def model_post_init(self, context: Any) -> None:
        """Set up the Docker container and initialize the remote workspace."""
        # Subclasses should call get_image() to get the image to use
        # This allows them to build or prepare the image before container startup
        image = self.get_image()
        self._start_container(image, context)

    def get_image(self) -> str:
        """Get the Docker image to use for the container.

        Subclasses can override this to provide custom image resolution logic
        (e.g., building images on-the-fly).

        Returns:
            The Docker image tag to use.

        Raises:
            ValueError: If server_image was never set.
        """
        if self.server_image is None:
            raise ValueError("server_image must be set")
        return self.server_image

    def _start_container(self, image: str, context: Any) -> None:
        """Start the Docker container with the given image.

        This method handles all container lifecycle: port allocation, Docker
        validation, container creation, health checks, and RemoteWorkspace
        initialization.

        Args:
            image: The Docker image tag to use.
            context: The Pydantic context from model_post_init.

        Raises:
            RuntimeError: If a requested port is taken, Docker is unavailable,
                `docker run` fails, or the container never becomes healthy.
        """
        # Store the image name for cleanup
        self._image_name = image
        # Determine port: auto-pick a free one, or validate the user's choice.
        if self.host_port is None:
            self.host_port = find_available_tcp_port()
        else:
            self.host_port = int(self.host_port)
            if not check_port_available(self.host_port):
                raise RuntimeError(f"Port {self.host_port} is not available")
            # NOTE(review): the extra-port availability checks appear to apply
            # only to user-specified ports (auto-picked ports are assumed to
            # have free neighbors) — confirm against upstream if it matters.
            if self.extra_ports:
                if not check_port_available(self.host_port + 1):
                    raise RuntimeError(
                        f"Port {self.host_port + 1} is not available for VSCode"
                    )
                if not check_port_available(self.host_port + 2):
                    raise RuntimeError(
                        f"Port {self.host_port + 2} is not available for VNC"
                    )
        # Ensure docker is available
        docker_ver = execute_command(["docker", "version"]).returncode
        if docker_ver != 0:
            raise RuntimeError(
                "Docker is not available. Please install and start "
                "Docker Desktop/daemon."
            )
        # Prepare Docker run flags
        flags: list[str] = []
        for key in self.forward_env:
            if key in os.environ:
                # Skip LMNR_PROJECT_API_KEY if it's in forward_env to avoid log leaks
                # Use laminar_api_key field instead
                if key == "LMNR_PROJECT_API_KEY":
                    logger.warning(
                        "LMNR_PROJECT_API_KEY is in forward_env list. "
                        "This may cause credential leaks in logs. "
                        "Use the 'laminar_api_key' field instead."
                    )
                    continue
                flags += ["-e", f"{key}={os.environ[key]}"]
        # Inject Laminar API key directly (not via environment variable)
        # This ensures credentials don't appear in logged payloads
        if self.laminar_api_key:
            flags += ["-e", f"LMNR_PROJECT_API_KEY={self.laminar_api_key}"]
        for volume in self.volumes:
            flags += ["-v", volume]
            logger.info(f"Adding volume mount: {volume}")
        # Container always serves on 8000 internally; host_port maps to it.
        ports = ["-p", f"{self.host_port}:8000"]
        if self.extra_ports:
            ports += [
                "-p",
                f"{self.host_port + 1}:8001",  # VSCode
                "-p",
                f"{self.host_port + 2}:8002",  # Desktop VNC
            ]
        flags += ports
        # Add GPU support if enabled
        if self.enable_gpu:
            flags += ["--gpus", "all"]
        # Connect container to the specified Docker network
        if self.network:
            flags += ["--network", self.network]
        # Run container (detached, auto-removed on stop via --rm)
        run_cmd = [
            "docker",
            "run",
            "-d",
            "--platform",
            self.platform,
            "--rm",
            "--ulimit",
            "nofile=65536:65536",  # prevent "too many open files" errors
            "--name",
            f"agent-server-{uuid.uuid4()}",
            *flags,
            image,
            "--host",
            "0.0.0.0",
            "--port",
            "8000",
        ]
        proc = execute_command(run_cmd)
        if proc.returncode != 0:
            raise RuntimeError(f"Failed to run docker container: {proc.stderr}")
        self._container_id = proc.stdout.strip()
        logger.info(f"Started container: {self._container_id}")
        # Optionally stream logs in background
        if self.detach_logs:
            self._logs_thread = threading.Thread(
                target=self._stream_docker_logs, daemon=True
            )
            self._logs_thread.start()
        # Set host for RemoteWorkspace to use
        # The container exposes port 8000, mapped to self.host_port
        # Override parent's host initialization (object.__setattr__ bypasses
        # pydantic's immutability/validation on an already-constructed model)
        if not self.host:
            object.__setattr__(self, "host", f"http://127.0.0.1:{self.host_port}")
            object.__setattr__(self, "api_key", None)
        # Wait for container to be healthy
        self._wait_for_health()
        logger.info(f"Docker workspace is ready at {self.host}")
        # Now initialize the parent RemoteWorkspace with the container URL
        super().model_post_init(context)

    def _stream_docker_logs(self) -> None:
        """Stream Docker logs to stdout in the background.

        Runs in a daemon thread; exits when the log pipe closes or
        _stop_logs is set. Each forwarded line is prefixed with "[DOCKER] ".
        """
        if not self._container_id:
            return
        try:
            p = subprocess.Popen(
                ["docker", "logs", "-f", self._container_id],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )
            if p.stdout is None:
                return
            for line in iter(p.stdout.readline, ""):
                if self._stop_logs.is_set():
                    break
                if line:
                    sys.stdout.write(f"[DOCKER] {line}")
                    sys.stdout.flush()
        except Exception as e:
            sys.stderr.write(f"Error streaming docker logs: {e}\n")
        finally:
            # Mark streaming as stopped even on abnormal exit; guarded because
            # this can run during interpreter shutdown.
            try:
                self._stop_logs.set()
            except Exception:
                pass

    def _wait_for_health(self, timeout: float = 120.0) -> None:
        """Wait for the Docker container to become healthy.

        Polls GET {host}/health once per second until it returns a 2xx
        response. Also watches the container state so a crashed container
        fails fast with its logs instead of waiting out the full timeout.

        Args:
            timeout: Maximum seconds to wait for a healthy response.

        Raises:
            RuntimeError: If the container stops, or the deadline passes
                without a healthy response.
        """
        start = time.time()
        # We can construct the health URL based on self.host if available,
        # or fallback to localhost
        base_url = self.host.rstrip("/")
        health_url = f"{base_url}/health"
        while time.time() - start < timeout:
            try:
                with urlopen(health_url, timeout=1.0) as resp:
                    # getattr fallback: treat responses without a status
                    # attribute as success
                    if 200 <= getattr(resp, "status", 200) < 300:
                        return
            except Exception:
                pass
            # Check if container is still running
            if self._container_id:
                ps = execute_command(
                    [
                        "docker",
                        "inspect",
                        "-f",
                        "{{.State.Running}}",
                        self._container_id,
                    ]
                )
                if ps.stdout.strip() != "true":
                    logs = execute_command(["docker", "logs", self._container_id])
                    msg = (
                        "Container stopped unexpectedly. Logs:\n"
                        f"{logs.stdout}\n{logs.stderr}"
                    )
                    raise RuntimeError(msg)
            time.sleep(1)
        raise RuntimeError("Container failed to become healthy in time")

    def __enter__(self) -> "DockerWorkspace":
        """Context manager entry - returns the workspace itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:  # type: ignore[no-untyped-def]
        """Context manager exit - cleans up the Docker container."""
        self.cleanup()

    def __del__(self) -> None:
        """Clean up the Docker container when the workspace is destroyed."""
        self.cleanup()

    def cleanup(self) -> None:
        """Stop and remove the Docker container.

        Idempotent: _container_id (and _image_name, when cleanup_image is set)
        are cleared after use, so repeated calls — e.g. __exit__ followed by
        __del__ — are no-ops. The container was started with --rm, so stopping
        it also removes it.
        """
        if self._container_id:
            # Stop logs streaming
            self._stop_logs.set()
            if self._logs_thread and self._logs_thread.is_alive():
                self._logs_thread.join(timeout=2)
            # Stop and remove the container
            logger.info(f"Stopping container: {self._container_id}")
            execute_command(["docker", "stop", self._container_id])
            self._container_id = None
        # Optionally delete the Docker image
        if self.cleanup_image and self._image_name:
            logger.info(f"Deleting Docker image: {self._image_name}")
            result = execute_command(["docker", "rmi", "-f", self._image_name])
            if result.returncode == 0:
                logger.info(f"Successfully deleted image: {self._image_name}")
            else:
                logger.warning(
                    f"Failed to delete image {self._image_name}: {result.stderr}"
                )
            self._image_name = None

    def pause(self) -> None:
        """Pause the Docker container to conserve resources.

        Uses `docker pause` to freeze all processes in the container without
        stopping it. The container can be resumed later with `resume()`.

        Raises:
            RuntimeError: If the container is not running or pause fails.
        """
        if not self._container_id:
            raise RuntimeError("Cannot pause: container is not running")
        logger.info(f"Pausing container: {self._container_id}")
        result = execute_command(["docker", "pause", self._container_id])
        if result.returncode != 0:
            raise RuntimeError(f"Failed to pause container: {result.stderr}")
        logger.info(f"Container paused: {self._container_id}")

    def resume(self) -> None:
        """Resume a paused Docker container.

        Uses `docker unpause` to resume all processes in the container.

        Raises:
            RuntimeError: If the container is not running or resume fails.
        """
        if not self._container_id:
            raise RuntimeError("Cannot resume: container is not running")
        logger.info(f"Resuming container: {self._container_id}")
        result = execute_command(["docker", "unpause", self._container_id])
        if result.returncode != 0:
            raise RuntimeError(f"Failed to resume container: {result.stderr}")
        # Wait for health after resuming (use same timeout as initial startup)
        self._wait_for_health(timeout=120.0)
        logger.info(f"Container resumed: {self._container_id}")