Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,39 @@ println!("Estimated unique words: {}", estimate as usize);

The builder validates parameters and provides clear error messages for invalid inputs.

## Streaming Interface

For processing iterators directly, you can use the streaming methods:

```rust
use cvmcount::{CVM, EstimateDistinct};

// Process an entire iterator with CVM instance
let mut cvm: CVM<i32> = CVM::builder().epsilon(0.05).build().unwrap();
let numbers = vec![1, 2, 3, 2, 1, 4, 5];
let estimate = cvm.process_stream(numbers);

// Or use the iterator extension trait for one-liners
let estimate = (1..=1000)
.cycle()
.take(10_000)
.estimate_distinct_count(0.1, 0.1, 10_000);

// With builder pattern
let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()];
let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99);
let estimate = words.into_iter().estimate_distinct_with_builder(builder).unwrap();

// When working with borrowed data, map to owned explicitly
let borrowed_words = vec!["hello", "world", "hello"];
let estimate = borrowed_words
.iter()
.map(|s| s.to_string())
.estimate_distinct_count(0.1, 0.1, 1000);
```

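The streaming interface takes ownership of each value, so the algorithm never has to clone internally. When your data is borrowed, convert it to owned values yourself (for example with `.cloned()` or `.map(|s| s.to_string())`) before passing it in; this keeps the ownership requirements explicit.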

## Analysis

![](cvmcount.png)
Expand Down
228 changes: 227 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,118 @@ impl<T: Ord> CVM<T> {
let rng = &mut self.rng;
self.buf.retain(|_| rng.gen_bool(0.5));
}
/// Feed every item yielded by `iter` into the estimator and return the resulting estimate
///
/// Each item is handed to `process_element` in iteration order; once the
/// iterator is exhausted the value of `calculate_final_result` is returned.
/// Items are taken by value, so callers holding borrowed data must convert it
/// to owned values before passing it in.
///
/// # Examples
///
/// ```
/// use cvmcount::CVM;
///
/// // Owned strings are consumed directly
/// let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()];
/// let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
/// let estimate = cvm.process_stream(words);
/// assert!(estimate > 0.0);
///
/// // Numeric data works the same way
/// let numbers = vec![1, 2, 3, 2, 1, 4];
/// let mut cvm: CVM<i32> = CVM::<i32>::builder().build().unwrap();
/// let estimate = cvm.process_stream(numbers);
/// assert!(estimate > 0.0);
///
/// // Borrowed data has to be turned into owned values by the caller
/// let borrowed_words = vec!["hello", "world", "hello"];
/// let mut cvm: CVM<String> = CVM::<String>::builder().build().unwrap();
/// let estimate = cvm.process_stream(borrowed_words.iter().map(|s| s.to_string()));
/// assert!(estimate > 0.0);
/// ```
pub fn process_stream<I>(&mut self, iter: I) -> f64
where
    I: IntoIterator<Item = T>,
{
    iter.into_iter().for_each(|element| {
        self.process_element(element);
    });
    self.calculate_final_result()
}

/// Return the current estimate of the number of unique elements.
///
/// The estimate is the number of buffered elements divided by the current
/// value of `probability`. The estimator is only borrowed immutably, so you
/// can keep adding elements after calling this method.
pub fn calculate_final_result(&self) -> f64 {
    let retained = self.buf.len() as f64;
    retained / self.probability
}
}

/// Iterator extension trait that estimates a distinct count in a single call
///
/// The provided methods build a CVM instance, feed the whole iterator through
/// `CVM::process_stream`, and return the estimate, so callers do not have to
/// create or manage the estimator themselves.
///
/// # Examples
///
/// ```
/// use cvmcount::{CVM, EstimateDistinct};
///
/// // Simple usage with explicit epsilon, delta and stream size
/// let numbers = vec![1, 2, 3, 2, 1, 4, 5];
/// let estimate = numbers.into_iter().estimate_distinct_count(0.1, 0.1, 1000);
/// assert!(estimate > 0.0);
///
/// // Using the builder pattern for more control
/// let words = vec!["hello".to_string(), "world".to_string(), "hello".to_string()];
/// let builder = CVM::<String>::builder().epsilon(0.05).confidence(0.99);
/// let estimate = words.into_iter().estimate_distinct_with_builder(builder).unwrap();
/// assert!(estimate > 0.0);
/// ```
pub trait EstimateDistinct<T: Ord>: Iterator<Item = T> + Sized {
    /// Estimate the distinct count with explicitly supplied CVM parameters
    ///
    /// Consumes the iterator.
    ///
    /// # Arguments
    ///
    /// * `epsilon` - Accuracy requirement (smaller = more accurate)
    /// * `delta` - Failure probability (smaller = more confident)
    /// * `estimated_size` - Rough estimate of total stream size
    ///
    /// # Returns
    ///
    /// The estimated number of distinct elements
    fn estimate_distinct_count(self, epsilon: f64, delta: f64, estimated_size: usize) -> f64 {
        CVM::<T>::new(epsilon, delta, estimated_size).process_stream(self)
    }

    /// Estimate the distinct count with an estimator configured through a builder
    ///
    /// Consumes the iterator.
    ///
    /// # Arguments
    ///
    /// * `builder` - A configured CVMBuilder instance
    ///
    /// # Returns
    ///
    /// `Ok` with the estimated number of distinct elements, or the error
    /// message produced by `build()` when the builder cannot be turned into
    /// an estimator
    ///
    /// # Examples
    ///
    /// ```
    /// use cvmcount::{CVM, EstimateDistinct};
    ///
    /// let data = vec![1, 2, 3, 2, 1];
    /// let builder = CVM::<i32>::builder().epsilon(0.05).confidence(0.99);
    /// let estimate = data.into_iter().estimate_distinct_with_builder(builder).unwrap();
    /// assert!(estimate > 0.0);
    /// ```
    fn estimate_distinct_with_builder(self, builder: CVMBuilder) -> Result<f64, String> {
        let mut estimator: CVM<T> = builder.build()?;
        let estimate = estimator.process_stream(self);
        Ok(estimate)
    }
}

/// Blanket implementation: every iterator whose items are `Ord` gets the
/// `EstimateDistinct` methods.
impl<T, I> EstimateDistinct<T> for I
where
    T: Ord,
    I: Iterator<Item = T>,
{
}

// Calculate threshold (buf_size) value for the F0-Estimator algorithm
fn buffer_size(epsilon: f64, delta: f64, stream_size: usize) -> usize {
((12.0 / epsilon.powf(2.0)) * ((8.0 * stream_size as f64) / delta).log2()).ceil() as usize
Expand All @@ -273,7 +379,7 @@ mod tests {
path::Path,
};

use super::{CVM, ConfidenceSpec};
use super::{CVM, ConfidenceSpec, EstimateDistinct};
use regex::Regex;
use std::collections::HashSet;

Expand Down Expand Up @@ -405,4 +511,124 @@ mod tests {
let delta_spec = ConfidenceSpec::Delta(0.05);
assert!((delta_spec.to_delta() - 0.05).abs() < f64::EPSILON);
}

#[test]
fn test_process_stream() {
    // A Vec is consumed by value
    let mut vec_estimator: CVM<i32> = CVM::<i32>::builder().build().unwrap();
    let vec_estimate = vec_estimator.process_stream(vec![1, 2, 3, 2, 1, 4, 5, 3]);
    assert!(vec_estimate > 0.0);

    // A range works too, using a fresh estimator
    let mut range_estimator: CVM<i32> = CVM::<i32>::builder().build().unwrap();
    let range_estimate = range_estimator.process_stream(1..=100);
    assert!(range_estimate > 0.0);
}

#[test]
fn test_process_stream_strings() {
    let mut estimator: CVM<String> = CVM::<String>::builder().build().unwrap();

    // Owned strings, built from literals, are moved into the estimator
    let owned_words = Vec::from(["hello", "world", "hello", "rust"].map(String::from));
    let estimate = estimator.process_stream(owned_words);
    assert!(estimate > 0.0);
}

#[test]
fn test_process_stream_with_map() {
    let mut estimator: CVM<String> = CVM::<String>::builder().build().unwrap();

    // Borrowed string slices are converted to owned Strings on the fly
    let borrowed = ["hello", "world", "hello", "rust"];
    let owned = borrowed.iter().map(|word| String::from(*word));
    let estimate = estimator.process_stream(owned);
    assert!(estimate > 0.0);
}

#[test]
fn test_estimate_distinct_trait() {
    // Explicit-parameter entry point on an integer iterator
    let int_estimate = vec![1, 2, 3, 2, 1, 4, 5]
        .into_iter()
        .estimate_distinct_count(0.1, 0.1, 1000);
    assert!(int_estimate > 0.0);

    // Builder-based entry point on owned strings
    let words = Vec::from(["hello", "world", "hello"].map(String::from));
    let configured = CVM::<String>::builder().epsilon(0.05).confidence(0.99);
    let word_estimate = words
        .into_iter()
        .estimate_distinct_with_builder(configured)
        .unwrap();
    assert!(word_estimate > 0.0);
}

#[test]
fn test_estimate_distinct_with_cloning() {
    // Items borrowed from an array are cloned into owned values before estimation
    let source = [1, 2, 3, 2, 1, 4];
    let owned_items = source.iter().cloned();
    let estimate = owned_items.estimate_distinct_count(0.1, 0.1, 100);
    assert!(estimate > 0.0);
}

#[test]
fn test_streaming_integration_with_file_processing() {
    // Stand-in for lines read from a file
    let lines = vec![
        "hello world".to_string(),
        "world peace".to_string(),
        "hello rust".to_string(),
    ];

    let mut cvm: CVM<String> = CVM::<String>::builder()
        .epsilon(0.1)
        .confidence(0.9)
        .build()
        .unwrap();

    // Split each line into words lazily and hand them straight to the
    // estimator. `process_stream` accepts any `IntoIterator` of owned items,
    // so neither a per-line Vec nor a Vec of all words has to be allocated.
    let words = lines
        .iter()
        .flat_map(|line| line.split_whitespace())
        .map(String::from);
    let estimate = cvm.process_stream(words);

    assert!(estimate > 0.0);
}

#[test]
fn test_streaming_large_dataset() {
    // Known distinct count: 1000 unique values cycled to fill 10_000 items
    let stream: Vec<i32> = (0..1000).cycle().take(10_000).collect();

    // Larger dataset to verify the algorithm end to end
    let mut estimator: CVM<i32> = CVM::<i32>::builder()
        .epsilon(0.1)
        .confidence(0.9)
        .estimated_size(10_000)
        .build()
        .unwrap();
    let estimate = estimator.process_stream(stream);

    // The estimate should be reasonably close to 1000.
    // With epsilon=0.1 we expect to be within 10% most of the time; the
    // bounds checked here are deliberately much looser than that.
    assert!(500.0 < estimate && estimate < 2000.0);
}
}