Skip to content

Commit 0f75d43

Browse files
authored
Merge pull request #276 from carlaKC/244-batchedwriter
simln-lib: add batched writer for results
2 parents 27fd6e3 + e89419f commit 0f75d43

File tree

4 files changed: +192 −29 lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

simln-lib/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,4 @@ tokio-util = { version = "0.7.13", features = ["rt"] }
3838
ntest = "0.9.0"
3939
mockall = "0.13.1"
4040
futures = "0.3.31"
41+
tempfile = "3"

simln-lib/src/batched_writer.rs

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
use csv::{Writer, WriterBuilder};
2+
use serde::Serialize;
3+
use std::fs::File;
4+
use std::path::PathBuf;
5+
6+
use crate::SimulationError;
7+
8+
/// Implements a writer that will write records in batches to the file provided.
9+
pub struct BatchedWriter {
10+
batch_size: u32,
11+
counter: u32,
12+
writer: Writer<File>,
13+
}
14+
15+
impl BatchedWriter {
16+
/// Creates a new writer and the results file that output will be written to.
17+
pub fn new(
18+
directory: PathBuf,
19+
file_name: String,
20+
batch_size: u32,
21+
) -> Result<BatchedWriter, SimulationError> {
22+
if batch_size == 0 {
23+
return Err(SimulationError::FileError);
24+
}
25+
26+
let file = directory.join(file_name);
27+
28+
let writer = WriterBuilder::new()
29+
.from_path(file)
30+
.map_err(SimulationError::CsvError)?;
31+
32+
Ok(BatchedWriter {
33+
batch_size,
34+
counter: 0,
35+
writer,
36+
})
37+
}
38+
39+
/// Adds an item to the batch to be written, flushing to disk if the batch size has been reached.
40+
pub fn queue<S: Serialize>(&mut self, record: S) -> Result<(), SimulationError> {
41+
// If there's an error serializing an input, flush and exit with an error.
42+
self.writer.serialize(record).map_err(|e| {
43+
// If we encounter another errors (when we've already failed to serialize), we just log because we've
44+
// already experienced a critical error.
45+
if let Err(e) = self.write(true) {
46+
log::error!("Error flushing to disk: {e}");
47+
}
48+
49+
SimulationError::CsvError(e)
50+
})?;
51+
52+
// Otherwise increment counter and flush if we've reached batch size.
53+
self.counter = self.counter % self.batch_size + 1;
54+
self.write(false)
55+
}
56+
57+
/// Writes the contents of the batched writer to disk. Will result in a write if force is true _or_ the batch is
58+
/// full.
59+
pub fn write(&mut self, force: bool) -> Result<(), SimulationError> {
60+
if force || self.batch_size == self.counter {
61+
self.counter = 0;
62+
return self
63+
.writer
64+
.flush()
65+
.map_err(|e| SimulationError::CsvError(e.into()));
66+
}
67+
68+
Ok(())
69+
}
70+
}
71+
72+
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};
    use std::path::Path;
    use tempfile::tempdir;

    /// Minimal serializable record used to round-trip through the writer.
    #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)]
    struct TestRecord {
        id: u32,
    }

    /// Reads back every record currently flushed to the CSV file.
    ///
    /// Takes `&Path` rather than `&PathBuf` (clippy `ptr_arg`); deref
    /// coercion keeps the `&PathBuf` call sites unchanged.
    fn read_csv_contents(file_path: &Path) -> Vec<TestRecord> {
        let mut reader = csv::Reader::from_path(file_path).unwrap();
        reader.deserialize().map(|r| r.unwrap()).collect()
    }

    #[test]
    fn test_basic_write_and_flush_on_batch_size() {
        let dir = tempdir().unwrap();
        let file_name = "test_basic_write_and_flush_on_batch_size.csv".to_string();
        let file_path = dir.path().join(&file_name);
        let mut writer = BatchedWriter::new(dir.path().to_path_buf(), file_name, 2).unwrap();

        let rec1 = TestRecord { id: 1 };
        let rec2 = TestRecord { id: 2 };

        writer.queue(rec1).unwrap();
        writer.queue(rec2).unwrap();

        assert!(file_path.exists(), "File should exist after flush");
        let records = read_csv_contents(&file_path);
        assert_eq!(records, vec![rec1, rec2]);

        // Queuing a record that doesn't hit our batch limit shouldn't flush.
        let rec3 = TestRecord { id: 3 };

        writer.queue(rec3).unwrap();
        let records = read_csv_contents(&file_path);
        assert_eq!(records, vec![rec1, rec2]);

        // Force flushing should write even if batch isn't hit.
        writer.write(true).unwrap();
        let records = read_csv_contents(&file_path);
        assert_eq!(records, vec![rec1, rec2, rec3]);
    }

    #[test]
    fn test_forced_flush() {
        let dir = tempdir().unwrap();
        let file_name = "test_forced_flush.csv".to_string();
        let file_path = dir.path().join(&file_name);
        let mut writer = BatchedWriter::new(dir.path().to_path_buf(), file_name, 10).unwrap();

        let rec = TestRecord { id: 1 };
        writer.queue(rec).unwrap();

        writer.write(true).unwrap();
        assert!(file_path.exists(), "File should exist after forced flush");
        let records = read_csv_contents(&file_path);
        assert_eq!(records, vec![rec]);
    }

    #[test]
    fn test_zero_batch_size() {
        // A batch size of zero is rejected at construction time.
        let dir = tempdir().unwrap();
        let file_name = "test_zero_batch_size.csv".to_string();
        assert!(BatchedWriter::new(dir.path().to_path_buf(), file_name, 0).is_err());
    }

    /// Create a record that can't be serialized.
    struct BadRecord {}

    impl Serialize for BadRecord {
        fn serialize<S>(&self, _: S) -> Result<S::Ok, S::Error>
        where
            S: serde::Serializer,
        {
            Err(serde::ser::Error::custom("intentional failure"))
        }
    }

    #[test]
    fn test_serialization_error() {
        let dir = tempdir().unwrap();
        let file_name = "test_serialization_error.csv".to_string();
        let file_path = dir.path().join(&file_name);
        let mut writer = BatchedWriter::new(dir.path().to_path_buf(), file_name, 2).unwrap();

        let rec = TestRecord { id: 1 };
        writer.queue(rec).unwrap();

        // Queuing an unserializable record errors, but the previously queued
        // record must survive the best-effort flush.
        let bad = BadRecord {};
        let result = writer.queue(bad);
        assert!(result.is_err());

        writer.write(true).unwrap();
        assert!(file_path.exists(), "File should exist after forced flush");
        let records = read_csv_contents(&file_path);
        assert_eq!(records, vec![rec]);
    }
}

