Skip to content

Commit 71965fa

Browse files
committed
Streaming / Iterator interface
1 parent 87c837b commit 71965fa

File tree

2 files changed

+260
-11
lines changed

2 files changed

+260
-11
lines changed

README.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,39 @@ println!("Estimated unique words: {}", estimate as usize);
9999

100100
The builder validates parameters and provides clear error messages for invalid inputs.
101101

102+
## Streaming Interface
103+
104+
For processing iterators directly, you can use the streaming methods:
105+
106+
```rust
107+
use cvmcount::{CVM, EstimateDistinct};
108+
109+
// Process an entire iterator with CVM instance
110+
let mut cvm: CVM<i32> = CVM::builder().epsilon(0.05).build().unwrap();
111+
let numbers = vec![1, 2, 3, 2, 1, 4, 5];
112+
let estimate = cvm.process_stream(numbers);
113+
114+
// Or use the iterator extension trait for one-liners
115+
let estimate = (1..=1000)
116+
.cycle()
117+
.take(10_000)
118+
.estimate_distinct_count(0.1, 0.1, 10_000);
119+
120+
// With builder pattern
121+
let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()];
122+
let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99);
123+
let estimate = words.into_iter().estimate_distinct_with_builder(builder).unwrap();
124+
125+
// When working with borrowed data, map to owned explicitly
126+
let borrowed_words = vec!["hello", "world", "hello"];
127+
let estimate = borrowed_words
128+
.iter()
129+
.map(|s| s.to_string())
130+
.estimate_distinct_count(0.1, 0.1, 1000);
131+
```
132+
133+
The streaming interface accepts owned values to avoid cloning within the algorithm, making the ownership requirements explicit.
134+
102135
## Analysis
103136

104137
![](cvmcount.png)

src/lib.rs

