Skip to content

Commit 0c80f16

Browse files
committed
wip: modbus server
1 parent 8113202 commit 0c80f16

File tree

3 files changed

+306
-31
lines changed

3 files changed

+306
-31
lines changed

rust/examples/modbus_client.rs

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
// Copyright (c) 2025 Ronan LE MEILLAT, SCTG Development
2+
// This file is part of the rust-photoacoustic project and is licensed under the
3+
// SCTG Development Non-Commercial License v1.0 (see LICENSE.md for details).
4+
5+
//! Simple Modbus client example for the photoacoustic water vapor analyzer
6+
//!
7+
//! This example demonstrates how to connect to the photoacoustic Modbus server
8+
//! and read measurement data. It can be used as a template for integrating
9+
//! the analyzer with SCADA systems, PLCs, or other industrial automation equipment.
10+
//!
11+
//! ## Usage
12+
//!
13+
//! First, start the photoacoustic daemon with Modbus enabled:
14+
//! ```bash
15+
//! cargo run -- --config config.yaml daemon
16+
//! ```
17+
//!
18+
//! Then run this client example:
19+
//! ```bash
20+
//! cargo run --example modbus_client
21+
//! ```
22+
//!
23+
//! ## Register Map
24+
//!
25+
//! ### Input Registers (Read-Only - Measurement Data)
26+
//! - Register 0: Resonance frequency (Hz × 10, 0.1 Hz resolution)
27+
//! - Register 1: Signal amplitude (dB × 1000, 0.001 dB resolution)
28+
//! - Register 2: Water vapor concentration (ppm × 10, 0.1 ppm resolution)
29+
//! - Register 3: Timestamp low word (UNIX epoch seconds)
30+
//! - Register 4: Timestamp high word (UNIX epoch seconds)
31+
//! - Register 5: Status code (0=normal, 1=warning, 2=error)
32+
//!
33+
//! ### Holding Registers (Read-Write - Configuration Data)
34+
//! - Register 0: Measurement interval (seconds), default: 10
35+
//! - Register 1: Averaging count (samples), default: 20
36+
//! - Register 2: Gain setting, default: 30
37+
//! - Register 3: Filter strength, default: 40
38+
39+
use std::time::{Duration, UNIX_EPOCH};
40+
use tokio::time;
41+
use tokio_modbus::client::{tcp::connect, Reader, Writer};
42+
43+
#[tokio::main]
44+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
45+
env_logger::init();
46+
47+
// Modbus server configuration (should match config.yaml)
48+
let server_address = "127.0.0.1:1502"; // Non-privileged port for security
49+
50+
println!("🔌 Photoacoustic Modbus Client");
51+
println!("=====================================");
52+
println!("Connecting to Modbus server at {}", server_address);
53+
54+
// Parse socket address
55+
let socket_addr: std::net::SocketAddr = server_address.parse()?;
56+
57+
// Connect to the Modbus server
58+
let mut ctx = match connect(socket_addr).await {
59+
Ok(ctx) => {
60+
println!("✅ Successfully connected to Modbus server");
61+
ctx
62+
}
63+
Err(e) => {
64+
eprintln!("❌ Failed to connect to Modbus server: {}", e);
65+
eprintln!("💡 Make sure the photoacoustic daemon is running with Modbus enabled");
66+
eprintln!(" Example: cargo run -- --config config.yaml daemon");
67+
return Err(e.into());
68+
}
69+
};
70+
71+
println!("\n📊 Reading measurement data (Input Registers):");
72+
println!("===============================================");
73+
74+
// Read input registers (measurement data)
75+
match ctx.read_input_registers(0, 6).await {
76+
Ok(Ok(data)) => {
77+
// Decode frequency (register 0)
78+
let freq_raw = data[0];
79+
let frequency = freq_raw as f32 / 10.0;
80+
println!("🌊 Resonance Frequency: {} Hz (raw: {})", frequency, freq_raw);
81+
82+
// Decode amplitude (register 1)
83+
let amp_raw = data[1];
84+
let amplitude = amp_raw as f32 / 1000.0;
85+
println!("📈 Signal Amplitude: {} dB (raw: {})", amplitude, amp_raw);
86+
87+
// Decode concentration (register 2)
88+
let conc_raw = data[2];
89+
let concentration = conc_raw as f32 / 10.0;
90+
println!("💧 Water Vapor Concentration: {} ppm (raw: {})", concentration, conc_raw);
91+
92+
// Decode timestamp (registers 3-4)
93+
let timestamp_low = data[3] as u32;
94+
let timestamp_high = data[4] as u32;
95+
let timestamp = timestamp_low | (timestamp_high << 16);
96+
97+
let current_time = std::time::SystemTime::now()
98+
.duration_since(UNIX_EPOCH)?
99+
.as_secs() as u32;
100+
let age_seconds = current_time.saturating_sub(timestamp);
101+
102+
println!("⏰ Measurement Timestamp: {} (age: {} seconds)", timestamp, age_seconds);
103+
104+
// Decode status (register 5)
105+
let status = data[5];
106+
let status_text = match status {
107+
0 => "Normal",
108+
1 => "Warning",
109+
2 => "Error",
110+
_ => "Unknown",
111+
};
112+
println!("📊 System Status: {} ({})", status_text, status);
113+
}
114+
Ok(Err(e)) => {
115+
eprintln!("❌ Modbus exception when reading input registers: {:?}", e);
116+
}
117+
Err(e) => {
118+
eprintln!("❌ Failed to read input registers: {}", e);
119+
}
120+
}
121+
122+
println!("\n⚙️ Reading configuration data (Holding Registers):");
123+
println!("====================================================");
124+
125+
// Read holding registers (configuration data)
126+
match ctx.read_holding_registers(0, 4).await {
127+
Ok(Ok(data)) => {
128+
println!("⏱️ Measurement Interval: {} seconds", data[0]);
129+
println!("🔢 Averaging Count: {} samples", data[1]);
130+
println!("📈 Gain Setting: {}", data[2]);
131+
println!("🎛️ Filter Strength: {}", data[3]);
132+
}
133+
Ok(Err(e)) => {
134+
eprintln!("❌ Modbus exception when reading holding registers: {:?}", e);
135+
}
136+
Err(e) => {
137+
eprintln!("❌ Failed to read holding registers: {}", e);
138+
}
139+
}
140+
141+
println!("\n✏️ Testing configuration write (Holding Registers):");
142+
println!("=====================================================");
143+
144+
// Example: Change measurement interval to 15 seconds
145+
match ctx.write_single_register(0, 15).await {
146+
Ok(_) => {
147+
println!("✅ Successfully set measurement interval to 15 seconds");
148+
149+
// Read back to confirm
150+
match ctx.read_holding_registers(0, 1).await {
151+
Ok(Ok(data)) => {
152+
println!("✅ Confirmed: Measurement interval is now {} seconds", data[0]);
153+
}
154+
Ok(Err(e)) => {
155+
eprintln!("❌ Modbus exception when reading back configuration: {:?}", e);
156+
}
157+
Err(e) => {
158+
eprintln!("❌ Failed to read back configuration: {}", e);
159+
}
160+
}
161+
}
162+
Err(e) => {
163+
eprintln!("❌ Failed to write configuration: {}", e);
164+
}
165+
}
166+
167+
// Example: Write multiple configuration values
168+
let new_config = [10, 25, 35, 45]; // interval, averaging, gain, filter
169+
match ctx.write_multiple_registers(0, &new_config).await {
170+
Ok(_) => {
171+
println!("✅ Successfully updated multiple configuration values");
172+
println!(" Interval: {} sec, Averaging: {} samples, Gain: {}, Filter: {}",
173+
new_config[0], new_config[1], new_config[2], new_config[3]);
174+
}
175+
Err(e) => {
176+
eprintln!("❌ Failed to write multiple registers: {}", e);
177+
}
178+
}
179+
180+
println!("\n🔄 Continuous monitoring (press Ctrl+C to stop):");
181+
println!("=================================================");
182+
183+
// Continuous monitoring loop
184+
for i in 1..=5 {
185+
println!("\n📊 Reading #{}", i);
186+
187+
match ctx.read_input_registers(0, 3).await {
188+
Ok(Ok(data)) => {
189+
let frequency = data[0] as f32 / 10.0;
190+
let amplitude = data[1] as f32 / 1000.0;
191+
let concentration = data[2] as f32 / 10.0;
192+
193+
println!(" Freq: {} Hz | Amp: {} dB | Conc: {} ppm",
194+
frequency, amplitude, concentration);
195+
}
196+
Ok(Err(e)) => {
197+
eprintln!("❌ Modbus exception when reading measurement data: {:?}", e);
198+
}
199+
Err(e) => {
200+
eprintln!("❌ Failed to read measurement data: {}", e);
201+
}
202+
}
203+
204+
time::sleep(Duration::from_secs(2)).await;
205+
}
206+
207+
println!("\n🎉 Modbus client example completed!");
208+
println!("💡 This demonstrates how to integrate the photoacoustic analyzer");
209+
println!(" with SCADA systems, PLCs, or other industrial automation equipment");
210+
211+
Ok(())
212+
}