simln-lib/src/lib.rs

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#![deny(rustdoc::broken_intra_doc_links)]
22

3+
use self::batched_writer::BatchedWriter;
34
use self::clock::Clock;
45
use async_trait::async_trait;
56
use bitcoin::secp256k1::PublicKey;
67
use bitcoin::Network;
7-
use csv::WriterBuilder;
88
use lightning::ln::features::NodeFeatures;
99
use lightning::ln::PaymentHash;
1010
use rand::{Rng, RngCore, SeedableRng};
@@ -29,6 +29,7 @@ use triggered::{Listener, Trigger};
2929
use self::defined_activity::DefinedPaymentActivity;
3030
use self::random_activity::{NetworkGraphView, RandomPaymentActivity};
3131

32+
mod batched_writer;
3233
pub mod cln;
3334
pub mod clock;
3435
mod defined_activity;
@@ -1322,49 +1323,37 @@ async fn consume_simulation_results(
13221323
listener: Listener,
13231324
write_results: Option<WriteResults>,
13241325
) -> Result<(), SimulationError> {
1325-
let mut writer = match write_results {
1326-
Some(res) => {
1327-
let duration = clock.now().duration_since(SystemTime::UNIX_EPOCH)?;
1328-
let file = res
1329-
.results_dir
1330-
.join(format!("simulation_{:?}.csv", duration));
1331-
let writer = WriterBuilder::new().from_path(file)?;
1332-
Some((writer, res.batch_size))
1333-
},
1334-
None => None,
1335-
};
1326+
let mut writer = None;
1327+
if let Some(w) = write_results {
1328+
// File name contains time that simulation was started.
1329+
let file_name = format!(
1330+
"simulation_{:?}.csv",
1331+
clock
1332+
.now()
1333+
.duration_since(SystemTime::UNIX_EPOCH)?
1334+
.as_secs()
1335+
);
13361336

1337-
let mut counter = 1;
1337+
writer = Some(BatchedWriter::new(w.results_dir, file_name, w.batch_size)?);
1338+
}
13381339

13391340
loop {
13401341
select! {
13411342
biased;
13421343
_ = listener.clone() => {
1343-
writer.map_or(Ok(()), |(ref mut w, _)| w.flush().map_err(|_| {
1344-
SimulationError::FileError
1345-
}))?;
1346-
return Ok(());
1344+
return writer.map_or(Ok(()), |mut w| w.write(true));
13471345
},
13481346
payment_result = receiver.recv() => {
13491347
match payment_result {
13501348
Some((details, result)) => {
13511349
logger.lock().await.report_result(&details, &result);
13521350
log::trace!("Resolved dispatched payment: {} with: {}.", details, result);
13531351

1354-
if let Some((ref mut w, batch_size)) = writer {
1355-
w.serialize((details, result)).map_err(|e| {
1356-
let _ = w.flush();
1357-
SimulationError::CsvError(e)
1358-
})?;
1359-
counter = counter % batch_size + 1;
1360-
if batch_size == counter {
1361-
w.flush().map_err(|_| {
1362-
SimulationError::FileError
1363-
})?;
1364-
}
1352+
if let Some(ref mut w) = writer{
1353+
w.queue((details, result))?;
13651354
}
13661355
},
1367-
None => return writer.map_or(Ok(()), |(ref mut w, _)| w.flush().map_err(|_| SimulationError::FileError)),
1356+
None => return writer.map_or(Ok(()), |mut w| w.write(true)),
13681357
}
13691358
}
13701359
}

0 commit comments

Comments
 (0)