Skip to content

Commit b808db7

Browse files
didier-wenzekjarhodes314
authored andcommitted
Distinguish two message sources: MQTT vs DB
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent 2fa0e5b commit b808db7

File tree

11 files changed

+1096
-132
lines changed

11 files changed

+1096
-132
lines changed

crates/core/tedge/src/cli/flows/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ impl TestCommand {
7575
}
7676
};
7777
processor
78-
.on_message(timestamp, &message)
78+
.on_message(MessageSource::MQTT, timestamp, &message)
7979
.await
8080
.into_iter()
8181
.for_each(|msg| self.print_messages(msg))

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 29 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::flow::DateTime;
22
use crate::flow::FlowOutput;
33
use crate::flow::Message;
4+
use crate::flow::MessageSource;
45
use crate::runtime::MessageProcessor;
56
use crate::InputMessage;
67
use crate::OutputMessage;
@@ -22,6 +23,7 @@ use tedge_mqtt_ext::TopicFilter;
2223
use tokio::time::sleep_until;
2324
use tokio::time::Instant;
2425
use tracing::error;
26+
use tracing::info;
2527

2628
pub const STATS_DUMP_INTERVAL: Duration = Duration::from_secs(300);
2729

@@ -42,13 +44,17 @@ impl Actor for FlowsMapper {
4244
while let Some(message) = self.next_message().await {
4345
match message {
4446
InputMessage::Tick(_) => {
45-
self.on_interval().await?;
46-
4747
let drained_messages = self.drain_db().await?;
48-
self.filter_all(drained_messages).await?;
48+
self.on_messages(MessageSource::MeaDB, drained_messages)
49+
.await?;
50+
51+
self.on_interval().await?;
4952
}
5053
InputMessage::MqttMessage(message) => match Message::try_from(message) {
51-
Ok(message) => self.on_message(message).await?,
54+
Ok(message) => {
55+
self.on_message(MessageSource::MQTT, DateTime::now(), message)
56+
.await?
57+
}
5258
Err(err) => {
5359
error!(target: "flows", "Cannot process message: {err}");
5460
}
@@ -125,24 +131,16 @@ impl FlowsMapper {
125131
diff
126132
}
127133

128-
async fn on_message(&mut self, message: Message) -> Result<(), RuntimeError> {
129-
let timestamp = DateTime::now();
130-
for (flow_id, flow_messages) in self.processor.on_message(timestamp, &message).await {
134+
async fn on_message(
135+
&mut self,
136+
source: MessageSource,
137+
timestamp: DateTime,
138+
message: Message,
139+
) -> Result<(), RuntimeError> {
140+
for (flow_id, flow_messages) in self.processor.on_message(source, timestamp, &message).await
141+
{
131142
match flow_messages {
132-
Ok(messages) => {
133-
for message in messages {
134-
match MqttMessage::try_from(message) {
135-
Ok(message) => {
136-
self.messages
137-
.send(OutputMessage::MqttMessage(message))
138-
.await?
139-
}
140-
Err(err) => {
141-
error!(target: "flows", "{flow_id}: cannot send transformed message: {err}")
142-
}
143-
}
144-
}
145-
}
143+
Ok(messages) => self.publish_messages(flow_id, timestamp, messages).await?,
146144
Err(err) => {
147145
error!(target: "flows", "{flow_id}: {err}");
148146
}
@@ -152,26 +150,14 @@ impl FlowsMapper {
152150
Ok(())
153151
}
154152

155-
async fn filter_all(&mut self, messages: Vec<(DateTime, Message)>) -> Result<(), RuntimeError> {
153+
async fn on_messages(
154+
&mut self,
155+
source: MessageSource,
156+
messages: Vec<(DateTime, Message)>,
157+
) -> Result<(), RuntimeError> {
156158
for (timestamp, message) in messages {
157-
self.filter(timestamp, message).await?
158-
}
159-
Ok(())
160-
}
161-
162-
async fn filter(&mut self, timestamp: DateTime, message: Message) -> Result<(), RuntimeError> {
163-
for (flow_id, flow_messages) in self.processor.process(timestamp, &message).await {
164-
match flow_messages {
165-
Ok(messages) => {
166-
self.publish_messages(flow_id.clone(), timestamp, messages)
167-
.await?;
168-
}
169-
Err(err) => {
170-
error!(target: "flows", "{flow_id}: {err}");
171-
}
172-
}
159+
self.on_message(source, timestamp, message).await?
173160
}
174-
175161
Ok(())
176162
}
177163

@@ -225,6 +211,7 @@ impl FlowsMapper {
225211
}
226212
FlowOutput::MeaDB { output_series } => {
227213
for message in messages {
214+
info!(target: "flows", "store {output_series} @{}.{} [{}]", timestamp.seconds, timestamp.nanoseconds, message.topic);
228215
if let Err(err) = self
229216
.processor
230217
.database
@@ -246,6 +233,9 @@ impl FlowsMapper {
246233
for (flow_id, flow_messages) in self.processor.drain_db(timestamp).await {
247234
match flow_messages {
248235
Ok(flow_messages) => {
236+
for (t, m) in flow_messages.iter() {
237+
info!(target: "flows", "drained: @{}.{} [{}]", t.seconds, t.nanoseconds, m.topic);
238+
}
249239
messages.extend(flow_messages);
250240
}
251241
Err(err) => {
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use anyhow::Context;
2+
use tedge_flows::flow::DateTime;
3+
use tedge_flows::flow::Message;
4+
5+
const DB_PATH: &str = "/etc/tedge/tedge-gen.db";
6+
7+
type Record = (DateTime, Message);
8+
9+
#[tokio::main]
10+
async fn main() -> anyhow::Result<()> {
11+
let mut args = std::env::args();
12+
let series_name = args.nth(1).unwrap_or("latest-data-points".to_string());
13+
println!("Reading series name: {series_name}");
14+
15+
let mut db = tedge_flows::MeaDB::open(DB_PATH)
16+
.await
17+
.with_context(|| format!("Failed to open DB at path={DB_PATH}"))?;
18+
19+
let items = db
20+
.query_all(&series_name)
21+
.await
22+
.context("Failed to query for all items")?;
23+
24+
items.iter().for_each(print_record);
25+
26+
Ok(())
27+
}
28+
29+
fn print_record(record: &Record) {
30+
let time = record.0.seconds;
31+
let topic = &record.1.topic;
32+
let payload = std::str::from_utf8(&record.1.payload).unwrap_or("<binary payload>");
33+
println!("[{time}]\t{topic}\t{payload}")
34+
}

crates/extensions/tedge_flows/src/config.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,16 @@ impl FlowConfig {
122122
steps.push(step);
123123
}
124124
let output = self.output.try_into()?;
125-
Ok(Flow {
125+
let mut flow = Flow {
126126
input,
127127
steps,
128128
source,
129129
output,
130-
})
130+
next_drain: None,
131+
last_drain: None,
132+
};
133+
flow.init_next_drain();
134+
Ok(flow)
131135
}
132136
}
133137

crates/extensions/tedge_flows/src/flow.rs

Lines changed: 70 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use tokio::time::Instant;
1313
use tracing::warn;
1414

1515
/// A chain of transformation of MQTT messages
16+
#[derive(Debug)]
1617
pub struct Flow {
1718
/// The source topics
1819
pub input: FlowInput,
@@ -24,14 +25,22 @@ pub struct Flow {
2425

2526
/// Target of the transformed messages
2627
pub output: FlowOutput,
28+
29+
/// Next time to drain database for MeaDB inputs (for deadline-based wakeup)
30+
pub next_drain: Option<tokio::time::Instant>,
31+
32+
/// Last time database was drained (for frequency checking)
33+
pub last_drain: Option<DateTime>,
2734
}
2835

2936
/// A message transformation step
37+
#[derive(Debug)]
3038
pub struct FlowStep {
3139
pub script: JsScript,
3240
pub config_topics: TopicFilter,
3341
}
3442

43+
#[derive(Debug)]
3544
pub enum FlowInput {
3645
MQTT {
3746
topics: TopicFilter,
@@ -49,6 +58,12 @@ pub enum FlowOutput {
4958
MeaDB { output_series: String },
5059
}
5160

61+
#[derive(Copy, Clone, Eq, PartialEq)]
62+
pub enum MessageSource {
63+
MQTT,
64+
MeaDB,
65+
}
66+
5267
#[derive(
5368
Copy, Clone, Debug, serde::Deserialize, serde::Serialize, Eq, PartialEq, Ord, PartialOrd,
5469
)]
@@ -77,8 +92,57 @@ pub enum FlowError {
7792
}
7893

7994
impl Flow {
95+
pub fn accept(&self, source: MessageSource, message_topic: &str) -> bool {
96+
match &self.input {
97+
FlowInput::MQTT {
98+
topics: input_topics,
99+
} => source == MessageSource::MQTT && input_topics.accept_topic_name(message_topic),
100+
FlowInput::MeaDB { .. } => source == MessageSource::MeaDB,
101+
}
102+
}
103+
104+
pub fn init_next_drain(&mut self) {
105+
if let FlowInput::MeaDB { frequency, .. } = &self.input {
106+
if !frequency.is_zero() {
107+
self.next_drain = Some(tokio::time::Instant::now() + *frequency);
108+
}
109+
}
110+
}
111+
112+
pub fn should_drain_at(&mut self, timestamp: DateTime) -> bool {
113+
if let FlowInput::MeaDB { frequency, .. } = &self.input {
114+
if frequency.is_zero() {
115+
return false;
116+
}
117+
118+
// Check if enough time has passed since last drain
119+
match self.last_drain {
120+
Some(last_drain) => {
121+
let elapsed_secs = timestamp.seconds.saturating_sub(last_drain.seconds);
122+
let frequency_secs = frequency.as_secs();
123+
if elapsed_secs >= frequency_secs {
124+
self.last_drain = Some(timestamp);
125+
// Also update the deadline for the actor loop
126+
self.next_drain = Some(tokio::time::Instant::now() + *frequency);
127+
true
128+
} else {
129+
false
130+
}
131+
}
132+
None => {
133+
// First drain
134+
self.last_drain = Some(timestamp);
135+
self.next_drain = Some(tokio::time::Instant::now() + *frequency);
136+
true
137+
}
138+
}
139+
} else {
140+
false
141+
}
142+
}
143+
80144
pub fn topics(&self) -> TopicFilter {
81-
let mut topics = self.input.topics().clone();
145+
let mut topics = self.input.topics();
82146
for step in self.steps.iter() {
83147
topics.add_all(step.config_topics.clone())
84148
}
@@ -101,12 +165,13 @@ impl Flow {
101165
pub async fn on_message(
102166
&mut self,
103167
js_runtime: &JsRuntime,
168+
source: MessageSource,
104169
stats: &mut Counter,
105170
timestamp: DateTime,
106171
message: &Message,
107172
) -> Result<Vec<Message>, FlowError> {
108173
self.on_config_update(js_runtime, message).await?;
109-
if !self.input.topics().accept_topic_name(&message.topic) {
174+
if !self.accept(source, &message.topic) {
110175
return Ok(vec![]);
111176
}
112177

@@ -203,14 +268,12 @@ impl FlowStep {
203268
}
204269

205270
impl FlowInput {
206-
pub fn topics(&self) -> &TopicFilter {
271+
fn topics(&self) -> TopicFilter {
207272
match self {
208-
FlowInput::MQTT { topics } => topics,
273+
FlowInput::MQTT { topics } => topics.clone(),
209274
FlowInput::MeaDB { .. } => {
210275
// MeaDB inputs don't subscribe to MQTT topics
211-
// Return an empty topic filter
212-
static EMPTY_TOPICS: std::sync::OnceLock<TopicFilter> = std::sync::OnceLock::new();
213-
EMPTY_TOPICS.get_or_init(TopicFilter::empty)
276+
TopicFilter::empty()
214277
}
215278
}
216279
}

crates/extensions/tedge_flows/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ use tedge_mqtt_ext::TopicFilter;
3232
use tokio::time::Instant;
3333
use tracing::error;
3434

35+
pub use runtime::MeaDB;
36+
3537
fan_in_message_type!(InputMessage[MqttMessage, FsWatchEvent, Tick]: Clone, Debug, Eq, PartialEq);
3638
fan_in_message_type!(OutputMessage[MqttMessage, SubscriptionDiff]: Clone, Debug, Eq, PartialEq);
3739

0 commit comments

Comments
 (0)