Lines changed: 227 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -73,23 +73,13 @@ impl ConfidenceSpec {
7373
/// .build()
7474
/// .unwrap();
7575
/// ```
76-
#[derive(Debug, Clone)]
76+
#[derive(Debug, Clone, Default)]
7777
pub struct CVMBuilder {
7878
epsilon: Option<f64>,
7979
confidence_spec: Option<ConfidenceSpec>,
8080
stream_size: Option<usize>,
8181
}
8282

83-
impl Default for CVMBuilder {
84-
fn default() -> Self {
85-
Self {
86-
epsilon: None,
87-
confidence_spec: None,
88-
stream_size: None,
89-
}
90-
}
91-
}
92-
9383
impl CVMBuilder {
9484
/// Create a new builder with default values
9585
pub fn new() -> Self {
@@ -264,12 +254,118 @@ impl<T: Ord> CVM<T> {
264254
let rng = &mut self.rng;
265255
self.buf.retain(|_| rng.gen_bool(0.5));
266256
}
257+
/// Process an entire iterator of owned values and return the final estimate
258+
///
259+
/// This is a convenience method that processes all elements from an iterator
260+
/// and returns the final count estimate. The iterator must yield owned values
261+
/// that the CVM can take ownership of.
262+
///
263+
/// # Examples
264+
///
265+
/// ```
266+
/// use cvmcount::CVM;
267+
///
268+
/// // Process owned strings
269+
/// let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()];
270+
/// let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
271+
/// let estimate = cvm.process_stream(words);
272+
/// assert!(estimate > 0.0);
273+
///
274+
/// // Process numeric data
275+
/// let numbers = vec![1, 2, 3, 2, 1, 4];
276+
/// let mut cvm: CVM<i32> = CVM::<i32>::builder().build().unwrap();
277+
/// let estimate = cvm.process_stream(numbers);
278+
/// assert!(estimate > 0.0);
279+
///
280+
/// // When you have borrowed data, clone explicitly
281+
/// let borrowed_words = vec!["hello", "world", "hello"];
282+
/// let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
283+
/// let estimate = cvm.process_stream(borrowed_words.iter().map(|s| s.to_string()));
284+
/// assert!(estimate > 0.0);
285+
/// ```
286+
pub fn process_stream<I>(&mut self, iter: I) -> f64
287+
where
288+
I: IntoIterator<Item = T>,
289+
{
290+
for item in iter {
291+
self.process_element(item);
292+
}
293+
self.calculate_final_result()
294+
}
295+
267296
/// Calculate the current unique element count. You can continue to add elements after calling this method.
268297
pub fn calculate_final_result(&self) -> f64 {
269298
self.buf.len() as f64 / self.probability
270299
}
271300
}
272301

302+
/// Extension trait for iterators to estimate distinct count directly
303+
///
304+
/// This trait provides convenient methods to estimate distinct counts from iterators
305+
/// without manually creating and managing a CVM instance.
306+
///
307+
/// # Examples
308+
///
309+
/// ```
310+
/// use cvmcount::{CVM, EstimateDistinct};
311+
///
312+
/// // Simple usage with default parameters
313+
/// let numbers = vec![1, 2, 3, 2, 1, 4, 5];
314+
/// let estimate = numbers.into_iter().estimate_distinct_count(0.1, 0.1, 1000);
315+
/// assert!(estimate > 0.0);
316+
///
317+
/// // Using builder pattern for more control
318+
/// let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()];
319+
/// let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99);
320+
/// let estimate = words.into_iter().estimate_distinct_with_builder(builder).unwrap();
321+
/// assert!(estimate > 0.0);
322+
/// ```
323+
pub trait EstimateDistinct<T: Ord>: Iterator<Item = T> + Sized {
324+
/// Estimate distinct count using the CVM algorithm with specified parameters
325+
///
326+
/// # Arguments
327+
///
328+
/// * `epsilon` - Accuracy requirement (smaller = more accurate)
329+
/// * `delta` - Failure probability (smaller = more confident)
330+
/// * `estimated_size` - Rough estimate of total stream size
331+
///
332+
/// # Returns
333+
///
334+
/// The estimated number of distinct elements
335+
fn estimate_distinct_count(self, epsilon: f64, delta: f64, estimated_size: usize) -> f64 {
336+
let mut cvm = CVM::new(epsilon, delta, estimated_size);
337+
cvm.process_stream(self)
338+
}
339+
340+
/// Estimate distinct count using a builder for more ergonomic configuration
341+
///
342+
/// # Arguments
343+
///
344+
/// * `builder` - A configured CVMBuilder instance
345+
///
346+
/// # Returns
347+
///
348+
/// Result containing the estimated number of distinct elements or an error message
349+
///
350+
/// # Examples
351+
///
352+
/// ```
353+
/// use cvmcount::{CVM, EstimateDistinct};
354+
///
355+
/// let data = vec![1, 2, 3, 2, 1];
356+
/// let builder = CVM::<i32>::builder().epsilon(0.05).confidence(0.99);
357+
/// let estimate = data.into_iter().estimate_distinct_with_builder(builder).unwrap();
358+
/// assert!(estimate > 0.0);
359+
/// ```
360+
fn estimate_distinct_with_builder(self, builder: CVMBuilder) -> Result<f64, String> {
361+
let mut cvm: CVM<T> = builder.build()?;
362+
Ok(cvm.process_stream(self))
363+
}
364+
}
365+
366+
/// Implement EstimateDistinct for all iterators that yield Ord types
367+
impl<T: Ord, I: Iterator<Item = T>> EstimateDistinct<T> for I {}
368+
273369
// Calculate threshold (buf_size) value for the F0-Estimator algorithm
274370
fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize {
275371
((12.0 / epsilon.powf(2.0)) * ((8.0 * stream_size as f64) / delta).log2()).ceil() as usize
@@ -415,4 +511,124 @@ mod tests {
415511
let delta_spec = ConfidenceSpec::Delta(0.05);
416512
assert!((delta_spec.to_delta() - 0.05).abs() < f64::EPSILON);
417513
}
514+
515+
#[test]
516+
fn test_process_stream() {
517+
let mut cvm: CVM<i32> = CVM::<i32>::builder().build().unwrap();
518+
519+
// Test with vector
520+
let numbers = vec![1, 2, 3, 2, 1, 4, 5, 3];
521+
let estimate = cvm.process_stream(numbers);
522+
assert!(estimate > 0.0);
523+
524+
// Test with range
525+
let mut cvm2: CVM<i32> = CVM::<i32>::builder().build().unwrap();
526+
let estimate2 = cvm2.process_stream(1..=100);
527+
assert!(estimate2 > 0.0);
528+
}
529+
530+
#[test]
531+
fn test_process_stream_strings() {
532+
let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
533+
534+
// Test with owned strings
535+
let words = vec![
536+
"hello".to_string(),
537+
"world".to_string(),
538+
"hello".to_string(),
539+
"rust".to_string(),
540+
];
541+
let estimate = cvm.process_stream(words);
542+
assert!(estimate > 0.0);
543+
}
544+
545+
#[test]
546+
fn test_process_stream_with_map() {
547+
let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
548+
549+
// Test with borrowed data mapped to owned
550+
let borrowed_words = ["hello", "world", "hello", "rust"];
551+
let estimate = cvm.process_stream(borrowed_words.iter().map(|s| s.to_string()));
552+
assert!(estimate > 0.0);
553+
}
554+
555+
#[test]
556+
fn test_estimate_distinct_trait() {
557+
// Test simple usage
558+
let numbers = vec![1, 2, 3, 2, 1, 4, 5];
559+
let estimate = numbers.into_iter().estimate_distinct_count(0.1, 0.1, 1000);
560+
assert!(estimate > 0.0);
561+
562+
// Test with builder
563+
let words = vec![
564+
"hello".to_string(),
565+
"world".to_string(),
566+
"hello".to_string(),
567+
];
568+
let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99);
569+
let estimate = words
570+
.into_iter()
571+
.estimate_distinct_with_builder(builder)
572+
.unwrap();
573+
assert!(estimate > 0.0);
574+
}
575+
576+
#[test]
577+
fn test_estimate_distinct_with_cloning() {
578+
// Test that explicit cloning works as expected
579+
let borrowed_numbers = [1, 2, 3, 2, 1, 4];
580+
let estimate = borrowed_numbers
581+
.iter()
582+
.cloned()
583+
.estimate_distinct_count(0.1, 0.1, 100);
584+
assert!(estimate > 0.0);
585+
}
586+
587+
#[test]
588+
fn test_streaming_integration_with_file_processing() {
589+
// Simulate file processing pattern
590+
let lines = vec![
591+
"hello world".to_string(),
592+
"world peace".to_string(),
593+
"hello rust".to_string(),
594+
];
595+
596+
let mut cvm: CVM<String> = CVM::<String>::builder()
597+
.epsilon(0.1)
598+
.confidence(0.9)
599+
.build()
600+
.unwrap();
601+
602+
// Process words from all lines
603+
let words: Vec<String> = lines
604+
.into_iter()
605+
.flat_map(|line| {
606+
line.split_whitespace()
607+
.map(|s| s.to_string())
608+
.collect::<Vec<_>>()
609+
})
610+
.collect();
611+
let estimate = cvm.process_stream(words);
612+
613+
assert!(estimate > 0.0);
614+
}
615+
616+
#[test]
617+
fn test_streaming_large_dataset() {
618+
// Test with a larger dataset to verify the algorithm works
619+
let mut cvm: CVM<i32> = CVM::<i32>::builder()
620+
.epsilon(0.1)
621+
.confidence(0.9)
622+
.estimated_size(10_000)
623+
.build()
624+
.unwrap();
625+
626+
// Create data with known distinct count (1000 unique values, repeated)
627+
let data: Vec<i32> = (0..1000).cycle().take(10_000).collect();
628+
let estimate = cvm.process_stream(data);
629+
630+
// The estimate should be reasonably close to 1000
631+
// With epsilon=0.1, we expect within 10 % accuracy most of the time
632+
assert!(estimate > 500.0 && estimate < 2000.0);
633+
}
418634
}

0 commit comments

Comments
 (0)