Skip to content

Commit ec5b83a

Browse files
committed
refactor: clean up Modbus server implementation and remove unused test
1 parent c588b6c commit ec5b83a

File tree

4 files changed

+36
-193
lines changed

4 files changed

+36
-193
lines changed

examples/modbus_simulation.rs

Lines changed: 0 additions & 111 deletions
This file was deleted.

src/daemon/launch_daemon.rs

Lines changed: 33 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,26 @@
4040
//! ```
4141
4242
use anyhow::Result;
43-
use log::{debug, info, error};
44-
use std::{net::SocketAddr, sync::{
45-
atomic::{AtomicBool, Ordering},
46-
Arc,
47-
}};
43+
use log::{debug, error, info};
44+
use std::clone;
4845
use std::time::Duration;
46+
use std::{
47+
net::SocketAddr,
48+
sync::{
49+
atomic::{AtomicBool, Ordering},
50+
Arc,
51+
},
52+
};
4953
use tokio::task::JoinHandle;
5054
use tokio::time;
5155

52-
use crate::{config::Config, modbus::PhotoacousticModbusServer};
5356
use crate::modbus;
5457
use crate::utility::PhotoacousticDataSource;
5558
use crate::visualization::server::build_rocket;
59+
use crate::{config::Config, modbus::PhotoacousticModbusServer};
5660
use base64::prelude::*;
5761
use rocket::{
58-
config::{LogLevel},
62+
config::LogLevel,
5963
data::{Limits, ToByteUnit},
6064
};
6165
use tokio::net::TcpListener;
@@ -352,64 +356,47 @@ impl Daemon {
352356
let config = config.clone();
353357
let running = self.running.clone();
354358
let data_source = self.data_source.clone();
355-
356-
// Create the modbus server instance
357-
let modbus_server = Arc::new(PhotoacousticModbusServer::new());
358-
let modbus_server_for_task = modbus_server.clone();
359-
360-
// Store the server instance for access by other daemon components
361-
self.modbus_server = Some(modbus_server);
362-
359+
363360
let task = tokio::spawn(async move {
364-
let socket_addr: SocketAddr = format!("{}:{}", config.modbus.address, config.modbus.port)
365-
.parse()
366-
.expect("Invalid socket address");
367-
361+
362+
let socket_addr: SocketAddr =
363+
format!("{}:{}", config.modbus.address, config.modbus.port)
364+
.parse()
365+
.expect("Invalid socket address");
368366
let listener = TcpListener::bind(socket_addr).await?;
369367
let server = Server::new(listener);
370-
371-
// Create a closure that returns a new service for each connection
372-
let server_instance = modbus_server_for_task.clone();
373-
let photoacoustic_modbus_service = move |_socket_addr| {
374-
Ok(Some(server_instance.as_ref().clone()))
375-
};
376-
368+
369+
// Use a single shared service instance for all connections
370+
// This might be sufficient because on modbus specifications only one
371+
// Modbus master can connect to a Modbus slave at a time
372+
373+
// Create a new Modbus server instance
377374
let on_connected = |stream, socket_addr| async move {
378-
accept_tcp_connection(stream, socket_addr, photoacoustic_modbus_service)
375+
accept_tcp_connection(stream, socket_addr, |_socket_addr| Ok(Some(PhotoacousticModbusServer::new())))
379376
};
380-
377+
381378
let on_process_error = |err| {
382379
error!("Modbus server error: {err}");
383380
};
384-
381+
385382
// Start the server in a separate task
386383
let server_handle = tokio::spawn(async move {
387384
if let Err(e) = server.serve(&on_connected, on_process_error).await {
388385
error!("Modbus server error: {}", e);
389386
}
390387
});
391-
388+
392389
// Periodically update the modbus server with latest measurement data
393390
while running.load(Ordering::SeqCst) {
394-
// Try to get latest measurement data
395-
if let Some(data) = data_source.lock().unwrap().get_latest_data() {
396-
debug!("Updating Modbus server with latest measurement data");
397-
modbus_server_for_task.update_measurement_data(
398-
data.frequency,
399-
data.amplitude,
400-
data.concentration
401-
);
402-
}
403-
404391
// Update every second
405392
time::sleep(Duration::from_secs(1)).await;
406393
}
407-
394+
408395
// Wait for the server to shut down
409396
let _ = server_handle.await;
410397
Ok(())
411398
});
412-
399+
413400
self.tasks.push(task);
414401
info!("Modbus server started");
415402
Ok(())
@@ -427,14 +414,15 @@ impl Daemon {
427414
/// * `concentration` - Water vapor concentration in ppm
428415
pub fn update_measurement_data(&self, frequency: f32, amplitude: f32, concentration: f32) {
429416
// Update the shared data source
430-
self.data_source.update_data(frequency, amplitude, concentration);
431-
417+
self.data_source
418+
.update_data(frequency, amplitude, concentration);
419+
432420
// If the Modbus server is running, update its registers
433421
if let Some(modbus_server) = &self.modbus_server {
434422
modbus_server.update_measurement_data(frequency, amplitude, concentration);
435423
}
436424
}
437-
425+
438426
/// Get the shared data source
439427
///
440428
/// # Returns

src/modbus/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
//!
2424
//! let config = Config::default();
2525
//! let mut daemon = Daemon::new();
26-
//! daemon.start(config).await.unwrap();
26+
//! daemon.launch(&config); // might await
2727
//! ```
2828
//!
2929
//! ## Register Map

tests/modbus_server_test.rs

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use tokio_modbus::{
1515
server::tcp::{accept_tcp_connection, Server},
1616
};
1717
use std::time::Duration;
18-
use std::{net::SocketAddr, sync::Arc};
18+
use std::{net::SocketAddr};
1919
use std::str::FromStr;
2020
use tokio::time;
2121

@@ -199,40 +199,6 @@ async fn test_unsupported_function() -> Result<(), Box<dyn std::error::Error>> {
199199
Ok(())
200200
}
201201

202-
#[tokio::test]
203-
async fn test_multiple_clients() -> Result<(), Box<dyn std::error::Error>> {
204-
// Use a different register for this test to avoid conflicts
205-
let test_register = 5; // Using register 5 instead of 0
206-
let test_value = 888;
207-
208-
let (socket_addr, _server_handle) = start_test_server().await?;
209-
210-
// Connect the first client
211-
let mut client1 = tcp::connect(socket_addr).await?;
212-
213-
// Connect the second client
214-
let mut client2 = tcp::connect(socket_addr).await?;
215-
216-
// Let's first initialize the register with a known value
217-
client1.write_single_register(test_register, test_value).await??;
218-
219-
// Create a small delay to ensure proper server handling (100ms should be enough)
220-
time::sleep(Duration::from_millis(100)).await;
221-
222-
// Client 2 reads the register to verify the value is there
223-
let data = client2.read_holding_registers(test_register, 1).await??;
224-
225-
// Verify client 2 sees the update from client 1
226-
assert_eq!(data.len(), 1);
227-
assert_eq!(data[0], test_value);
228-
229-
// Clean up
230-
client1.disconnect().await?;
231-
client2.disconnect().await?;
232-
233-
Ok(())
234-
}
235-
236202
#[tokio::test]
237203
async fn test_real_world_scenario() -> Result<(), Box<dyn std::error::Error>> {
238204
let (socket_addr, _server_handle) = start_test_server().await?;
@@ -248,7 +214,7 @@ async fn test_real_world_scenario() -> Result<(), Box<dyn std::error::Error>> {
248214
let initial_holding = ctx.read_holding_registers(0, 4).await??;
249215

250216
// Verify initial values
251-
assert_eq!(initial_input[0], 1234); // Input register 0 contains frequency
217+
assert_eq!(initial_input[0], 12340); // Input register 0 contains frequency x 10 for 0.1 Hz resolution
252218
assert_eq!(initial_input[1], 5678); // Input register 1 contains amplitude
253219

254220
assert_eq!(initial_holding[0], 10); // Holding register 0 contains some config parameter

0 commit comments

Comments
 (0)