Skip to content

Commit 15407a7

Browse files
authored
fix: include detailed memory metric in event record (#354)
* chore: add dependency * chore: update `Cargo.lock` * fix: include more details in the event record when terminated due to memory limit
1 parent 551769e commit 15407a7

File tree

10 files changed

+120
-42
lines changed

10 files changed

+120
-42
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ glob = "0.3.1"
7878
httparse = "1.8"
7979
http = "0.2"
8080
faster-hex = "0.9.0"
81+
strum = "0.25"
8182

8283
# DEBUG
8384
#[patch.crates-io]

crates/base/src/deno_runtime.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use std::borrow::Cow;
3232
use std::collections::HashMap;
3333
use std::ffi::c_void;
3434
use std::fmt;
35+
use std::sync::atomic::{AtomicUsize, Ordering};
3536
use std::sync::Arc;
3637
use std::task::Poll;
3738
use std::time::Duration;
@@ -123,6 +124,7 @@ struct MemCheckState {
123124
limit: Option<usize>,
124125
waker: Arc<AtomicWaker>,
125126
notify: Arc<Notify>,
127+
current_bytes: Arc<AtomicUsize>,
126128

127129
#[cfg(debug_assertions)]
128130
exceeded: Arc<AtomicFlag>,
@@ -156,6 +158,8 @@ impl MemCheckState {
156158
.saturating_add(used_heap_bytes)
157159
.saturating_add(external_bytes);
158160

161+
self.current_bytes.store(total_bytes, Ordering::Release);
162+
159163
if total_bytes >= limit {
160164
self.notify.notify_waiters();
161165

@@ -816,19 +820,24 @@ impl DenoRuntime {
816820
self.maybe_inspector.clone()
817821
}
818822

823+
pub fn mem_check_captured_bytes(&self) -> Arc<AtomicUsize> {
824+
self.mem_check_state.current_bytes.clone()
825+
}
826+
819827
pub fn add_memory_limit_callback<C>(&self, mut cb: C)
820828
where
821829
// XXX(Nyannyacha): Should we relax bounds a bit more?
822-
C: FnMut() -> bool + Send + 'static,
830+
C: FnMut(usize) -> bool + Send + 'static,
823831
{
824832
let notify = self.mem_check_state.notify.clone();
825833
let drop_token = self.mem_check_state.drop_token.clone();
834+
let current_bytes = self.mem_check_state.current_bytes.clone();
826835

827836
drop(rt::SUPERVISOR_RT.spawn(async move {
828837
loop {
829838
tokio::select! {
830839
_ = notify.notified() => {
831-
if cb() {
840+
if cb(current_bytes.load(Ordering::Acquire)) {
832841
break;
833842
}
834843
}
@@ -1586,7 +1595,7 @@ mod test {
15861595
let waker = user_rt.js_runtime.op_state().borrow().waker.clone();
15871596
let handle = user_rt.js_runtime.v8_isolate().thread_safe_handle();
15881597

1589-
user_rt.add_memory_limit_callback(move || {
1598+
user_rt.add_memory_limit_callback(move |_| {
15901599
handle.terminate_execution();
15911600
waker.wake();
15921601
callback_tx.send(()).unwrap();

crates/base/src/rt_worker/supervisor/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::sync::Arc;
66
use cpu_timer::{CPUAlarmVal, CPUTimer};
77
use deno_core::v8::IsolateHandle;
88
use enum_as_inner::EnumAsInner;
9+
use event_worker::events::MemoryLimitDetail;
910
use futures_util::task::AtomicWaker;
1011
use log::error;
1112
use sb_workers::context::{Timing, UserWorkerMsgs, UserWorkerRuntimeOpts};
@@ -126,7 +127,7 @@ pub struct Arguments {
126127
pub cpu_timer_param: CPUTimerParam,
127128
pub supervisor_policy: SupervisorPolicy,
128129
pub timing: Option<Timing>,
129-
pub memory_limit_rx: mpsc::UnboundedReceiver<()>,
130+
pub memory_limit_rx: mpsc::UnboundedReceiver<MemoryLimitDetail>,
130131
pub pool_msg_tx: Option<mpsc::UnboundedSender<UserWorkerMsgs>>,
131132
pub isolate_memory_usage_tx: oneshot::Sender<IsolateMemoryStats>,
132133
pub thread_safe_handle: IsolateHandle,

crates/base/src/rt_worker/supervisor/strategy_per_request.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64)
116116

117117
if !cpu_timer_param.is_disabled() {
118118
if cpu_usage_ms >= hard_limit_ms as i64 {
119-
error!("CPU time limit reached. isolate: {:?}", key);
119+
error!("CPU time limit reached: isolate: {:?}", key);
120120
complete_reason = Some(ShutdownReason::CPUTime);
121121
}
122122

@@ -130,7 +130,7 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64)
130130

131131
Some(_) = wait_cpu_alarm(cpu_alarms_rx.as_mut()) => {
132132
if is_worker_entered && req_start_ack {
133-
error!("CPU time limit reached. isolate: {:?}", key);
133+
error!("CPU time limit reached: isolate: {:?}", key);
134134
complete_reason = Some(ShutdownReason::CPUTime);
135135
}
136136
}
@@ -171,14 +171,14 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64)
171171

172172
continue;
173173
} else {
174-
error!("wall clock duraiton reached. isolate: {:?}", key);
174+
error!("wall clock duraiton reached: isolate: {:?}", key);
175175
complete_reason = Some(ShutdownReason::WallClockTime);
176176
}
177177
}
178178

179-
Some(_) = memory_limit_rx.recv() => {
180-
error!("memory limit reached for the worker. isolate: {:?}", key);
181-
complete_reason = Some(ShutdownReason::Memory);
179+
Some(detail) = memory_limit_rx.recv() => {
180+
error!("memory limit reached for the worker: isolate: {:?}", key);
181+
complete_reason = Some(ShutdownReason::Memory(detail));
182182
}
183183
}
184184

crates/base/src/rt_worker/supervisor/strategy_per_worker.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -134,15 +134,15 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
134134
if !cpu_timer_param.is_disabled() {
135135
if cpu_usage_ms >= hard_limit_ms as i64 {
136136
terminate_fn();
137-
error!("CPU time hard limit reached. isolate: {:?}", key);
137+
error!("CPU time hard limit reached: isolate: {:?}", key);
138138
return (ShutdownReason::CPUTime, cpu_usage_ms);
139139
} else if cpu_usage_ms >= soft_limit_ms as i64 && !cpu_time_soft_limit_reached {
140-
error!("CPU time soft limit reached. isolate: {:?}", key);
140+
error!("CPU time soft limit reached: isolate: {:?}", key);
141141
cpu_time_soft_limit_reached = true;
142142

143143
if req_ack_count == demand.load(Ordering::Acquire) {
144144
terminate_fn();
145-
error!("early termination due to the last request being completed. isolate: {:?}", key);
145+
error!("early termination due to the last request being completed: isolate: {:?}", key);
146146
return (ShutdownReason::EarlyDrop, cpu_usage_ms);
147147
}
148148
}
@@ -154,17 +154,17 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
154154
Some(_) = wait_cpu_alarm(cpu_alarms_rx.as_mut()) => {
155155
if is_worker_entered {
156156
if !cpu_time_soft_limit_reached {
157-
error!("CPU time soft limit reached. isolate: {:?}", key);
157+
error!("CPU time soft limit reached: isolate: {:?}", key);
158158
cpu_time_soft_limit_reached = true;
159159

160160
if req_ack_count == demand.load(Ordering::Acquire) {
161161
terminate_fn();
162-
error!("early termination due to the last request being completed. isolate: {:?}", key);
162+
error!("early termination due to the last request being completed: isolate: {:?}", key);
163163
return (ShutdownReason::EarlyDrop, cpu_usage_ms);
164164
}
165165
} else {
166166
terminate_fn();
167-
error!("CPU time hard limit reached. isolate: {:?}", key);
167+
error!("CPU time hard limit reached: isolate: {:?}", key);
168168
return (ShutdownReason::CPUTime, cpu_usage_ms);
169169
}
170170
}
@@ -186,7 +186,7 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
186186
}
187187

188188
terminate_fn();
189-
error!("early termination due to the last request being completed. isolate: {:?}", key);
189+
error!("early termination due to the last request being completed: isolate: {:?}", key);
190190
return (ShutdownReason::EarlyDrop, cpu_usage_ms);
191191
}
192192

@@ -195,24 +195,24 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
195195
// first tick completes immediately
196196
wall_clock_alerts += 1;
197197
} else if wall_clock_alerts == 1 {
198-
error!("wall clock duration warning. isolate: {:?}", key);
198+
error!("wall clock duration warning: isolate: {:?}", key);
199199
wall_clock_alerts += 1;
200200
} else {
201201
let is_in_flight_req_exists = req_ack_count != demand.load(Ordering::Acquire);
202202

203203
terminate_fn();
204204

205-
error!("wall clock duration reached. isolate: {:?} (in_flight_req_exists = {})", key, is_in_flight_req_exists);
205+
error!("wall clock duration reached: isolate: {:?} (in_flight_req_exists = {})", key, is_in_flight_req_exists);
206206

207207
return (ShutdownReason::WallClockTime, cpu_usage_ms);
208208
}
209209
}
210210

211211
// memory usage
212-
Some(_) = memory_limit_rx.recv() => {
212+
Some(detail) = memory_limit_rx.recv() => {
213213
terminate_fn();
214-
error!("memory limit reached for the worker. isolate: {:?}", key);
215-
return (ShutdownReason::Memory, cpu_usage_ms);
214+
error!("memory limit reached for the worker: isolate: {:?}", key);
215+
return (ShutdownReason::Memory(detail), cpu_usage_ms);
216216
}
217217
}
218218
}

crates/base/src/rt_worker/worker.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ impl Worker {
224224
total: 0,
225225
heap: 0,
226226
external: 0,
227+
mem_check_captured: 0,
227228
},
228229
},
229230
));

crates/base/src/rt_worker/worker_ctx.rs

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use crate::deno_runtime::DenoRuntime;
22
use crate::inspector_server::Inspector;
33
use crate::timeout::{self, CancelOnWriteTimeout, ReadTimeoutStream};
44
use crate::utils::send_event_if_event_worker_available;
5-
use crate::utils::units::bytes_to_display;
65

76
use crate::rt_worker::worker::{Worker, WorkerHandler};
87
use crate::rt_worker::worker_pool::WorkerPool;
@@ -11,7 +10,8 @@ use cpu_timer::CPUTimer;
1110
use deno_config::JsxImportSourceConfig;
1211
use deno_core::{InspectorSessionProxy, LocalInspectorSession};
1312
use event_worker::events::{
14-
BootEvent, ShutdownEvent, WorkerEventWithMetadata, WorkerEvents, WorkerMemoryUsed,
13+
BootEvent, MemoryLimitDetail, MemoryLimitDetailMemCheck, MemoryLimitDetailV8, ShutdownEvent,
14+
WorkerEventWithMetadata, WorkerEvents, WorkerMemoryUsed,
1515
};
1616
use futures_util::pin_mut;
1717
use http::StatusCode;
@@ -31,6 +31,7 @@ use sb_workers::errors::WorkerError;
3131
use std::future::pending;
3232
use std::io::ErrorKind;
3333
use std::path::PathBuf;
34+
use std::sync::atomic::Ordering;
3435
use std::sync::Arc;
3536
use std::time::Duration;
3637
use tokio::io::{self, copy_bidirectional};
@@ -260,7 +261,7 @@ pub fn create_supervisor(
260261
timing: Option<Timing>,
261262
termination_token: Option<TerminationToken>,
262263
) -> Result<(Option<CPUTimer>, CancellationToken), Error> {
263-
let (memory_limit_tx, memory_limit_rx) = mpsc::unbounded_channel::<()>();
264+
let (memory_limit_tx, memory_limit_rx) = mpsc::unbounded_channel::<MemoryLimitDetail>();
264265
let (waker, thread_safe_handle) = {
265266
let js_runtime = &mut worker_runtime.js_runtime;
266267
(
@@ -271,6 +272,7 @@ pub fn create_supervisor(
271272

272273
// we assert supervisor is only run for user workers
273274
let conf = worker_runtime.conf.as_user_worker().unwrap().clone();
275+
let mem_check_captured_bytes = worker_runtime.mem_check_captured_bytes();
274276
let is_termination_requested = worker_runtime.is_termination_requested.clone();
275277

276278
let giveup_process_requests_token = cancel.clone();
@@ -292,32 +294,43 @@ pub fn create_supervisor(
292294
)
293295
});
294296

295-
worker_runtime.add_memory_limit_callback({
296-
let memory_limit_tx = memory_limit_tx.clone();
297-
move || {
298-
debug!("Hard memory limit triggered");
297+
let send_memory_limit_fn = move |detail: MemoryLimitDetail| {
298+
debug!(
299+
"memory limit triggered: isolate: {:?}, detail: {}",
300+
key, detail
301+
);
299302

300-
if memory_limit_tx.send(()).is_err() {
301-
error!("failed to send memory limit reached notification - isolate may already be terminating");
302-
}
303+
if memory_limit_tx.send(detail).is_err() {
304+
error!(
305+
"failed to send memory limit reached notification - isolate may already be terminating: kind: {}",
306+
<&'static str>::from(&detail)
307+
);
308+
}
309+
};
310+
311+
worker_runtime.add_memory_limit_callback({
312+
let send_fn = send_memory_limit_fn.clone();
313+
move |captured| {
314+
send_fn(MemoryLimitDetail::MemCheck(MemoryLimitDetailMemCheck {
315+
captured,
316+
}));
303317

304318
true
305319
}
306320
});
307321

308322
worker_runtime.js_runtime.add_near_heap_limit_callback({
309-
let memory_limit_tx = memory_limit_tx.clone();
310-
move |cur, _| {
311-
debug!("Low memory alert triggered: {}", bytes_to_display(cur as u64),);
312-
313-
if memory_limit_tx.send(()).is_err() {
314-
error!("failed to send memory limit reached notification - isolate may already be terminating");
315-
}
323+
let send_fn = send_memory_limit_fn;
324+
move |current, initial| {
325+
send_fn(MemoryLimitDetail::V8(MemoryLimitDetailV8 {
326+
current,
327+
initial,
328+
}));
316329

317330
// give an allowance on current limit (until the isolate is
318331
// terminated) we do this so that oom won't end up killing the
319332
// edge-runtime process
320-
cur * (conf.low_memory_multiplier as usize)
333+
current * (conf.low_memory_multiplier as usize)
321334
}
322335
});
323336

@@ -471,7 +484,9 @@ pub fn create_supervisor(
471484
total: v.used_heap_size + v.external_memory,
472485
heap: v.used_heap_size,
473486
external: v.external_memory,
487+
mem_check_captured: mem_check_captured_bytes.load(Ordering::Acquire),
474488
},
489+
475490
Err(_) => {
476491
if !supervise_cancel_token_inner.is_cancelled() {
477492
error!("isolate memory usage sender dropped");
@@ -481,6 +496,7 @@ pub fn create_supervisor(
481496
total: 0,
482497
heap: 0,
483498
external: 0,
499+
mem_check_captured: 0,
484500
}
485501
}
486502
};

crates/event_worker/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,5 @@ uuid.workspace = true
1616
serde.workspace = true
1717
anyhow.workspace = true
1818
tokio.workspace = true
19-
log.workspace = true
19+
log.workspace = true
20+
strum = { workspace = true, features = ["derive"] }

0 commit comments

Comments
 (0)