Skip to content

Commit 2cc44b2

Browse files
committed
Refactor database into separate module, and add in memory database for testing
1 parent b808db7 commit 2cc44b2

File tree

11 files changed

+611
-299
lines changed

11 files changed

+611
-299
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/tedge/src/cli/flows/test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ impl TestCommand {
7575
}
7676
};
7777
processor
78-
.on_message(MessageSource::MQTT, timestamp, &message)
78+
.on_message(MessageSource::Mqtt, timestamp, &message)
7979
.await
8080
.into_iter()
8181
.for_each(|msg| self.print_messages(msg))

crates/extensions/tedge_flows/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ toml = { workspace = true, features = ["parse"] }
3535
tracing = { workspace = true }
3636

3737
[dev-dependencies]
38+
env_logger = { workspace = true }
39+
rstest = { workspace = true }
3840
tedge_mqtt_ext = { workspace = true, features = ["test-helpers"] }
3941
tempfile = { workspace = true }
4042
tokio = { workspace = true, features = ["test-util"] }

crates/extensions/tedge_flows/src/actor.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ impl Actor for FlowsMapper {
5252
}
5353
InputMessage::MqttMessage(message) => match Message::try_from(message) {
5454
Ok(message) => {
55-
self.on_message(MessageSource::MQTT, DateTime::now(), message)
55+
self.on_message(MessageSource::Mqtt, DateTime::now(), message)
5656
.await?
5757
}
5858
Err(err) => {
@@ -142,7 +142,7 @@ impl FlowsMapper {
142142
match flow_messages {
143143
Ok(messages) => self.publish_messages(flow_id, timestamp, messages).await?,
144144
Err(err) => {
145-
error!(target: "flows", "{flow_id}: {err}");
145+
error!(target: "flows", "{flow_id}: {err:#}");
146146
}
147147
}
148148
}
@@ -192,7 +192,7 @@ impl FlowsMapper {
192192
) -> Result<(), RuntimeError> {
193193
if let Some(flow) = self.processor.flows.get(&flow_id) {
194194
match &flow.output {
195-
FlowOutput::MQTT { output_topics } => {
195+
FlowOutput::Mqtt { output_topics } => {
196196
for message in messages {
197197
match MqttMessage::try_from(message) {
198198
Ok(message) if output_topics.accept_topic(&message.topic) => {
@@ -212,6 +212,7 @@ impl FlowsMapper {
212212
FlowOutput::MeaDB { output_series } => {
213213
for message in messages {
214214
info!(target: "flows", "store {output_series} @{}.{} [{}]", timestamp.seconds, timestamp.nanoseconds, message.topic);
215+
let timestamp = DateTime::now();
215216
if let Err(err) = self
216217
.processor
217218
.database

crates/extensions/tedge_flows/src/bin/db_dump.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
use anyhow::Context;
2+
use tedge_flows::database::FjallMeaDb;
3+
use tedge_flows::database::MeaDb;
24
use tedge_flows::flow::DateTime;
35
use tedge_flows::flow::Message;
46

@@ -12,7 +14,7 @@ async fn main() -> anyhow::Result<()> {
1214
let series_name = args.nth(1).unwrap_or("latest-data-points".to_string());
1315
println!("Reading series name: {series_name}");
1416

15-
let mut db = tedge_flows::MeaDB::open(DB_PATH)
17+
let mut db = FjallMeaDb::open(DB_PATH)
1618
.await
1719
.with_context(|| format!("Failed to open DB at path={DB_PATH}"))?;
1820

crates/extensions/tedge_flows/src/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ impl TryFrom<InputConfig> for FlowInput {
163163

164164
fn try_from(input: InputConfig) -> Result<Self, Self::Error> {
165165
match input {
166-
InputConfig::Mqtt { topics } => Ok(FlowInput::MQTT {
166+
InputConfig::Mqtt { topics } => Ok(FlowInput::Mqtt {
167167
topics: topic_filters(topics)?,
168168
}),
169169
InputConfig::MeaDB {
@@ -184,7 +184,7 @@ impl TryFrom<OutputConfig> for FlowOutput {
184184

185185
fn try_from(output: OutputConfig) -> Result<Self, Self::Error> {
186186
match output {
187-
OutputConfig::Mqtt { topics } => Ok(FlowOutput::MQTT {
187+
OutputConfig::Mqtt { topics } => Ok(FlowOutput::Mqtt {
188188
output_topics: topic_filters(topics)?,
189189
}),
190190
OutputConfig::MeaDB { series } => Ok(FlowOutput::MeaDB {

0 commit comments

Comments
 (0)