Skip to content

Commit 823791a

Browse files
committed
Add --compiler-worker-max-rss
1 parent c373682 commit 823791a

File tree

6 files changed

+42
-0
lines changed

6 files changed

+42
-0
lines changed

edb/server/args.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,8 @@ class ServerConfig(NamedTuple):
257257
compiler_pool_mode: CompilerPoolMode
258258
compiler_pool_addr: tuple[str, int]
259259
compiler_pool_tenant_cache_size: int
260+
compiler_worker_max_rss: Optional[int]
261+
260262
echo_runtime_info: bool
261263
emit_server_status: str
262264
temp_dir: bool
@@ -1135,6 +1137,12 @@ def resolve_envvar_value(self, ctx: click.Context):
11351137
help='Path to a TOML file to configure the server.',
11361138
hidden=True,
11371139
),
1140+
click.option(
1141+
'--compiler-worker-max-rss',
1142+
type=int,
1143+
help='Maximum allowed RSS (in KiB) per compiler worker process. Any '
1144+
'worker exceeding this limit will be terminated and recreated.',
1145+
),
11381146
])
11391147

11401148

@@ -1175,6 +1183,12 @@ def resolve_envvar_value(self, ctx: click.Context):
11751183
'--metrics-port', type=PortType(),
11761184
help=f'Port to listen on for metrics HTTP API.',
11771185
),
1186+
click.option(
1187+
'--worker-max-rss',
1188+
type=int,
1189+
help='Maximum allowed RSS (in KiB) per worker process. Any worker '
1190+
'exceeding this limit will be terminated and recreated.',
1191+
),
11781192
])
11791193

11801194

@@ -1409,6 +1423,10 @@ def parse_args(**kwargs: Any):
14091423
kwargs['compiler_pool_addr'] = (
14101424
"localhost", defines.EDGEDB_REMOTE_COMPILER_PORT
14111425
)
1426+
if kwargs['compiler_worker_max_rss'] is not None:
1427+
abort('cannot set --compiler-worker-max-rss when using '
1428+
'--compiler-pool-mode=remote')
1429+
14121430
elif kwargs['compiler_pool_addr'] is not None:
14131431
abort('--compiler-pool-addr is only meaningful '
14141432
'under --compiler-pool-mode=remote')

edb/server/compiler_pool/pool.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import time
4747

4848
import immutables
49+
import psutil
4950

5051
from edb.common import debug
5152
from edb.common import lru
@@ -208,6 +209,7 @@ async def _request(
208209
class Worker(BaseWorker):
209210

210211
_pid: int
212+
_proc: psutil.Process
211213
_manager: BaseLocalPool
212214
_server: amsg.Server
213215

@@ -221,6 +223,7 @@ def __init__(
221223
super().__init__(*args)
222224

223225
self._pid = pid
226+
self._proc = psutil.Process(pid)
224227
self._manager = manager
225228
self._server = server
226229

@@ -237,6 +240,9 @@ async def _attach(self, init_args_pickled: bytes) -> None:
237240
def get_pid(self) -> int:
238241
return self._pid
239242

243+
def get_rss(self) -> int:
244+
return self._proc.memory_info().rss // 1024
245+
240246
def close(self) -> None:
241247
if self._closed:
242248
return
@@ -835,6 +841,7 @@ class BaseLocalPool(
835841

836842
_poolsock_name: str
837843
_pool_size: int
844+
_worker_max_rss: Optional[int]
838845
_server: Optional[amsg.Server]
839846
_ready_evt: asyncio.Event
840847
_running: Optional[bool]
@@ -846,6 +853,7 @@ def __init__(
846853
*,
847854
runstate_dir: str,
848855
pool_size: int,
856+
worker_max_rss: Optional[int] = None,
849857
**kwargs: Any,
850858
) -> None:
851859
super().__init__(**kwargs)
@@ -859,6 +867,7 @@ def __init__(
859867

860868
assert pool_size >= 1
861869
self._pool_size = pool_size
870+
self._worker_max_rss = worker_max_rss
862871
self._workers = {}
863872

864873
self._server = amsg.Server(self._poolsock_name, self._loop, self)
@@ -1036,6 +1045,12 @@ def _release_worker(
10361045
) -> None:
10371046
# Skip disconnected workers
10381047
if worker.get_pid() in self._workers:
1048+
if self._worker_max_rss is not None:
1049+
if worker.get_rss() > self._worker_max_rss:
1050+
if debug.flags.server:
1051+
print(f"HIT MEMORY LIMIT, KILLING {worker.get_pid()}")
1052+
worker.close()
1053+
return
10391054
self._workers_queue.release(worker, put_in_front=put_in_front)
10401055

10411056
def get_debug_info(self) -> dict[str, Any]:

edb/server/compiler_pool/server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,7 @@ async def server_main(
648648
client_schema_cache_size: int,
649649
runstate_dir: Optional[str | pathlib.Path],
650650
metrics_port: Optional[int],
651+
worker_max_rss: Optional[int],
651652
):
652653
if listen_port is None:
653654
listen_port = defines.EDGEDB_REMOTE_COMPILER_PORT
@@ -674,6 +675,7 @@ async def server_main(
674675
pool_size=pool_size,
675676
cache_size=client_schema_cache_size,
676677
secret=secret.encode(),
678+
worker_max_rss=worker_max_rss,
677679
)
678680
await pool.start()
679681
try:

edb/server/main.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ async def _run_server(
230230
compiler_pool_size=args.compiler_pool_size,
231231
compiler_pool_mode=args.compiler_pool_mode,
232232
compiler_pool_addr=args.compiler_pool_addr,
233+
compiler_worker_max_rss=args.compiler_worker_max_rss,
233234
nethosts=args.bind_addresses,
234235
netport=args.port,
235236
listen_sockets=tuple(s for ss in sockets.values() for s in ss),

edb/server/multitenant.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,7 @@ async def run_server(
480480
compiler_pool_tenant_cache_size=(
481481
args.compiler_pool_tenant_cache_size
482482
),
483+
compiler_worker_max_rss=args.compiler_worker_max_rss,
483484
compiler_state=compiler_state,
484485
use_monitor_fs=args.reload_config_files in [
485486
srvargs.ReloadTrigger.Default,

edb/server/server.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ def __init__(
147147
compiler_pool_addr,
148148
nethosts,
149149
netport,
150+
compiler_worker_max_rss: Optional[int] = None,
150151
listen_sockets: tuple[socket.socket, ...] = (),
151152
testmode: bool = False,
152153
daemonized: bool = False,
@@ -192,6 +193,7 @@ def __init__(
192193
self._compiler_pool_size = compiler_pool_size
193194
self._compiler_pool_mode = compiler_pool_mode
194195
self._compiler_pool_addr = compiler_pool_addr
196+
self._compiler_worker_max_rss = compiler_worker_max_rss
195197
self._system_compile_cache = lru.LRUMapping(
196198
maxsize=defines._MAX_QUERIES_CACHE
197199
)
@@ -609,6 +611,9 @@ def _get_compiler_args(self) -> dict[str, Any]:
609611
)
610612
if self._compiler_pool_mode == srvargs.CompilerPoolMode.Remote:
611613
args['address'] = self._compiler_pool_addr
614+
else:
615+
if self._compiler_worker_max_rss is not None:
616+
args['worker_max_rss'] = self._compiler_worker_max_rss
612617
return args
613618

614619
async def _destroy_compiler_pool(self):

0 commit comments

Comments
 (0)