1- #[ macro_use]
2- extern crate lazy_static;
3- #[ macro_use]
4- extern crate enum_display_derive;
5- #[ macro_use]
6- extern crate num_derive;
7-
81mod common;
92
103pub mod entry;
114pub mod error;
12-
135/// KLog Implementation (makes klogctl aka syslog system call through libc)
146pub mod klogctl;
15-
167/// KMsg Implementation (reads from the /dev/kmsg file)
178pub mod kmsgfile;
189
@@ -23,6 +14,10 @@ use std::iter::Iterator;
2314use core:: pin:: Pin ;
2415#[ cfg( feature = "async" ) ]
2516use futures:: stream:: Stream ;
17+ #[ cfg( feature = "async" ) ]
18+ use futures:: task:: { Context , Poll } ;
19+ #[ cfg( feature = "async" ) ]
20+ use pin_project:: pin_project;
2621
2722#[ derive( Clone , Copy , Debug ) ]
2823pub enum Backend {
@@ -32,10 +27,37 @@ pub enum Backend {
3227}
3328
3429#[ cfg( feature = "sync" ) ]
35- pub type EntriesIterator = Box < dyn Iterator < Item = Result < entry:: Entry , error:: RMesgError > > > ;
30+ pub enum EntriesIterator {
31+ KLogCtl ( klogctl:: KLogEntries ) ,
32+ DevKMsg ( kmsgfile:: KMsgEntriesIter ) ,
33+ }
34+ #[ cfg( feature = "sync" ) ]
35+ impl Iterator for EntriesIterator {
36+ type Item = Result < entry:: Entry , error:: RMesgError > ;
37+ fn next ( & mut self ) -> Option < Self :: Item > {
38+ match self {
39+ Self :: KLogCtl ( k) => k. next ( ) ,
40+ Self :: DevKMsg ( d) => d. next ( ) ,
41+ }
42+ }
43+ }
3644
45+ #[ pin_project( project = EntriesStreamPinnedProjection ) ]
46+ #[ cfg( feature = "async" ) ]
47+ pub enum EntriesStream {
48+ KLogCtl ( #[ pin] klogctl:: KLogEntries ) ,
49+ DevKMsg ( #[ pin] kmsgfile:: KMsgEntriesStream ) ,
50+ }
3751#[ cfg( feature = "async" ) ]
38- pub type EntriesStream = Pin < Box < dyn Stream < Item = Result < entry:: Entry , error:: RMesgError > > > > ;
52+ impl Stream for EntriesStream {
53+ type Item = Result < entry:: Entry , error:: RMesgError > ;
54+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
55+ match self . project ( ) {
56+ EntriesStreamPinnedProjection :: KLogCtl ( k) => k. poll_next ( cx) ,
57+ EntriesStreamPinnedProjection :: DevKMsg ( d) => d. poll_next ( cx) ,
58+ }
59+ }
60+ }
3961
4062pub fn log_entries ( b : Backend , clear : bool ) -> Result < Vec < entry:: Entry > , error:: RMesgError > {
4163 match b {
@@ -77,20 +99,24 @@ pub fn logs_raw(b: Backend, clear: bool) -> Result<String, error::RMesgError> {
7799pub fn logs_iter ( b : Backend , clear : bool , raw : bool ) -> Result < EntriesIterator , error:: RMesgError > {
78100 match b {
79101 Backend :: Default => match kmsgfile:: KMsgEntriesIter :: with_options ( None , raw) {
80- Ok ( e) => Ok ( Box :: new ( e) ) ,
102+ Ok ( e) => Ok ( EntriesIterator :: DevKMsg ( e) ) ,
81103 Err ( error:: RMesgError :: DevKMsgFileOpenError ( s) ) => {
82104 eprintln ! (
83105 "Falling back from device file to klogctl syscall due to error: {}" ,
84106 s
85107 ) ;
86- Ok ( Box :: new ( klog_entries_only_if_timestamp_enabled ( clear) ?) )
108+ Ok ( EntriesIterator :: KLogCtl (
109+ klog_entries_only_if_timestamp_enabled ( clear) ?,
110+ ) )
87111 }
88112 Err ( e) => Err ( e) ,
89113 } ,
90- Backend :: KLogCtl => Ok ( Box :: new ( klog_entries_only_if_timestamp_enabled ( clear) ?) ) ,
91- Backend :: DevKMsg => Ok ( Box :: new ( kmsgfile:: KMsgEntriesIter :: with_options (
92- None , raw,
93- ) ?) ) ,
114+ Backend :: KLogCtl => Ok ( EntriesIterator :: KLogCtl (
115+ klog_entries_only_if_timestamp_enabled ( clear) ?,
116+ ) ) ,
117+ Backend :: DevKMsg => Ok ( EntriesIterator :: DevKMsg (
118+ kmsgfile:: KMsgEntriesIter :: with_options ( None , raw) ?,
119+ ) ) ,
94120 }
95121}
96122
@@ -102,18 +128,22 @@ pub async fn logs_stream(
102128) -> Result < EntriesStream , error:: RMesgError > {
103129 match b {
104130 Backend :: Default => match kmsgfile:: KMsgEntriesStream :: with_options ( None , raw) . await {
105- Ok ( e) => Ok ( Box :: pin ( e) ) ,
131+ Ok ( e) => Ok ( EntriesStream :: DevKMsg ( e) ) ,
106132 Err ( error:: RMesgError :: DevKMsgFileOpenError ( s) ) => {
107133 eprintln ! (
108134 "Falling back from device file to klogctl syscall due to error: {}" ,
109135 s
110136 ) ;
111- Ok ( Box :: pin ( klog_entries_only_if_timestamp_enabled ( clear) ?) )
137+ Ok ( EntriesStream :: KLogCtl (
138+ klog_entries_only_if_timestamp_enabled ( clear) ?,
139+ ) )
112140 }
113141 Err ( e) => Err ( e) ,
114142 } ,
115- Backend :: KLogCtl => Ok ( Box :: pin ( klog_entries_only_if_timestamp_enabled ( clear) ?) ) ,
116- Backend :: DevKMsg => Ok ( Box :: pin (
143+ Backend :: KLogCtl => Ok ( EntriesStream :: KLogCtl (
144+ klog_entries_only_if_timestamp_enabled ( clear) ?,
145+ ) ) ,
146+ Backend :: DevKMsg => Ok ( EntriesStream :: DevKMsg (
117147 kmsgfile:: KMsgEntriesStream :: with_options ( None , raw) . await ?,
118148 ) ) ,
119149 }
@@ -135,3 +165,66 @@ fn klog_entries_only_if_timestamp_enabled(
135165
136166 klogctl:: KLogEntries :: with_options ( clear, klogctl:: SUGGESTED_POLL_INTERVAL )
137167}
168+
169+ /**********************************************************************************/
170+ // Tests! Tests! Tests!
171+
172+ #[ cfg( all( test, target_os = "linux" ) ) ]
173+ mod test {
174+ use super :: * ;
175+ #[ cfg( feature = "async" ) ]
176+ use tokio_stream:: StreamExt ;
177+
178+ #[ test]
179+ fn test_log_entries ( ) {
180+ let entries = log_entries ( Backend :: Default , false ) ;
181+ assert ! ( entries. is_ok( ) , "Response from kmsg not Ok" ) ;
182+ assert ! ( !entries. unwrap( ) . is_empty( ) , "Should have non-empty logs" ) ;
183+ }
184+
185+ #[ cfg( feature = "sync" ) ]
186+ #[ test]
187+ fn test_iterator ( ) {
188+ // uncomment below if you want to be extra-sure
189+ //let enable_timestamp_result = kernel_log_timestamps_enable(true);
190+ //assert!(enable_timestamp_result.is_ok());
191+
192+ // Don't clear the buffer. Poll every second.
193+ let iterator_result = logs_iter ( Backend :: Default , false , false ) ;
194+ assert ! ( iterator_result. is_ok( ) ) ;
195+
196+ let iterator = iterator_result. unwrap ( ) ;
197+
198+ // Read 10 lines and quit
199+ for ( count, entry) in iterator. enumerate ( ) {
200+ assert ! ( entry. is_ok( ) ) ;
201+ if count > 10 {
202+ break ;
203+ }
204+ }
205+ }
206+
207+ #[ cfg( feature = "async" ) ]
208+ #[ tokio:: test]
209+ async fn test_stream ( ) {
210+ // uncomment below if you want to be extra-sure
211+ //let enable_timestamp_result = kernel_log_timestamps_enable(true);
212+ //assert!(enable_timestamp_result.is_ok());
213+
214+ // Don't clear the buffer. Poll every second.
215+ let stream_result = logs_stream ( Backend :: Default , false , false ) . await ;
216+ assert ! ( stream_result. is_ok( ) ) ;
217+
218+ let mut stream = stream_result. unwrap ( ) ;
219+
220+ // Read 10 lines and quit
221+ let mut count: u32 = 0 ;
222+ while let Some ( entry) = stream. next ( ) . await {
223+ assert ! ( entry. is_ok( ) ) ;
224+ count += 1 ;
225+ if count > 10 {
226+ break ;
227+ }
228+ }
229+ }
230+ }
0 commit comments