-
Notifications
You must be signed in to change notification settings - Fork 19
Expand file tree
/
Copy pathexecutable.py
More file actions
374 lines (311 loc) · 12.6 KB
/
executable.py
File metadata and controls
374 lines (311 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
"""
This module contains the abstract class for executables (programs and instances) running inside Firecracker MicroVMs.
"""
import asyncio
import logging
from dataclasses import dataclass, field
from multiprocessing import Process, set_start_method
from os.path import exists, isfile
from pathlib import Path
from typing import Generic, TypeVar
from aiohttp import ClientResponseError
from aleph_message.models import ExecutableContent, ItemHash
from aleph_message.models.execution.environment import MachineResources
from aleph_message.models.execution.volume import MachineVolume, PersistentVolume
from aleph.vm.conf import settings
from aleph.vm.controllers.configuration import (
Configuration,
VMConfiguration,
save_controller_configuration,
)
from aleph.vm.controllers.firecracker.snapshots import CompressedDiskVolumeSnapshot
from aleph.vm.controllers.interface import AlephVmControllerInterface
from aleph.vm.guest_api.__main__ import run_guest_api
from aleph.vm.hypervisors.firecracker.microvm import FirecrackerConfig, MicroVM
from aleph.vm.network.firewall import teardown_nftables_for_vm
from aleph.vm.network.interfaces import TapInterface
from aleph.vm.storage import chown_to_jailman, get_volume_path
# psutil is an optional dependency: when it cannot be imported the name is bound to None
# and code that reports process metrics checks for that before using it.
try:
    import psutil  # type: ignore [no-redef]
except ImportError:
    psutil = None  # type: ignore [assignment]

logger = logging.getLogger(__name__)

# Select the "spawn" start method for multiprocessing. The start method can only be set once
# per interpreter; that specific failure is tolerated, any other RuntimeError propagates.
try:
    set_start_method("spawn")
except RuntimeError as error:
    if error.args != ("context has already been set",):
        raise
    logger.info("Start method has already been set")
class ResourceDownloadError(ClientResponseError):
    """An error occurred while downloading a VM resource file.

    Wraps an existing ``ClientResponseError`` by copying its request info, history,
    status, message and headers into a new exception of this more specific type.
    """

    def __init__(self, error: ClientResponseError):
        # Collect the fields of the wrapped error, then forward them unchanged to the parent class.
        copied_fields = {
            "request_info": error.request_info,
            "history": error.history,
            "status": error.status,
            "message": error.message,
            "headers": error.headers,
        }
        super().__init__(**copied_fields)
@dataclass
class Volume:
    """Description of one volume: its mount point, the device behind it and a read-only flag."""

    # Mount point of the volume (presumably a path inside the guest; TODO confirm).
    mount: str
    # Device associated with the volume (presumably a guest block device name; TODO confirm).
    device: str
    # Whether the volume is read-only.
    read_only: bool
@dataclass
class HostVolume:
    """A volume together with the path of its backing file on the host.

    Instances are built by ``AlephFirecrackerResources.download_volumes`` and read by
    ``AlephFirecrackerResources.get_disk_usage_delta``.
    """

    # Mount point requested for the volume.
    mount: str
    # Path of the volume on the host filesystem.
    path_on_host: Path
    # Whether the volume is read-only.
    read_only: bool
    # Requested size in MiB; None when the source volume has no ``size_mib`` attribute.
    size_mib: int | None
@dataclass
class BaseConfiguration:
    """Base set of configuration fields for a VM: identity, network settings, volumes and variables.

    NOTE(review): the consumer of this structure is not visible in this file.
    """

    # Hash identifying the VM.
    vm_hash: ItemHash
    # IP address, if any (exact format not shown here).
    ip: str | None = None
    # Network route, if any (exact format not shown here).
    route: str | None = None
    # DNS servers; defaults to a fresh empty list per instance.
    dns_servers: list[str] = field(default_factory=list)
    # Volumes; defaults to a fresh empty list per instance.
    volumes: list[Volume] = field(default_factory=list)
    # Optional string-to-string variables (presumably environment variables; TODO confirm).
    variables: dict[str, str] | None = None
@dataclass
class ConfigurationResponse:
    """Outcome of a configuration request: a success flag plus optional error details.

    NOTE(review): the producer of this structure is not visible in this file.
    """

    # True when the configuration succeeded.
    success: bool
    # Error description, if any.
    error: str | None = None
    # Traceback text accompanying the error, if any.
    traceback: str | None = None
class AlephFirecrackerResources:
    """Resources required to start a Firecracker VM.

    ``message_content`` and ``namespace`` are set by the constructor. ``kernel_image_path``
    and ``volumes`` are filled in by ``download_kernel()`` and ``download_volumes()``.
    ``rootfs_path`` is only declared here; this class never assigns it.
    """

    message_content: ExecutableContent
    kernel_image_path: Path
    rootfs_path: Path
    volumes: list[HostVolume]
    namespace: str

    def __init__(self, message_content: ExecutableContent, namespace: str):
        """Store the message content and the namespace passed to ``get_volume_path``."""
        self.message_content = message_content
        self.namespace = namespace

    @staticmethod
    def _size_delta(path_on_host: Path, size_mib) -> int:
        """Return the bytes used by ``path_on_host`` minus the ``size_mib`` requested.

        A path that does not exist counts as zero bytes used.
        """
        used_size = path_on_host.stat().st_size if path_on_host.exists() else 0
        requested_size = int(size_mib * 1024 * 1024)
        return used_size - requested_size

    def get_disk_usage_delta(self) -> int:
        """Difference between what is currently used on disk and the size requested.

        Counts the rootfs (when the message content has one) and every volume that declares
        a size. Used to calculate an estimate of the space available for use.

        Returns:
            The delta in bytes. It is negative while less space is used than was requested.
        """
        total_delta = 0

        # Root fs
        if hasattr(self.message_content, "rootfs"):
            rootfs = self.message_content.rootfs
            total_delta += self._size_delta(self.rootfs_path, rootfs.size_mib)

        # Count each extra volume; a volume without a planned size (immutable volume) adds nothing.
        for volume in self.volumes:
            if volume.size_mib:
                total_delta += self._size_delta(volume.path_on_host, volume.size_mib)

        return total_delta

    def to_dict(self):
        """Return the instance attribute dictionary itself (not a copy)."""
        return self.__dict__

    async def download_kernel(self):
        """Point ``kernel_image_path`` at ``settings.LINUX_PATH`` and verify the file exists.

        Raises:
            AssertionError: if the kernel image is not a regular file on the host.
        """
        # Assumes kernel is already present on the host
        self.kernel_image_path = Path(settings.LINUX_PATH)
        if not isfile(self.kernel_image_path):
            # Raised explicitly rather than with `assert`, so the check still runs under
            # `python -O`; the exception type is unchanged for existing callers.
            msg = f"Kernel image not found on host: {self.kernel_image_path}"
            raise AssertionError(msg)

    async def download_volumes(self):
        """Resolve every volume of the message to a ``HostVolume`` and store the list in ``volumes``.

        Persistent volumes missing a name or a mount point get defaults written back onto the
        volume object before its host path is resolved.
        """
        volumes = []
        # TODO: Download in parallel and prevent duplicated volume names
        volume: MachineVolume
        for i, volume in enumerate(self.message_content.volumes):
            # only persistent volume has name and mount
            if isinstance(volume, PersistentVolume):
                if not volume.name:
                    # Spelling kept as-is: changing this runtime string would rename volumes.
                    volume.name = f"unamed_volume_{i}"
                if not volume.mount:
                    volume.mount = f"/mnt/{volume.name}"
            path_on_host = await get_volume_path(volume=volume, namespace=self.namespace)
            volumes.append(
                HostVolume(
                    mount=volume.mount,
                    path_on_host=path_on_host,
                    read_only=volume.is_read_only(),
                    size_mib=getattr(volume, "size_mib", None),
                )
            )
        self.volumes = volumes

    async def download_all(self):
        """Run ``download_kernel()`` and ``download_volumes()`` concurrently."""
        await asyncio.gather(
            self.download_kernel(),
            self.download_volumes(),
        )
class VmSetupError(Exception):
    """Signals a failure while setting up a virtual machine."""
class VmInitNotConnectedError(Exception):
    """Signals that the init process of a virtual machine is not connected.

    NOTE(review): the places that raise this error are outside the code visible in this file.
    """
# Type variable for the configuration object a concrete executable class keeps in `vm_configuration`.
ConfigurationType = TypeVar("ConfigurationType")
class AlephFirecrackerExecutable(Generic[ConfigurationType], AlephVmControllerInterface):
    """Abstract controller for an executable (program or instance) running inside a Firecracker MicroVM.

    The constructor builds the ``MicroVM`` wrapper. ``setup()`` and ``create_snapshot()`` raise
    ``NotImplementedError`` here, while ``wait_for_init()`` and ``load_configuration()`` are
    no-op hooks. ``is_instance`` and ``support_snapshot`` are declared but never assigned in
    this class (presumably subclasses define them).
    """

    vm_id: int
    vm_hash: ItemHash
    resources: AlephFirecrackerResources
    enable_console: bool
    enable_networking: bool
    hardware_resources: MachineResources
    tap_interface: TapInterface | None = None
    fvm: MicroVM
    vm_configuration: ConfigurationType | None
    guest_api_process: Process | None = None
    is_instance: bool
    persistent: bool
    _firecracker_config: FirecrackerConfig | None = None
    controller_configuration: Configuration | None = None
    support_snapshot: bool

    @property
    def resources_path(self) -> Path:
        """Namespace path of the underlying MicroVM, as a ``Path``."""
        return Path(self.fvm.namespace_path)

    def __init__(
        self,
        vm_id: int,
        vm_hash: ItemHash,
        resources: AlephFirecrackerResources,
        enable_networking: bool = False,
        enable_console: bool | None = None,
        hardware_resources: MachineResources | None = None,
        tap_interface: TapInterface | None = None,
        persistent: bool = False,
        prepare_jailer: bool = True,
    ):
        """Store the VM parameters and create the ``MicroVM`` wrapper.

        Args:
            vm_id: Numeric identifier of the VM.
            vm_hash: Hash identifying the VM.
            resources: Resources (kernel, volumes, ...) of the VM.
            enable_networking: Requested networking; only effective when
                ``settings.ALLOW_VM_NETWORKING`` is also truthy.
            enable_console: Console logging switch; ``None`` falls back to
                ``settings.PRINT_SYSTEM_LOGS``.
            hardware_resources: Hardware resources; defaults to ``MachineResources()``.
            tap_interface: Optional TAP interface, deleted again on teardown.
            persistent: Whether the VM is persistent (see ``start()`` and ``configure()``).
            prepare_jailer: When true, ``MicroVM.prepare_jailer()`` is called immediately.
        """
        self.vm_id = vm_id
        self.vm_hash = vm_hash
        self.resources = resources
        if enable_console is None:
            enable_console = settings.PRINT_SYSTEM_LOGS
        self.enable_console = enable_console
        self.enable_networking = enable_networking and settings.ALLOW_VM_NETWORKING
        self.hardware_resources = hardware_resources or MachineResources()
        self.tap_interface = tap_interface
        self.persistent = persistent

        self.fvm = MicroVM(
            vm_id=self.vm_id,
            vm_hash=vm_hash,
            firecracker_bin_path=settings.FIRECRACKER_PATH,
            jailer_base_directory=settings.JAILER_BASE_DIR,
            use_jailer=settings.USE_JAILER,
            jailer_bin_path=settings.JAILER_PATH,
            init_timeout=settings.INIT_TIMEOUT,
            enable_log=enable_console,
        )
        if prepare_jailer:
            self.fvm.prepare_jailer()

        # These properties are set later in the setup and configuration.
        self.vm_configuration = None
        self.guest_api_process = None
        self._firecracker_config = None

    def to_dict(self):
        """Dict representation of the virtual machine. Used to record resource usage and for JSON serialization.

        The result holds every instance attribute plus a ``"process"`` entry. That entry is a
        dict of psutil metrics, or ``None`` when the firecracker process is not running,
        psutil is unavailable, or the process can no longer be found.
        """
        pid_info = None
        if self.fvm.proc and psutil:
            # The firecracker process is still running and process information can be obtained from `psutil`.
            try:
                p = psutil.Process(self.fvm.proc.pid)
                # NOTE(review): newer psutil releases rename `connections()` to
                # `net_connections()`; check against the psutil version in use.
                pid_info = {
                    "status": p.status(),
                    "create_time": p.create_time(),
                    "cpu_times": p.cpu_times(),
                    "cpu_percent": p.cpu_percent(),
                    "memory_info": p.memory_info(),
                    "io_counters": p.io_counters(),
                    "open_files": p.open_files(),
                    "connections": p.connections(),
                    "num_threads": p.num_threads(),
                    "num_ctx_switches": p.num_ctx_switches(),
                }
            except psutil.NoSuchProcess:
                logger.warning("Cannot read process metrics (process not found)")

        return {
            "process": pid_info,
            **self.__dict__,
        }

    async def setup(self):
        """Prepare the VM; concrete classes must override this.

        Raises:
            NotImplementedError: always, in this base class.
        """
        # self._firecracker_config = FirecrackerConfig(...)
        raise NotImplementedError()

    async def start(self):
        """Start the MicroVM, wait for its init and load its configuration.

        If saving the configuration file or starting the MicroVM fails, the MicroVM, its
        nftables rules and its TAP interface are torn down and the error is re-raised.

        Raises:
            ValueError: if there is no MicroVM, or if the VM is an instance or persistent.
        """
        logger.debug("Starting VM=%s", self.vm_id)

        if not self.fvm:
            msg = "No VM found. Call setup() before start()"
            raise ValueError(msg)

        if self.is_instance or self.persistent:
            msg = "VM should be started using SystemD Manager class"
            raise ValueError(msg)

        try:
            firecracker_config_path = await self.fvm.save_configuration_file(self._firecracker_config)
            await self.fvm.start(firecracker_config_path)
            logger.debug("setup done")
        except Exception:
            # Stop the VM and clear network interfaces in case any error prevented the start of the virtual machine.
            logger.error("VM startup failed, cleaning up network")
            await self.fvm.teardown()
            teardown_nftables_for_vm(self.vm_id)
            if self.tap_interface:
                await self.tap_interface.delete()
            raise

        await self.wait_for_init()
        logger.debug("started fvm %s", self.vm_id)
        await self.load_configuration()

    async def wait_for_init(self) -> None:
        """Wait for the init process of the virtual machine to be ready.
        May be empty."""
        return

    async def configure(self, incoming_migration_port: int | None = None):
        """Configure the VM by saving controller service configuration.

        Nothing is done unless the VM is persistent.

        Note: incoming_migration_port is ignored for Firecracker VMs as they don't support live migration.
        """
        if self.persistent:
            firecracker_config_path = await self.fvm.save_configuration_file(self._firecracker_config)
            vm_configuration = VMConfiguration(
                firecracker_bin_path=self.fvm.firecracker_bin_path,
                use_jailer=self.fvm.use_jailer,
                jailer_bin_path=self.fvm.jailer_bin_path,
                init_timeout=self.fvm.init_timeout,
                config_file_path=firecracker_config_path,
            )

            configuration = Configuration(
                vm_id=self.vm_id,
                vm_hash=self.vm_hash,
                settings=settings,
                vm_configuration=vm_configuration,
            )

            save_controller_configuration(self.vm_hash, configuration)

    async def load_configuration(self):
        """Load configuration settings for programs."""
        return

    async def start_guest_api(self):
        """Start the guest API in a separate process and wait for its socket to appear.

        Once the socket exists, it is handed to ``chown_to_jailman``.

        Raises:
            VmSetupError: if the guest API process exits before the socket is created.
        """
        vsock_path = Path(f"{self.fvm.vsock_path}_53")

        # Ensure that the directory where the VSOCK socket will be created exists
        vsock_path.parent.mkdir(parents=True, exist_ok=True)
        logger.debug("starting guest API for %s on %s", self.vm_id, vsock_path)

        vm_hash = self.vm_hash
        self.guest_api_process = Process(
            target=run_guest_api,
            args=(vsock_path, vm_hash, settings.SENTRY_DSN, settings.DOMAIN_NAME),
        )
        self.guest_api_process.start()
        while not exists(vsock_path):
            # A dead process will never create the socket: fail instead of polling forever.
            if not self.guest_api_process.is_alive():
                msg = f"Guest API process for VM {self.vm_id} exited before creating {vsock_path}"
                raise VmSetupError(msg)
            await asyncio.sleep(0.01)
        await chown_to_jailman(Path(vsock_path))
        logger.debug("started guest API for %s", self.vm_id)

    async def stop_guest_api(self):
        """Terminate the guest API process, killing it if it is still alive after 5 seconds."""
        process = self.guest_api_process
        if not (process and process.is_alive()):
            return

        process.terminate()
        # Poll during the grace period so a process that exits promptly does not hold up
        # teardown for the full 5 seconds.
        loop = asyncio.get_running_loop()
        deadline = loop.time() + 5
        while process.is_alive() and loop.time() < deadline:
            await asyncio.sleep(0.1)
        if process.is_alive():
            process.kill()

    async def teardown(self):
        """Tear down the MicroVM, its nftables rules and TAP interface, then stop the guest API."""
        if self.fvm:
            await self.fvm.teardown()
            teardown_nftables_for_vm(self.vm_id)
            if self.tap_interface:
                await self.tap_interface.delete()
        await self.stop_guest_api()

    async def create_snapshot(self) -> CompressedDiskVolumeSnapshot:
        """Create a snapshot of the VM; concrete classes must override this.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()