11use anyhow:: { Context , Result , bail} ;
22use lazy_static:: lazy_static;
3+ use linereader:: LineReader ;
34use nix:: sys:: statfs:: { FsType , statfs} ;
45use notify:: {
56 Error , Event , EventKind , RecommendedWatcher , RecursiveMode , Watcher ,
67 event:: { AccessKind , AccessMode } ,
78} ;
8- use regex:: Regex ;
99use std:: {
10+ fs:: File as StdFile ,
1011 os:: unix:: prelude:: AsRawFd ,
1112 path:: { Path , PathBuf } ,
1213} ;
1314use tokio:: {
1415 fs:: { File , OpenOptions } ,
15- io:: { AsyncBufReadExt , AsyncReadExt , AsyncWriteExt , BufReader , ErrorKind } ,
16+ io:: { AsyncReadExt , AsyncWriteExt , ErrorKind } ,
1617 sync:: mpsc:: { Receiver , Sender , channel} ,
1718 task,
1819} ;
@@ -43,6 +44,8 @@ pub const CGROUP2_SUPER_MAGIC: FsType = FsType(libc::CGROUP2_SUPER_MAGIC);
4344
4445static CGROUP_ROOT : & str = "/sys/fs/cgroup" ;
4546
47+ static MAX_LINEREADER_CAPACITY : usize = 256 ;
48+
4649lazy_static ! {
4750 static ref IS_CGROUP_V2 : bool = {
4851 if let Ok ( sts) = statfs( CGROUP_ROOT ) {
@@ -114,6 +117,11 @@ impl OOMWatcher {
114117 return Ok ( ( ) ) ;
115118 } ;
116119
120+ debug ! (
121+ "Using memory cgroup v1 path: {}" ,
122+ memory_cgroup_path. display( )
123+ ) ;
124+
117125 let memory_cgroup_file_oom_path = memory_cgroup_path. join ( "memory.oom_control" ) ;
118126 let event_control_path = memory_cgroup_path. join ( "cgroup.event_control" ) ;
119127 let path = memory_cgroup_file_oom_path. to_str ( ) ;
@@ -202,6 +210,11 @@ impl OOMWatcher {
202210 return Ok ( ( ) ) ;
203211 } ;
204212
213+ debug ! (
214+ "Using subsystem cgroup v2 path: {}" ,
215+ subsystem_path. display( )
216+ ) ;
217+
205218 let memory_events_file_path = subsystem_path. join ( "memory.events" ) ;
206219 let mut last_counter: u64 = 0 ;
207220
@@ -334,17 +347,21 @@ impl OOMWatcher {
334347 ) ;
335348 let mut new_counter: u64 = 0 ;
336349 let mut found_oom = false ;
337- let fp = File :: open ( memory_events_file_path) . await . context ( format ! (
350+ let fp = StdFile :: open ( memory_events_file_path) . context ( format ! (
338351 "open memory events file: {}" ,
339352 memory_events_file_path. display( )
340353 ) ) ?;
341- let reader = BufReader :: new ( fp) ;
342- let mut lines = reader. lines ( ) ;
343- while let Some ( line) = lines. next_line ( ) . await . context ( "get next line" ) ? {
354+
355+ let mut reader = LineReader :: with_capacity ( MAX_LINEREADER_CAPACITY , fp) ;
356+
357+ while let Some ( l) = reader. next_line ( ) {
358+ let line = str:: from_utf8 ( l. context ( "read line from buffer" ) ?)
359+ . context ( "convert line to utf8" ) ?;
344360 trace ! ( line) ;
361+
345362 if let Some ( counter) = line. strip_prefix ( "oom " ) . or ( line. strip_prefix ( "oom_kill " ) ) {
346363 let counter = counter
347- . to_string ( )
364+ . trim_end ( )
348365 . parse :: < u64 > ( )
349366 . context ( "parse u64 counter" ) ?;
350367 debug ! ( "New oom counter: {counter}, last counter: {last_counter}" , ) ;
@@ -385,27 +402,25 @@ impl OOMWatcher {
385402 pid : u32 ,
386403 subsystem : & str ,
387404 ) -> Result < Option < PathBuf > > {
388- lazy_static ! {
389- static ref RE : Regex = Regex :: new( ".*:(.*):/(.*)" ) . expect( "could not compile regex" ) ;
390- }
391-
392- if let Some ( fp) = Self :: try_open_cgroup_path ( pid) . await ? {
393- let reader = BufReader :: new ( fp) ;
394-
395- let mut lines = reader. lines ( ) ;
396- while let Some ( line) = lines. next_line ( ) . await . context ( "read line from buffer" ) ? {
397- if let Some ( caps) = RE . captures ( & line) {
398- let system = caps
399- . get ( 1 )
400- . context ( "no first capture group in regex match" ) ?
401- . as_str ( ) ;
402- let path = caps
403- . get ( 2 )
404- . context ( "no second capture group in regex match" ) ?
405- . as_str ( ) ;
406- if system. contains ( subsystem) || system. is_empty ( ) {
407- return Ok ( PathBuf :: from ( CGROUP_ROOT ) . join ( subsystem) . join ( path) . into ( ) ) ;
408- }
405+ if let Some ( fp) = Self :: try_open_cgroup_path ( pid) ? {
406+ let mut reader = LineReader :: with_capacity ( MAX_LINEREADER_CAPACITY , fp) ;
407+
408+ while let Some ( line) = reader. next_line ( ) {
409+ let mut iter = str:: from_utf8 ( line. context ( "read line from buffer" ) ?)
410+ . context ( "convert line to utf8" ) ?
411+ . split ( ':' )
412+ . skip ( 1 ) ;
413+
414+ let system = iter. next ( ) . context ( "no system found in cgroup" ) ?;
415+ let path = iter
416+ . next ( )
417+ . context ( "no path found in cgroup" ) ?
418+ . strip_prefix ( "/" )
419+ . context ( "strip root path prefix" ) ?
420+ . trim_end ( ) ;
421+
422+ if system. contains ( subsystem) || system. is_empty ( ) {
423+ return Ok ( PathBuf :: from ( CGROUP_ROOT ) . join ( subsystem) . join ( path) . into ( ) ) ;
409424 }
410425 }
411426
@@ -416,40 +431,34 @@ impl OOMWatcher {
416431 }
417432
418433 async fn process_cgroup_subsystem_path_cgroup_v2 ( pid : u32 ) -> Result < Option < PathBuf > > {
419- lazy_static ! {
420- static ref RE : Regex = Regex :: new( ".*:.*:/(.*)" ) . expect( "could not compile regex" ) ;
421- }
422-
423- if let Some ( fp) = Self :: try_open_cgroup_path ( pid) . await ? {
424- let mut buffer = String :: new ( ) ;
425- let mut reader = BufReader :: new ( fp) ;
426-
427- reader
428- . read_line ( & mut buffer)
429- . await
430- . context ( "read line from buffer" ) ?;
431-
432- if let Some ( caps) = RE . captures ( & buffer) {
433- return Ok ( Path :: new ( CGROUP_ROOT )
434- . join (
435- caps. get ( 1 )
436- . context ( "no first capture group in regex match" ) ?
437- . as_str ( ) ,
434+ if let Some ( fp) = Self :: try_open_cgroup_path ( pid) ? {
435+ return Ok ( Path :: new ( CGROUP_ROOT )
436+ . join (
437+ str:: from_utf8 (
438+ LineReader :: with_capacity ( MAX_LINEREADER_CAPACITY , fp)
439+ . next_line ( )
440+ . context ( "get next line" ) ?
441+ . context ( "read line from buffer" ) ?,
438442 )
439- . into ( ) ) ;
440- }
441-
442- bail ! ( "invalid cgroup" )
443+ . context ( "convert byte slice to utf8" ) ?
444+ . split ( ':' )
445+ . nth ( 2 )
446+ . context ( "no path found in cgroup" ) ?
447+ . strip_prefix ( "/" )
448+ . context ( "strip root path prefix" ) ?
449+ . trim_end ( ) ,
450+ )
451+ . into ( ) ) ;
443452 }
444453
445454 Ok ( None )
446455 }
447456
448- async fn try_open_cgroup_path ( pid : u32 ) -> Result < Option < File > > {
457+ fn try_open_cgroup_path ( pid : u32 ) -> Result < Option < StdFile > > {
449458 let cgroup_path = PathBuf :: from ( "/proc" ) . join ( pid. to_string ( ) ) . join ( "cgroup" ) ;
450459 debug ! ( "Using cgroup path: {}" , cgroup_path. display( ) ) ;
451460
452- match File :: open ( & cgroup_path) . await {
461+ match StdFile :: open ( & cgroup_path) {
453462 Ok ( file) => Ok ( file. into ( ) ) ,
454463 // Short lived processes will not be handled as an error
455464 Err ( error) if error. kind ( ) == ErrorKind :: NotFound => {
0 commit comments