rust/src/daemon/launch_daemon.rs

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -497,9 +497,8 @@ impl Daemon {
497497
drop(config_read); // Release the read lock
498498

499499
let running = self.running.clone();
500-
// Create a clone of the data source to share with the server
501-
let data_source = self.get_data_source();
502-
// Create another clone for the server task
500+
// Get a reference to the shared computing state
501+
let computing_state = Arc::clone(&self.computing_state);
503502

504503
let task = tokio::spawn(async move {
505504
let socket_addr: SocketAddr = socket_addr_str.parse().expect("Invalid socket address");
@@ -514,20 +513,31 @@ impl Daemon {
514513
// Create a new Modbus server instance
515514
let on_connected = move |stream, socket_addr| {
516515
// Clone the Arc to avoid moving the original
517-
let data_source_clone = data_source.clone();
518-
let current_data_clone = data_source_clone.get_latest_data().unwrap();
519-
debug!(
520-
"Data are now frequency:{} amplitude:{} concentration:{}",
521-
current_data_clone.frequency,
522-
current_data_clone.amplitude,
523-
current_data_clone.concentration
524-
);
516+
let computing_state_clone = computing_state.clone();
517+
518+
// Log current data from computing state
519+
if let Ok(state) = computing_state_clone.try_read() {
520+
if let (Some(freq), Some(amp), Some(conc)) = (
521+
state.peak_frequency,
522+
state.peak_amplitude,
523+
state.concentration_ppm,
524+
) {
525+
debug!(
526+
"Computing state contains - frequency:{} amplitude:{} concentration:{}",
527+
freq, amp, conc
528+
);
529+
} else {
530+
debug!("Computing state contains no measurement data yet");
531+
}
532+
} else {
533+
debug!("Could not read computing state");
534+
}
525535

526536
async move {
527537
accept_tcp_connection(stream, socket_addr, move |_socket_addr| {
528538
// Use the cloned Arc in this inner closure
529-
Ok(Some(PhotoacousticModbusServer::with_data_source(
530-
&data_source_clone,
539+
Ok(Some(PhotoacousticModbusServer::with_computing_state(
540+
&computing_state_clone,
531541
)))
532542
})
533543
}
@@ -544,10 +554,7 @@ impl Daemon {
544554
}
545555
});
546556

547-
// Create a cancellation token for the server task
548-
let _running_clone = running.clone();
549-
550-
// Periodically update the modbus server with latest measurement data
557+
// Monitor the running flag and shutdown when requested
551558
while running.load(Ordering::SeqCst) {
552559
// Check every second if we should continue running
553560
time::sleep(Duration::from_secs(1)).await;
@@ -1060,6 +1067,18 @@ impl Daemon {
10601067
self.data_source.clone()
10611068
}
10621069

1070+
/// Get the shared computing state
1071+
///
1072+
/// Returns a clone of the `Arc<RwLock<ComputingSharedData>>` for sharing the
1073+
/// computing state with other components that need access to real-time measurement data.
1074+
///
1075+
/// ### Returns
1076+
///
1077+
/// A cloned `Arc` pointing to the shared computing state
1078+
pub fn get_computing_state(&self) -> SharedComputingState {
1079+
Arc::clone(&self.computing_state)
1080+
}
1081+
10631082
/// Get a reference to the shared audio stream
10641083
///
10651084
/// Returns the shared audio stream if acquisition is enabled and running.

0 commit comments

Comments
 (0)