|
1 | 1 | //! Optional ADIF data transformations |
2 | 2 |
|
3 | | -use crate::{Error, Record}; |
| 3 | +use crate::{Datum, Error, Record}; |
4 | 4 | use chrono::{Days, NaiveDateTime}; |
5 | 5 | use futures::stream::Stream; |
6 | 6 | use std::collections::HashSet; |
@@ -99,6 +99,48 @@ pub trait FilterExt: Stream { |
99 | 99 |
|
100 | 100 | impl<S> FilterExt for S where S: Stream {} |
101 | 101 |
|
| 102 | +/// Stream adapter that transforms each record by consuming and rebuilding it. |
| 103 | +pub struct Map<S, F> { |
| 104 | + stream: S, |
| 105 | + f: F, |
| 106 | +} |
| 107 | + |
| 108 | +impl<S, F> Stream for Map<S, F> |
| 109 | +where |
| 110 | + S: Stream<Item = Result<Record, Error>> + Unpin, |
| 111 | + F: FnMut(Record) -> Result<Record, Error> + Unpin, |
| 112 | +{ |
| 113 | + type Item = Result<Record, Error>; |
| 114 | + |
| 115 | + fn poll_next( |
| 116 | + self: Pin<&mut Self>, cx: &mut Context<'_>, |
| 117 | + ) -> Poll<Option<Self::Item>> { |
| 118 | + let this = self.get_mut(); |
| 119 | + match Pin::new(&mut this.stream).poll_next(cx) { |
| 120 | + Poll::Ready(Some(Ok(record))) => { |
| 121 | + Poll::Ready(Some((this.f)(record))) |
| 122 | + } |
| 123 | + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), |
| 124 | + Poll::Ready(None) => Poll::Ready(None), |
| 125 | + Poll::Pending => Poll::Pending, |
| 126 | + } |
| 127 | + } |
| 128 | +} |
| 129 | + |
| 130 | +/// Extension trait providing the `map` method on streams. |
| 131 | +pub trait MapExt: Stream { |
| 132 | + /// Transform each record by consuming and rebuilding it. |
| 133 | + fn map<F>(self, f: F) -> Map<Self, F> |
| 134 | + where |
| 135 | + Self: Sized, |
| 136 | + F: FnMut(Record) -> Result<Record, Error>, |
| 137 | + { |
| 138 | + Map { stream: self, f } |
| 139 | + } |
| 140 | +} |
| 141 | + |
| 142 | +impl<S> MapExt for S where S: Stream {} |
| 143 | + |
102 | 144 | /// Normalize date and time fields from multiple possible source fields into |
103 | 145 | /// combined datetime values. |
104 | 146 | /// |
@@ -282,3 +324,35 @@ where |
282 | 324 | { |
283 | 325 | stream.filter(|record| !record.is_header()) |
284 | 326 | } |
| 327 | + |
| 328 | +/// Trim leading and trailing whitespace from string field values. |
| 329 | +/// |
| 330 | +/// Non-string datum variants pass through unchanged. |
| 331 | +/// |
| 332 | +/// ``` |
| 333 | +/// use difa::{ |
| 334 | +/// Record, RecordStreamExt, TagDecoder, filter::trim_whitespace, |
| 335 | +/// }; |
| 336 | +/// use futures::StreamExt; |
| 337 | +/// |
| 338 | +/// # tokio_test::block_on(async { |
| 339 | +/// let data = b"<call:6> W1AW <eor>"; |
| 340 | +/// let stream = TagDecoder::new_stream(&data[..], true).records(); |
| 341 | +/// let mut stream = trim_whitespace(stream); |
| 342 | +/// let record = stream.next().await.unwrap().unwrap(); |
| 343 | +/// assert_eq!(record.get("call").unwrap().as_str(), "W1AW"); |
| 344 | +/// # }); |
| 345 | +/// ``` |
| 346 | +pub fn trim_whitespace<S>( |
| 347 | + stream: S, |
| 348 | +) -> Map<S, impl FnMut(Record) -> Result<Record, Error>> |
| 349 | +where |
| 350 | + S: Stream<Item = Result<Record, Error>>, |
| 351 | +{ |
| 352 | + stream.map(|r| { |
| 353 | + Ok(r.map_fields(|_, v| match v { |
| 354 | + Datum::String(s) => Datum::String(s.trim().to_string()), |
| 355 | + other => other, |
| 356 | + })) |
| 357 | + }) |
| 358 | +} |
0 commit comments