@@ -9,9 +9,14 @@ use azure_iot_sdk::client::{IotHubClient, IotMessage};
99use inotify:: WatchMask ;
1010use lazy_static:: lazy_static;
1111use log:: { info, warn} ;
12- use serde:: { Deserialize , Serialize } ;
12+ use serde:: { Deserialize , Serialize , Serializer , ser :: Error } ;
1313use serde_json:: json;
14- use std:: { env, path:: Path , time:: Duration } ;
14+ use std:: {
15+ env,
16+ path:: Path ,
17+ sync:: { LazyLock , RwLock } ,
18+ time:: Duration ,
19+ } ;
1520use sysinfo;
1621use time:: format_description:: well_known:: Rfc3339 ;
1722use tokio:: sync:: mpsc;
@@ -29,45 +34,103 @@ lazy_static! {
2934 } ;
3035}
3136
32- #[ derive( Serialize ) ]
37+ #[ derive( Default , Serialize ) ]
3338struct Label {
34- device_id : String ,
35- module_name : String ,
39+ #[ serde( serialize_with = "device_id" ) ]
40+ device_id : ( ) ,
41+ #[ serde( serialize_with = "module_name" ) ]
42+ module_name : ( ) ,
3643 #[ serde( skip_serializing_if = "Option::is_none" ) ]
3744 sensor : Option < String > ,
3845}
3946
47+ fn device_id < S > ( _: & ( ) , s : S ) -> Result < S :: Ok , S :: Error >
48+ where
49+ S : Serializer ,
50+ {
51+ let Some ( hostname) = sysinfo:: System :: host_name ( ) else {
52+ return Err ( Error :: custom ( "failed to get hostname" ) ) ;
53+ } ;
54+
55+ s. serialize_str ( & hostname)
56+ }
57+
58+ fn module_name < S > ( _: & ( ) , s : S ) -> Result < S :: Ok , S :: Error >
59+ where
60+ S : Serializer ,
61+ {
62+ s. serialize_str ( "omnect-device-service" )
63+ }
64+
4065#[ derive( Serialize ) ]
41- struct Metric {
42- time_generated_utc : String ,
43- name : String ,
44- value : f64 ,
45- labels : Label ,
66+ enum MetricValue {
67+ CpuUsage ( f64 ) ,
68+ MemoryUsed ( f64 ) ,
69+ MemoryTotal ( f64 ) ,
70+ DiskUsed ( f64 ) ,
71+ DiskTotal ( f64 ) ,
72+ Temp ( f64 , String ) ,
4673}
4774
48- impl Metric {
49- // ToDo new not needed
50- fn new (
51- time : String ,
52- name : String ,
53- value : f64 ,
54- device : String ,
55- sensor : Option < String > ,
56- ) -> Metric {
75+ impl MetricValue {
76+ fn to_metric ( self ) -> Metric {
77+ let ( name, value, labels) = match self {
78+ MetricValue :: CpuUsage ( value) => ( "cpu_usage" . to_owned ( ) , value, Label :: default ( ) ) ,
79+ MetricValue :: MemoryUsed ( value) => ( "memory_used" . to_owned ( ) , value, Label :: default ( ) ) ,
80+ MetricValue :: MemoryTotal ( value) => ( "memory_total" . to_owned ( ) , value, Label :: default ( ) ) ,
81+ MetricValue :: DiskUsed ( value) => ( "disk_used" . to_owned ( ) , value, Label :: default ( ) ) ,
82+ MetricValue :: DiskTotal ( value) => ( "disk_total" . to_owned ( ) , value, Label :: default ( ) ) ,
83+ MetricValue :: Temp ( value, sensor) => (
84+ "temp" . to_owned ( ) ,
85+ value,
86+ Label {
87+ sensor : Some ( sensor) ,
88+ ..Default :: default ( )
89+ } ,
90+ ) ,
91+ } ;
92+
5793 Metric {
58- time_generated_utc : time,
59- name,
60- value,
61- labels : Label {
62- device_id : device,
63- module_name : "omnect-device-service" . to_string ( ) ,
64- sensor,
94+ metric : InnerMetric {
95+ name,
96+ value,
97+ labels,
6598 } ,
99+ ..Default :: default ( )
66100 }
67101 }
68102}
69103
70- // ToDo rm lazy_static
104+ #[ derive( Default , Serialize ) ]
105+ struct InnerMetric {
106+ name : String ,
107+ value : f64 ,
108+ labels : Label ,
109+ }
110+
111+ #[ derive( Default , Serialize ) ]
112+ struct Metric {
113+ #[ serde( serialize_with = "time_stamp" ) ]
114+ time_generated_utc : ( ) ,
115+ #[ serde( flatten) ]
116+ metric : InnerMetric ,
117+ }
118+
119+ static TIME_STAMP : LazyLock < RwLock < String > > =
120+ LazyLock :: new ( || RwLock :: new ( time:: OffsetDateTime :: now_utc ( ) . format ( & Rfc3339 ) . unwrap ( ) ) ) ;
121+
122+ fn time_stamp < S > ( _: & ( ) , s : S ) -> Result < S :: Ok , S :: Error >
123+ where
124+ S : Serializer ,
125+ {
126+ let Ok ( time_stamp) = TIME_STAMP . read ( ) else {
127+ return Err ( Error :: custom ( "failed to get timestamp" ) ) ;
128+ } ;
129+
130+ s. serialize_str ( & time_stamp)
131+ }
132+
133+ // ToDo rm lazy_staticw
71134lazy_static ! {
72135 static ref TIMESYNC_FILE : & ' static Path = if cfg!( feature = "mock" ) {
73136 Path :: new( "/tmp/synchronized" )
@@ -100,7 +163,6 @@ struct HardwareInfo {
100163 components : sysinfo:: Components ,
101164 disk : sysinfo:: Disks ,
102165 system : sysinfo:: System ,
103- hostname : String ,
104166}
105167
106168#[ derive( Default , Serialize ) ]
@@ -191,10 +253,6 @@ impl SystemInfo {
191253 RootPartition :: current( ) ?. as_str( )
192254 ) ;
193255
194- let Some ( hostname) = sysinfo:: System :: host_name ( ) else {
195- bail ! ( "metrics: hostname could not be read" )
196- } ;
197-
198256 feature:: add_watch :: < Self > ( & TIMESYNC_FILE , WatchMask :: CREATE | WatchMask :: ONESHOT ) ?;
199257
200258 if 0 < * REFRESH_SYSTEM_INFO_INTERVAL_SECS {
@@ -220,8 +278,6 @@ impl SystemInfo {
220278 . with_cpu ( sysinfo:: CpuRefreshKind :: everything ( ) )
221279 . with_memory ( sysinfo:: MemoryRefreshKind :: everything ( ) ) ,
222280 ) ,
223-
224- hostname,
225281 } ,
226282 reboot_reason : reboot_reason:: current_reboot_reason ( ) ,
227283 fleet_id : None ,
@@ -270,106 +326,50 @@ impl SystemInfo {
270326 . context ( "boot_time: format uptime" )
271327 }
272328
273- fn cpu_usage ( & self , time : String ) -> Metric {
274- Metric :: new (
275- time,
276- "cpu_usage" . to_string ( ) ,
277- self . hardware_info . system . global_cpu_usage ( ) as f64 ,
278- self . hardware_info . hostname . clone ( ) ,
279- None ,
280- )
281- }
282-
283- fn memory_used ( & self , time : String ) -> Metric {
284- Metric :: new (
285- time,
286- "memory_used" . to_string ( ) ,
287- self . hardware_info . system . used_memory ( ) as f64 ,
288- self . hardware_info . hostname . clone ( ) ,
289- None ,
290- )
291- }
292-
293- fn memory_total ( & self , time : String ) -> Metric {
294- Metric :: new (
295- time,
296- "memory_total" . to_string ( ) ,
297- self . hardware_info . system . total_memory ( ) as f64 ,
298- self . hardware_info . hostname . clone ( ) ,
299- None ,
300- )
301- }
302-
303- fn disk_used ( & self , time : String , value : f64 ) -> Metric {
304- Metric :: new (
305- time,
306- "disk_used" . to_string ( ) ,
307- value,
308- self . hardware_info . hostname . clone ( ) ,
309- None ,
310- )
311- }
312-
313- fn disk_total ( & self , time : String , value : f64 ) -> Metric {
314- Metric :: new (
315- time,
316- "disk_total" . to_string ( ) ,
317- value,
318- self . hardware_info . hostname . clone ( ) ,
319- None ,
320- )
321- }
322-
323- fn temp ( & self , time : String , value : f64 , sensor : String ) -> Metric {
324- Metric :: new (
325- time,
326- "temp" . to_string ( ) ,
327- value,
328- self . hardware_info . hostname . clone ( ) ,
329- Some ( sensor) ,
330- )
331- }
332-
333329 async fn metrics ( & mut self ) -> Result < ( ) > {
334330 let Some ( tx) = & self . tx_outgoing_message else {
335331 warn ! ( "metrics: skip since tx_outgoing_message is None" ) ;
336332 return Ok ( ( ) ) ;
337333 } ;
338334
339- let Ok ( time ) = time :: OffsetDateTime :: now_utc ( ) . format ( & Rfc3339 ) else {
340- bail ! ( "metrics: timestamp could not be generated " )
335+ let Ok ( mut time_stamp ) = TIME_STAMP . write ( ) else {
336+ bail ! ( "metrics: failed to lock TIME_STAMP " )
341337 } ;
338+ * time_stamp = time:: OffsetDateTime :: now_utc ( )
339+ . format ( & Rfc3339 )
340+ . context ( "metrics: failed to get and format timestamp" ) ?;
342341
343342 self . hardware_info . components . refresh ( true ) ;
344343 self . hardware_info . system . refresh_cpu_usage ( ) ;
345344 self . hardware_info . system . refresh_memory ( ) ;
346345 self . hardware_info . disk . refresh ( true ) ;
347346
348- let mut disk_total = 0 ;
349- let mut disk_used = 0 ;
350- for disk in self . hardware_info . disk . list ( ) {
351- if disk. name ( ) . to_str ( ) == Some ( "/dev/omnect/data" ) {
352- disk_total = disk. total_space ( ) ;
353- disk_used = disk. total_space ( ) - disk. available_space ( ) ;
354- break ;
355- }
356- }
357-
358- let mut metric_list = vec ! [
359- self . cpu_usage( time. clone( ) ) ,
360- self . memory_used( time. clone( ) ) ,
361- self . memory_total( time. clone( ) ) ,
362- self . disk_used( time. clone( ) , disk_used as f64 ) ,
363- self . disk_total( time. clone( ) , disk_total as f64 ) ,
347+ let ( disk_total, disk_used) = self
348+ . hardware_info
349+ . disk
350+ . into_iter ( )
351+ . filter_map ( |disk| match ( disk. name ( ) . to_str ( ) , disk. total_space ( ) ) {
352+ ( Some ( "/dev/omnect/data" ) , space) => Some ( ( space, space - disk. available_space ( ) ) ) ,
353+ _ => None ,
354+ } )
355+ . next ( )
356+ . unwrap_or ( ( 0 , 0 ) ) ;
357+
358+ let mut metrics = vec ! [
359+ MetricValue :: CpuUsage ( self . hardware_info. system. global_cpu_usage( ) as f64 ) . to_metric( ) ,
360+ MetricValue :: MemoryUsed ( self . hardware_info. system. used_memory( ) as f64 ) . to_metric( ) ,
361+ MetricValue :: MemoryTotal ( self . hardware_info. system. total_memory( ) as f64 ) . to_metric( ) ,
362+ MetricValue :: DiskUsed ( disk_used as f64 ) . to_metric( ) ,
363+ MetricValue :: DiskTotal ( disk_total as f64 ) . to_metric( ) ,
364364 ] ;
365365
366366 self . hardware_info . components . iter ( ) . for_each ( |c| {
367367 if let Some ( t) = c. temperature ( ) {
368- metric_list . push ( self . temp ( time . clone ( ) , t. into ( ) , c. label ( ) . to_string ( ) ) )
368+ metrics . push ( MetricValue :: Temp ( t. into ( ) , c. label ( ) . to_string ( ) ) . to_metric ( ) )
369369 } ;
370370 } ) ;
371371
372- let json = serde_json:: to_vec ( & metric_list )
372+ let json = serde_json:: to_vec ( & metrics )
373373 . context ( "metrics list could not be converted to vector:" ) ?;
374374
375375 let msg = IotMessage :: builder ( )
0 commit comments