diff --git a/benches/byte_order.rs b/benches/byte_order.rs index 5e38929..9584e13 100644 --- a/benches/byte_order.rs +++ b/benches/byte_order.rs @@ -14,6 +14,7 @@ fn get_test_request_data() -> RequestData { offset: None, size: None, shape: None, + axis: reductionist::models::ReductionAxes::All, order: None, selection: None, compression: None, diff --git a/benches/operations.rs b/benches/operations.rs index da4f2fa..79830be 100644 --- a/benches/operations.rs +++ b/benches/operations.rs @@ -19,6 +19,7 @@ fn get_test_request_data() -> RequestData { offset: None, size: None, shape: None, + axis: reductionist::models::ReductionAxes::All, order: None, selection: None, compression: None, @@ -34,7 +35,7 @@ fn criterion_benchmark(c: &mut Criterion) { let size = size_k * 1024; let data: Vec = (0_i64..size).map(|i| i % 256).collect::>(); let data: Vec = data.as_bytes().into(); - let missings = vec![ + let missing_types = vec![ None, Some(Missing::MissingValue(42.into())), Some(Missing::MissingValues(vec![42.into()])), @@ -50,7 +51,7 @@ fn criterion_benchmark(c: &mut Criterion) { ("sum", Box::new(operations::Sum::execute)), ]; for (op_name, execute) in operations { - for missing in missings.clone() { + for missing in missing_types.clone() { let name = format!("{}({}, {:?})", op_name, size, missing); c.bench_function(&name, |b| { b.iter(|| { diff --git a/docs/api.md b/docs/api.md index 13d8153..933a219 100644 --- a/docs/api.md +++ b/docs/api.md @@ -37,6 +37,11 @@ The request body should be a JSON object of the form: // - optional, defaults to a simple 1D array "shape": [20, 5], + // The axis or axes over which to perform the reduction operation + // - optional, can be either a single axis or list of axes, defaults + // to a reduction over all axes + "axis": 0, + // Indicates whether the data is in C order (row major) // or Fortran order (column major, indicated by 'F') // - optional, defaults to 'C' @@ -78,10 +83,10 @@ Unauthenticated access to S3 is possible by omitting the basic auth header. On success, all operations return HTTP 200 OK with the response using the same datatype as specified in the request except for `count` which always returns the result as `int64`. The server returns the following headers with the HTTP response: -* `x-activestorage-dtype`: The data type of the data in the response payload. One of `int32`, `int64`, `uint32`, `uint64`, `float32` or `float64`. -* `x-activestorage-byte-order`: The byte order of the data in the response payload. Either `big` or `little`. -* `x-activestorage-shape`: A JSON-encoded list of numbers describing the shape of the data in the response payload. May be an empty list for a scalar result. -* `x-activestorage-count`: The number of non-missing array elements operated on while performing the requested reduction. This header is useful, for example, to calculate the mean over multiple requests where the number of items operated on may differ between chunks. +- `x-activestorage-dtype`: The data type of the data in the response payload. One of `int32`, `int64`, `uint32`, `uint64`, `float32` or `float64`. +- `x-activestorage-byte-order`: The byte order of the data in the response payload. Either `big` or `little`. +- `x-activestorage-shape`: A JSON-encoded list of numbers describing the shape of the data in the response payload. May be an empty list for a scalar result. +- `x-activestorage-count`: The number of non-missing array elements operated on while performing the requested reduction. This header is useful, for example, to calculate the mean over multiple requests where the number of items operated on may differ between chunks. On error, an HTTP 4XX (client) or 5XX (server) response code will be returned, with the response body being a JSON object of the following format: diff --git a/docs/architecture.md b/docs/architecture.md index ce42939..e52640e 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -7,10 +7,10 @@ Reductionist is built on top of a number of popular open source components. A few properties make it relatively easy to build a conceptual mental model of how Reductionist works. -* All operations share the same request processing pipeline. -* The request processing pipeline for each request is a fairly linear sequence of steps. -* There is no persistent state. -* The only external service that is interacted with is an S3-compatible object store. +- All operations share the same request processing pipeline. +- The request processing pipeline for each request is a fairly linear sequence of steps. +- There is no persistent state. +- The only external service that is interacted with is an S3-compatible object store. The more challenging aspects of the system are the lower level details of asynchronous programming, memory management, the Rust type system and working with multi-dimensional arrays. @@ -29,7 +29,6 @@ A diagram of this step for the sum operation is shown in Figure 2.
Figure 2: Sum operation flow diagram
- ## Axum web server [Axum](https://docs.rs/axum) is an asynchronous web framework that performs well in [various benchmarks](https://github.com/programatik29/rust-web-benchmarks/blob/master/result/hello-world.md) and is built on top of various popular components, including the [hyper](https://hyper.rs/) HTTP library. @@ -110,13 +109,11 @@ Each operation is implemented by a struct that implements the `NumOperation` tra For example, the sum operation is implemented by the `Sum` struct in `src/operations.rs`. The `Sum` struct's `execute_t` method does the following: -* Zero copy conversion of the byte array to a multi-dimensional [ndarray::ArrayView](https://docs.rs/ndarray/latest/ndarray/type.ArrayView.html) object of the data type, shape and byte order specified in the request data -* If a selection was specified in the request data, create a sliced `ndarray::ArrayView` onto the original array view -* If missing data was specified in the request data: - * Create an iterator over the array view that filters out missing data, performs the sum operation and counts non-missing elements -* Otherwise: - * Use the array view's native `sum` and `len` methods to take the sum and element count -* Convert the sum to a byte array and return with the element count +- Zero copy conversion of the byte array to a multi-dimensional [ndarray::ArrayView](https://docs.rs/ndarray/latest/ndarray/type.ArrayView.html) object of the data type, shape and byte order specified in the request data +- If a selection was specified in the request data, create a sliced `ndarray::ArrayView` onto the original array view +- Checks whether the reduction should be performed over all or only a subset of the sliced data's axes +- Performs a fold over each of the requested axes to calculate the required reduction while ignoring any specified missing data +- Convert the sum to a byte array and return with the element count The procedure for other operations varies slightly but generally follows the same pattern. @@ -136,9 +133,9 @@ Reductionist supports optional restriction of resource usage. This is implemented in `src/resource_manager.rs` using [Tokio Semaphores](https://docs.rs/tokio/latest/tokio/sync/struct.Semaphore.html). This allows Reductionist to limit the quantity of various resources used at any time: -* S3 connections -* memory used for numeric data (this is more of a rough guide than a perfect limit) -* threads used for CPU-bound work +- S3 connections +- memory used for numeric data (this is more of a rough guide than a perfect limit) +- threads used for CPU-bound work ## CPU-bound work @@ -158,9 +155,9 @@ The second approach may leave the server more responsive if more CPU-heavy opera Prometheus metrics are implemented in `src/metrics.rs` and are exposed by the Reductionist API under the `/metrics` path. These include: -* incoming requests (counter) -* outgoing response (counter) -* response time (histogram) +- incoming requests (counter) +- outgoing response (counter) +- response time (histogram) ## Tracing and profiling diff --git a/docs/pyactivestorage.md b/docs/pyactivestorage.md index 3755991..a7783d1 100644 --- a/docs/pyactivestorage.md +++ b/docs/pyactivestorage.md @@ -3,34 +3,36 @@ Reductionist has been integrated with the [PyActiveStorage](https://github.com/valeriupredoi/PyActiveStorage) library, and PyActiveStorage acts as a client of the Reductionist server. PyActiveStorage currently works with data in netCDF4 format, and is able to perform reductions on a variable within such a dataset. Numerical operations are performed on individual storage chunks, with the results later aggregated. -The original POSIX/NumPy storage chunk reduction in PyActiveStorage is implementated in a `reduce_chunk` Python function in `activestorage/storage.py`, and this interface was used as the basis for the integration of Reductionist. +The original POSIX/NumPy storage chunk reduction in PyActiveStorage is implemented in a `reduce_chunk` Python function in `activestorage/storage.py`, and this interface was used as the basis for the integration of Reductionist. The following code snippet shows the `reduce_chunk` function signature. ```python def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection, method=None): - """ We do our own read of chunks and decoding etc - - rfile - the actual file with the data + """ We do our own read of chunks and decoding etc + + rfile - the actual file with the data offset, size - where and what we want ... compression - optional `numcodecs.abc.Codec` compression codec filters - optional list of `numcodecs.abc.Codec` filter codecs - dtype - likely float32 in most cases. - shape - will be a tuple, something like (3,3,1), this is the dimensionality of the + dtype - likely float32 in most cases. + shape - will be a tuple, something like (3,3,1), this is the dimensionality of the chunk itself order - typically 'C' for c-type ordering chunk_selection - python slice tuples for each dimension, e.g. (slice(0, 2, 1), slice(1, 3, 1), slice(0, 1, 1)) this defines the part of the chunk which is to be obtained or operated upon. - method - computation desired - (in this Python version it's an actual method, in + method - computation desired + (in this Python version it's an actual method, in storage implementations we'll change to controlled vocabulary) - + """ ``` For Reductionist, the `reduce_chunk` function signature in `activestorage/reductionist.py` is similar, but replaces the local file path with a `requests.Session` object, the Reductionist server URL, S3-compatible object store URL, and the bucket and object containing the data. + + ```python def reduce_chunk(session, server, source, bucket, object, offset, size, compression, filters, missing, dtype, shape, @@ -64,11 +66,11 @@ def reduce_chunk(session, server, source, bucket, object, Within the `reduce_chunk` implementation for Reductionist, the following steps are taken: -* build Reductionist API request data -* build Reductionist API URL -* perform an HTTP(S) POST request to Reductionist -* on success, return a NumPy array containing the data in the response payload, with data type, shape and count determined by response headers -* on failure, raise a `ReductionistError` with the response status code and JSON encoded error response +- build Reductionist API request data +- build Reductionist API URL +- perform an HTTP(S) POST request to Reductionist +- on success, return a NumPy array containing the data in the response payload, with data type, shape and count determined by response headers +- on failure, raise a `ReductionistError` with the response status code and JSON encoded error response The use of a `requests.Session` object allows for TCP connection pooling, reducing connection overhead when multiple requests are made within a short timeframe. @@ -77,9 +79,9 @@ It should be possible to provide a unified interface to storage systems by abstr Other changes to the main `activestorage.Active` class were necessary for integration of Reductionist. These include: -* Support for reading netCDF metadata from files stored in S3 using the [s3fs](https://s3fs.readthedocs.io/) and [h5netcdf](https://pypi.org/project/h5netcdf/) libraries -* Configuration options in `activestorage/config.py` to specify the Reductionist API URL, S3-compatible object store URL, S3 access key, secret key and bucket -* Constructor `storage_type` argument for `activestorage.Active` to specify the storage backend -* Use of a thread pool to execute storage chunk reductions in parallel -* Unit tests to cover new and modified code -* Integration test changes to allow running against a POSIX or S3 storage backend +- Support for reading netCDF metadata from files stored in S3 using the [s3fs](https://s3fs.readthedocs.io/) and [h5netcdf](https://pypi.org/project/h5netcdf/) libraries +- Configuration options in `activestorage/config.py` to specify the Reductionist API URL, S3-compatible object store URL, S3 access key, secret key and bucket +- Constructor `storage_type` argument for `activestorage.Active` to specify the storage backend +- Use of a thread pool to execute storage chunk reductions in parallel +- Unit tests to cover new and modified code +- Integration test changes to allow running against a POSIX or S3 storage backend diff --git a/scripts/client.py b/scripts/client.py index bedf768..cc6e78b 100644 --- a/scripts/client.py +++ b/scripts/client.py @@ -36,6 +36,7 @@ def get_args() -> argparse.Namespace: parser.add_argument("--offset", type=int) parser.add_argument("--size", type=int) parser.add_argument("--shape", type=str) + parser.add_argument("--axis", type=str) parser.add_argument("--order", default="C") #, choices=["C", "F"]) allow invalid for testing parser.add_argument("--selection", type=str) parser.add_argument("--compression", type=str) @@ -72,6 +73,8 @@ def build_request_data(args: argparse.Namespace) -> dict: request_data["byte_order"] = args.byte_order if args.shape: request_data["shape"] = json.loads(args.shape) + if args.axis is not None: + request_data["axis"] = json.loads(args.axis) if args.selection: request_data["selection"] = json.loads(args.selection) if args.compression: @@ -113,11 +116,16 @@ def display(response, verbose=False): #print(response.content) dtype = response.headers['x-activestorage-dtype'] shape = json.loads(response.headers['x-activestorage-shape']) - result = np.frombuffer(response.content, dtype=dtype) - result = result.reshape(shape) + counts = json.loads(response.headers['x-activestorage-count']) + counts = np.array(counts) + if len(counts) > 1: + counts = counts.reshape(shape) + result = np.frombuffer(response.content, dtype=dtype).reshape(shape) if verbose: + sep = "\n" if len(counts.shape) > 1 else " " print("\nResponse headers:", response.headers) - print("\nResult:", result) + print("\nNon-missing count(s):", counts, sep=sep) + print("\nResult:", result, sep=sep) else: print(result) diff --git a/scripts/parallel-client.py b/scripts/parallel-client.py index 6d5b5de..88d1139 100644 --- a/scripts/parallel-client.py +++ b/scripts/parallel-client.py @@ -48,6 +48,7 @@ def get_args() -> argparse.Namespace: parser.add_argument("--offset", type=int) parser.add_argument("--size", type=int) parser.add_argument("--shape", type=str) + parser.add_argument("--axis", type=str) parser.add_argument("--order", default="C") #, choices=["C", "F"]) allow invalid for testing parser.add_argument("--selection", type=str) parser.add_argument("--compression", type=str) @@ -90,6 +91,8 @@ def build_request_data(args: argparse.Namespace) -> dict: request_data["byte_order"] = args.byte_order if args.shape: request_data["shape"] = json.loads(args.shape) + if args.axis is not None: + request_data["axis"] = json.loads(args.axis) if args.selection: request_data["selection"] = json.loads(args.selection) if args.compression: diff --git a/scripts/upload_sample_data.py b/scripts/upload_sample_data.py index 6273074..f23bc14 100644 --- a/scripts/upload_sample_data.py +++ b/scripts/upload_sample_data.py @@ -6,12 +6,12 @@ import s3fs import zlib -NUM_ITEMS = 10 +NUM_ITEMS = 12 OBJECT_PREFIX = "data" COMPRESSION_ALGS = [None, "gzip", "zlib"] FILTER_ALGS = [None, "shuffle"] -#Use enum which also subclasses string type so that +# Use enum which also subclasses string type so that # auto-generated OpenAPI schema can determine allowed dtypes class AllowedDatatypes(str, Enum): """ Data types supported by active storage proxy """ @@ -31,7 +31,7 @@ def n_bytes(self): s3_fs = s3fs.S3FileSystem(key='minioadmin', secret='minioadmin', client_kwargs={'endpoint_url': S3_URL}) bucket = pathlib.Path('sample-data') -#Make sure s3 bucket exists +# Make sure s3 bucket exists try: s3_fs.mkdir(bucket) except FileExistsError: diff --git a/src/models.rs b/src/models.rs index 2c1472c..76f8cfc 100644 --- a/src/models.rs +++ b/src/models.rs @@ -107,6 +107,16 @@ pub enum Filter { Shuffle { element_size: usize }, } +/// Axes over which to perform the reduction +#[derive(Debug, PartialEq, Default, Deserialize)] +#[serde(rename_all = "lowercase", untagged)] +pub enum ReductionAxes { + #[default] + All, + One(usize), + Multi(Vec), +} + /// Request data for operations #[derive(Debug, Deserialize, PartialEq, Validate)] #[serde(deny_unknown_fields)] @@ -136,6 +146,9 @@ pub struct RequestData { custom = "validate_shape" )] pub shape: Option>, + /// Axis or axes over which to perform the reduction operation + #[serde(default)] + pub axis: ReductionAxes, /// Order of the multi-dimensional array pub order: Option, /// Subset of the data to operate on @@ -222,6 +235,7 @@ fn validate_request_data(request_data: &RequestData) -> Result<(), ValidationErr validate_raw_size(*size, request_data.dtype, &request_data.shape)?; } }; + // Check selection is compatible with shape match (&request_data.shape, &request_data.selection) { (Some(shape), Some(selection)) => { validate_shape_selection(shape, selection)?; @@ -233,9 +247,55 @@ fn validate_request_data(request_data: &RequestData) -> Result<(), ValidationErr } _ => (), }; + // Check axis is compatible with shape + match (&request_data.shape, &request_data.axis) { + (Some(shape), ReductionAxes::One(axis)) => { + if *axis > shape.len() - 1 { + return Err(ValidationError::new("Reduction axis must be within shape")); + } + } + (Some(shape), ReductionAxes::Multi(axes)) => { + // Check we've not been given too many axes + if axes.len() >= shape.len() { + return Err(ValidationError::new( + "Number of reduction axes must be less than length of shape - to reduce over all axes omit the axis field completely", + )); + } + // Check axes are ordered correctly + // NOTE(sd109): We could mutate request data to sort the axes + // but it's also trivial to do on the Python client side + let mut sorted_axes = axes.clone(); + sorted_axes.sort(); + if &sorted_axes != axes { + return Err(ValidationError::new( + "Reduction axes must be provided in ascending order", + )); + } + // Check axes are valid for given shape + for ax in axes { + if *ax > shape.len() - 1 { + return Err(ValidationError::new( + "All reduction axes must be within shape", + )); + } + } + // Check we've not been given duplicate axes + for ax in axes { + if axes.iter().filter(|val| *val == ax).count() != 1 { + return Err(ValidationError::new("Reduction axes contains duplicates")); + } + } + } + (None, ReductionAxes::One(_) | ReductionAxes::Multi(_)) => { + return Err(ValidationError::new("Axis requires shape to be specified")); + } + (_, ReductionAxes::All) => (), + }; + // Validate missing specification if let Some(missing) = &request_data.missing { missing.validate(request_data.dtype)?; }; + Ok(()) } @@ -247,13 +307,14 @@ pub struct Response { pub dtype: DType, /// Shape of the response pub shape: Vec, - /// Number of non-missing elements operated on to generate response - pub count: i64, + /// Number of non-missing elements operated + /// along each reduction axis + pub count: Vec, } impl Response { /// Return a Response object - pub fn new(body: Bytes, dtype: DType, shape: Vec, count: i64) -> Response { + pub fn new(body: Bytes, dtype: DType, shape: Vec, count: Vec) -> Response { Response { body, dtype, @@ -331,9 +392,15 @@ mod tests { Token::U32(8), Token::Str("shape"), Token::Some, - Token::Seq { len: Some(2) }, + Token::Seq { len: Some(3) }, Token::U32(2), Token::U32(5), + Token::U32(1), + Token::SeqEnd, + Token::Str("axis"), + Token::Seq { len: Some(2) }, + Token::U32(1), + Token::U32(2), Token::SeqEnd, Token::Str("order"), Token::Some, @@ -342,7 +409,7 @@ mod tests { Token::Unit, Token::Str("selection"), Token::Some, - Token::Seq { len: Some(2) }, + Token::Seq { len: Some(3) }, Token::Seq { len: Some(3) }, Token::U32(1), Token::U32(2), @@ -353,6 +420,11 @@ mod tests { Token::U32(5), Token::U32(6), Token::SeqEnd, + Token::Seq { len: Some(3) }, + Token::U32(1), + Token::U32(1), + Token::U32(1), + Token::SeqEnd, Token::SeqEnd, Token::Str("compression"), Token::Some, @@ -572,7 +644,7 @@ mod tests { #[test] fn test_selection_end_lt_start() { - // Numpy sementics: start >= end yields an empty array + // Numpy semantics: start >= end yields an empty array let mut request_data = test_utils::get_test_request_data(); request_data.shape = Some(vec![1]); request_data.selection = Some(vec![Slice::new(1, 0, 1)]); @@ -617,7 +689,7 @@ mod tests { #[test] fn test_selection_start_gt_shape() { - // Numpy sementics: start > length yields an empty array + // Numpy semantics: start > length yields an empty array let mut request_data = test_utils::get_test_request_data(); request_data.shape = Some(vec![4]); request_data.selection = Some(vec![Slice::new(5, 5, 1)]); @@ -626,7 +698,7 @@ mod tests { #[test] fn test_selection_start_lt_negative_shape() { - // Numpy sementics: start < -length gets clamped to zero + // Numpy semantics: start < -length gets clamped to zero let mut request_data = test_utils::get_test_request_data(); request_data.shape = Some(vec![4]); request_data.selection = Some(vec![Slice::new(-5, 5, 1)]); @@ -659,6 +731,41 @@ mod tests { request_data.validate().unwrap() } + #[test] + #[should_panic(expected = "Axis requires shape to be specified")] + fn test_axis_without_shape() { + let mut request_data = test_utils::get_test_request_data(); + request_data.axis = ReductionAxes::One(1); + request_data.validate().unwrap() + } + + #[test] + #[should_panic(expected = "Reduction axis must be within shape")] + fn test_axis_gt_shape() { + let mut request_data = test_utils::get_test_request_data(); + request_data.axis = ReductionAxes::One(2); + request_data.shape = Some(vec![2, 5]); + request_data.validate().unwrap() + } + + #[test] + #[should_panic(expected = "Reduction axes must be provided in ascending order")] + fn test_axis_unsorted() { + let mut request_data = test_utils::get_test_request_data(); + request_data.axis = ReductionAxes::Multi(vec![1, 0]); + request_data.shape = Some(vec![2, 5, 1]); + request_data.validate().unwrap() + } + + #[test] + #[should_panic(expected = "Reduction axes contains duplicates")] + fn test_axis_duplicated() { + let mut request_data = test_utils::get_test_request_data(); + request_data.axis = ReductionAxes::Multi(vec![1, 1]); + request_data.shape = Some(vec![2, 5, 1]); + request_data.validate().unwrap() + } + #[test] fn test_invalid_compression() { assert_de_tokens_error::( @@ -731,7 +838,7 @@ mod tests { Token::Str("foo"), Token::StructEnd ], - "unknown field `foo`, expected one of `source`, `bucket`, `object`, `dtype`, `byte_order`, `offset`, `size`, `shape`, `order`, `selection`, `compression`, `filters`, `missing`" + "unknown field `foo`, expected one of `source`, `bucket`, `object`, `dtype`, `byte_order`, `offset`, `size`, `shape`, `axis`, `order`, `selection`, `compression`, `filters`, `missing`" ) } @@ -754,9 +861,10 @@ mod tests { "byte_order": "little", "offset": 4, "size": 8, - "shape": [2, 5], + "shape": [2, 5, 1], + "axis": [1, 2], "order": "C", - "selection": [[1, 2, 3], [4, 5, 6]], + "selection": [[1, 2, 3], [4, 5, 6], [1, 1, 1]], "compression": {"id": "gzip"}, "filters": [{"id": "shuffle", "element_size": 4}], "missing": {"missing_value": 42} @@ -776,6 +884,7 @@ mod tests { "offset": 4, "size": 8, "shape": [2, 5, 10], + "axis": 2, "order": "F", "selection": [[1, 2, 3], [4, 5, 6], [7, 8, 9]], "compression": {"id": "zlib"}, @@ -787,6 +896,7 @@ mod tests { expected.dtype = DType::Float64; expected.byte_order = Some(ByteOrder::Big); expected.shape = Some(vec![2, 5, 10]); + expected.axis = ReductionAxes::One(2); expected.order = Some(Order::F); expected.selection = Some(vec![ Slice::new(1, 2, 3), diff --git a/src/operation.rs b/src/operation.rs index 220f4cf..46ea54a 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -13,6 +13,7 @@ pub trait Element: + num_traits::FromPrimitive + num_traits::ToBytes + num_traits::Zero + + num_traits::Bounded + std::convert::From + std::fmt::Debug + std::iter::Sum @@ -34,6 +35,7 @@ impl Element for T where + num_traits::One + num_traits::ToBytes + num_traits::Zero + + num_traits::Bounded + std::convert::From + std::fmt::Debug + std::iter::Sum @@ -111,7 +113,7 @@ mod tests { data.into(), request_data.dtype, vec![3], - 3, + vec![3], )) } } @@ -125,7 +127,7 @@ mod tests { assert_eq!(&[1, 2, 3, 4][..], response.body); assert_eq!(models::DType::Uint32, response.dtype); assert_eq!(vec![3], response.shape); - assert_eq!(3, response.count); + assert_eq!(vec![3], response.count); } struct TestNumOp {} @@ -141,7 +143,7 @@ mod tests { body.into(), request_data.dtype, vec![1, 2], - 2, + vec![2], )) } } @@ -155,6 +157,6 @@ mod tests { assert_eq!("i64", response.body); assert_eq!(models::DType::Int64, response.dtype); assert_eq!(vec![1, 2], response.shape); - assert_eq!(2, response.count); + assert_eq!(vec![2], response.count); } } diff --git a/src/operations.rs b/src/operations.rs index cd7fe16..7f0f562 100644 --- a/src/operations.rs +++ b/src/operations.rs @@ -3,15 +3,16 @@ //! Each operation is implemented as a struct that implements the //! [Operation](crate::operation::Operation) trait. +use std::cmp::{max_by, min_by}; + use crate::array; use crate::error::ActiveStorageError; -use crate::models; +use crate::models::{self, Order, ReductionAxes}; use crate::operation::{Element, NumOperation}; use crate::types::Missing; use axum::body::Bytes; -use ndarray::ArrayView; -use ndarray_stats::{errors::MinMaxError, QuantileExt}; +use ndarray::{ArrayView, Axis}; // Bring trait into scope to use as_bytes method. use zerocopy::AsBytes; @@ -40,9 +41,66 @@ fn missing_filter<'a, T: Element>(missing: &'a Missing) -> Box fn count_non_missing( array: &ArrayView>, missing: &Missing, -) -> Result { +) -> usize { let filter = missing_filter(missing); - Ok(array.iter().copied().filter(filter).count()) + array.iter().copied().filter(filter).count() +} + +/// Counts the number of non-missing elements along +/// one or more axes of the provided array +fn count_array_multi_axis( + array: ndarray::ArrayView, + axes: &[usize], + missing: Option>, +) -> (Vec, Vec) { + let result = match axes.first() { + None => { + // Emulate numpy semantics of axis = () being + // equivalent to a 'reduction over no axes' + array.map(|val| { + if let Some(missing) = &missing { + if !missing.is_missing(val) { + 1 + } else { + 0 + } + } else { + 1 + } + }) + } + Some(first_axis) => { + // Count non-missing over first axis + let mut result = array + .fold_axis(Axis(*first_axis), 0, |running_count, val| { + if let Some(missing) = &missing { + if !missing.is_missing(val) { + running_count + 1 + } else { + *running_count + } + } else { + running_count + 1 + } + }) + .into_dyn(); + // Sum counts over remaining axes + if let Some(remaining_axes) = axes.get(1..) { + for (n, axis) in remaining_axes.iter().enumerate() { + result = result + .fold_axis(Axis(axis - n - 1), 0, |total_count, count| { + total_count + count + }) + .into_dyn(); + } + } + result + } + }; + + // Convert result to owned vec + let counts = result.iter().copied().collect(); + (counts, result.shape().into()) } /// Return the number of selected elements in the array. @@ -56,21 +114,50 @@ impl NumOperation for Count { let array = array::build_array::(request_data, &mut data)?; let slice_info = array::build_slice_info::(&request_data.selection, array.shape()); let sliced = array.slice(slice_info); - let count = if let Some(missing) = &request_data.missing { - let missing = Missing::::try_from(missing)?; - count_non_missing(&sliced, &missing)? + + let typed_missing: Option> = if let Some(missing) = &request_data.missing { + let m = Missing::try_from(missing)?; + Some(m) } else { - sliced.len() + None + }; + + let (body, shape, counts) = match &request_data.axis { + ReductionAxes::All => { + let count = if let Some(missing) = typed_missing { + count_non_missing(&sliced, &missing) + } else { + sliced.len() + }; + let count = i64::try_from(count)?; + let body = count.to_ne_bytes(); + // Need to copy to provide ownership to caller + // and ensure match arm types are aligned + let body = Bytes::copy_from_slice(&body); + (body, vec![], vec![count]) + } + ReductionAxes::One(axis) => { + // Single axis case is just a special case of the multi-axis case + let (counts, shape) = + count_array_multi_axis(sliced.view(), &[*axis], typed_missing); + let body = counts.as_bytes(); + // Need to copy to provide ownership to caller. + let body = Bytes::copy_from_slice(body); + (body, shape, counts) + } + ReductionAxes::Multi(axes) => { + let (counts, shape) = count_array_multi_axis(sliced.view(), axes, typed_missing); + let body = counts.as_bytes(); + // Need to copy to provide ownership to caller. + let body = Bytes::copy_from_slice(body); + (body, shape, counts) + } }; - let count = i64::try_from(count)?; - let body = count.to_ne_bytes(); - // Need to copy to provide ownership to caller. - let body = Bytes::copy_from_slice(&body); Ok(models::Response::new( body, models::DType::Int64, - vec![], - count, + shape, + counts, )) } } @@ -78,6 +165,111 @@ impl NumOperation for Count { /// Return the maximum of selected elements in the array. pub struct Max {} +fn max_element_pairwise(x: &&T, y: &&T) -> std::cmp::Ordering { + // TODO: How to handle NaN correctly? + // Numpy seems to behave as follows: + // + // np.min([np.nan, 1]) == np.nan + // np.max([np.nan, 1]) == np.nan + // np.nan != np.nan + // np.min([np.nan, 1]) != np.max([np.nan, 1]) + // + // There are also separate np.nan{min,max} functions + // which ignore nans instead. + // + // Which behaviour do we want to follow? + // + // Panic for now (TODO: Make this a user-facing error response instead) + x.partial_cmp(y) + // .unwrap_or(std::cmp::Ordering::Less) + .unwrap_or_else(|| panic!("unexpected undefined order error for min")) +} + +/// Emulates numpy behaviour of 'reduction over no axes' +/// when `axis=()` is passed to a numpy.ma function +fn reduction_over_zero_axes( + array: &ndarray::ArrayView, + missing: Option>, + order: &Option, +) -> ndarray::ArrayBase, ndarray::IxDyn> { + let func = |val| { + if let Some(missing) = &missing { + if !missing.is_missing(val) { + (*val, 1) + } else { + (*val, 0) + } + } else { + (*val, 1) + } + }; + let result = match order { + // Account for the fact that map always + // iterates in C-order. + Some(Order::F) => array.t().map(func), + _ => array.map(func), + }; + result +} + +/// Performs a max over one or more axes of the provided array +fn max_array_multi_axis( + array: ndarray::ArrayView, + axes: &[usize], + missing: Option>, + order: &Option, +) -> (Vec, Vec, Vec) { + let (result, shape) = match axes.first() { + None => { + // Emulate numpy behaviour of 'reduction over no axes' + let result = reduction_over_zero_axes(&array, missing, order); + (result, array.shape().to_owned()) + } + Some(first_axis) => { + // Find maximum over first axis and count elements operated on + let init = T::min_value(); + let mut result = array + .fold_axis(Axis(*first_axis), (init, 0), |(running_max, count), val| { + if let Some(missing) = &missing { + if !missing.is_missing(val) { + let new_max = max_by(running_max, val, max_element_pairwise); + (*new_max, count + 1) + } else { + (*running_max, *count) + } + } else { + let new_max = max_by(running_max, val, max_element_pairwise); + (*new_max, count + 1) + } + }) + .into_dyn(); + // Find max over remaining axes (where total count is now sum of counts) + if let Some(remaining_axes) = axes.get(1..) { + for (n, axis) in remaining_axes.iter().enumerate() { + result = result + .fold_axis( + Axis(axis - n - 1), + (init, 0), + |(global_max, total_count), (running_max, count)| { + let new_max = max_by(global_max, running_max, max_element_pairwise); + (*new_max, total_count + count) + }, + ) + .into_dyn(); + } + } + let shape = result.shape().to_owned(); + (result, shape) + } + }; + + // Result is array of (max, count) tuples so separate them here + let maxes = result.iter().map(|(max, _)| *max).collect::>(); + let counts = result.iter().map(|(_, count)| *count).collect::>(); + + (maxes, counts, shape) +} + impl NumOperation for Max { fn execute_t( request_data: &models::RequestData, @@ -86,43 +278,56 @@ impl NumOperation for Max { let array = array::build_array::(request_data, &mut data)?; let slice_info = array::build_slice_info::(&request_data.selection, array.shape()); let sliced = array.slice(slice_info); - let (max, count) = if let Some(missing) = &request_data.missing { - let missing = Missing::::try_from(missing)?; - // Use a fold to simultaneously max and count the non-missing data. - // TODO: separate float impl? - // TODO: inifinite/NaN - let (max, count) = sliced - .iter() - .copied() - .filter(missing_filter(&missing)) - .fold((None, 0), |(a, count), b| { - let max = match (a, b) { - (None, b) => Some(b), //FIXME: if b.is_finite() { Some(b) } else { None }, - (Some(a), b) => Some(std::cmp::max_by(a, b, |x, y| { - x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Greater) - })), - }; - (max, count + 1) - }); - let max = max.ok_or(ActiveStorageError::EmptyArray { operation: "max" })?; - (max, count) + + let typed_missing: Option> = if let Some(missing) = &request_data.missing { + let m = Missing::try_from(missing)?; + Some(m) } else { - let max = *sliced.max().map_err(|err| match err { - MinMaxError::EmptyInput => ActiveStorageError::EmptyArray { operation: "max" }, - MinMaxError::UndefinedOrder => panic!("unexpected undefined order error for max"), - })?; - let count = sliced.len(); - (max, count) + None }; - let count = i64::try_from(count)?; - let body = max.as_bytes(); - // Need to copy to provide ownership to caller. - let body = Bytes::copy_from_slice(body); + + let (body, counts, shape) = match &request_data.axis { + ReductionAxes::One(axis) => { + // Reduction over one axis is just a special case of + // the general multi-axis reduction. + let (maxes, counts, shape) = max_array_multi_axis( + sliced.view(), + &[*axis], + typed_missing, + &request_data.order, + ); + let body = Bytes::copy_from_slice(maxes.as_bytes()); + (body, counts, shape) + } + ReductionAxes::Multi(axes) => { + let (maxes, counts, shape) = + max_array_multi_axis(sliced, axes, typed_missing, &request_data.order); + let body = Bytes::copy_from_slice(maxes.as_bytes()); + (body, counts, shape) + } + ReductionAxes::All => { + let init = T::min_value(); + let (max, count) = sliced.fold((init, 0_i64), |(running_max, count), val| { + if let Some(missing) = &typed_missing { + if !missing.is_missing(val) { + (*max_by(&running_max, val, max_element_pairwise), count + 1) + } else { + (running_max, count) + } + } else { + (*max_by(&running_max, val, max_element_pairwise), count + 1) + } + }); + let body = Bytes::copy_from_slice(max.as_bytes()); + (body, vec![count], vec![]) + } + }; + Ok(models::Response::new( body, request_data.dtype, - vec![], - count, + shape, + counts, )) } } @@ -130,6 +335,87 @@ impl NumOperation for Max { /// Return the minimum of selected elements in the array. pub struct Min {} +fn min_element_pairwise(x: &&T, y: &&T) -> std::cmp::Ordering { + // TODO: How to handle NaN correctly? + // Numpy seems to behave as follows: + + // np.min([np.nan, 1]) == np.nan + // np.max([np.nan, 1]) == np.nan + // np.nan != np.nan + // np.min([np.nan, 1]) != np.max([np.nan, 1]) + + // There are also separate np.nan{min,max} functions + // which ignore nans instead so that + + // np.nanmin([np.nan, 1]) == 1 + // np.nanmax([np.nan, 1]) == 1 + + // Which behaviour do we want to follow? + // + // Panic for now (TODO: Make this a user-facing error response instead) + x.partial_cmp(y) + // .unwrap_or(std::cmp::Ordering::Less) + .unwrap_or_else(|| panic!("unexpected undefined order error for min")) +} + +/// Finds the minimum value over one or more axes of the provided array +fn min_array_multi_axis( + array: ndarray::ArrayView, + axes: &[usize], + missing: Option>, + order: &Option, +) -> (Vec, Vec, Vec) { + let (result, shape) = match axes.first() { + None => { + // Emulate numpy behaviour of 'reduction over no axes' + let result = reduction_over_zero_axes(&array, missing, order); + (result, array.shape().to_owned()) + } + Some(first_axis) => { + // Find minimum over first axis and count elements operated on + let init = T::max_value(); + let mut result = array + .fold_axis(Axis(*first_axis), (init, 0), |(running_min, count), val| { + if let Some(missing) = &missing { + if !missing.is_missing(val) { + let new_min = min_by(running_min, val, min_element_pairwise); + (*new_min, count + 1) + } else { + (*running_min, *count) + } + } else { + let new_min = min_by(running_min, val, min_element_pairwise); + (*new_min, count + 1) + } + }) + .into_dyn(); + // Find min over remaining axes (where total count is now sum of counts) + if let Some(remaining_axes) = axes.get(1..) { + for (n, axis) in remaining_axes.iter().enumerate() { + result = result + .fold_axis( + Axis(axis - n - 1), + (init, 0), + |(global_min, total_count), (running_min, count)| { + let new_min = min_by(global_min, running_min, min_element_pairwise); + (*new_min, total_count + count) + }, + ) + .into_dyn(); + } + } + let shape = result.shape().to_owned(); + (result, shape) + } + }; + + // Result is array of (mins, count) tuples so separate them here + let mins = result.iter().map(|(min, _)| *min).collect::>(); + let counts = result.iter().map(|(_, count)| *count).collect::>(); + + (mins, counts, shape) +} + impl NumOperation for Min { fn execute_t( request_data: &models::RequestData, @@ -138,43 +424,60 @@ impl NumOperation for Min { let array = array::build_array::(request_data, &mut data)?; let slice_info = array::build_slice_info::(&request_data.selection, array.shape()); let sliced = array.slice(slice_info); - let (min, count) = if let Some(missing) = &request_data.missing { - let missing = Missing::::try_from(missing)?; - // Use a fold to simultaneously min and count the non-missing data. - // TODO: separate float impl? - // TODO: inifinite/NaN - let (min, count) = sliced - .iter() - .copied() - .filter(missing_filter(&missing)) - .fold((None, 0), |(a, count), b| { - let min = match (a, b) { - (None, b) => Some(b), //FIXME: if b.is_finite() { Some(b) } else { None }, - (Some(a), b) => Some(std::cmp::min_by(a, b, |x, y| { - x.partial_cmp(y).unwrap_or(std::cmp::Ordering::Less) - })), - }; - (min, count + 1) - }); - let min = min.ok_or(ActiveStorageError::EmptyArray { operation: "min" })?; - (min, count) + + // Convert Missing to Missing + let typed_missing: Option> = if let Some(missing) = &request_data.missing { + let m = Missing::try_from(missing)?; + Some(m) } else { - let min = *sliced.min().map_err(|err| match err { - MinMaxError::EmptyInput => ActiveStorageError::EmptyArray { operation: "min" }, - MinMaxError::UndefinedOrder => panic!("unexpected undefined order error for min"), - })?; - let count = sliced.len(); - (min, count) + None + }; + + // Use ndarray::fold, ndarray::fold_axis or dispatch to specialised + // multi-axis function depending on whether we're performing reduction + // over all axes or only a subset + let (body, counts, shape) = match &request_data.axis { + ReductionAxes::One(axis) => { + // Reduction over one axis is just a special case of + // the general multi-axis reduction. + let (mins, counts, shape) = min_array_multi_axis( + sliced.view(), + &[*axis], + typed_missing, + &request_data.order, + ); + let body = Bytes::copy_from_slice(mins.as_bytes()); + (body, counts, shape) + } + ReductionAxes::Multi(axes) => { + let (mins, counts, shape) = + min_array_multi_axis(sliced, axes, typed_missing, &request_data.order); + let body = Bytes::copy_from_slice(mins.as_bytes()); + (body, counts, shape) + } + ReductionAxes::All => { + let init = T::max_value(); + let (min, count) = sliced.fold((init, 0_i64), |(running_min, count), val| { + if let Some(missing) = &typed_missing { + if !missing.is_missing(val) { + (*min_by(&running_min, val, min_element_pairwise), count + 1) + } else { + (running_min, count) + } + } else { + (*min_by(&running_min, val, min_element_pairwise), count + 1) + } + }); + // Need to copy to provide ownership to caller. + let body = Bytes::copy_from_slice(min.as_bytes()); + (body, vec![count], vec![]) + } }; - let count = i64::try_from(count)?; - let body = min.as_bytes(); - // Need to copy to provide ownership to caller. - let body = Bytes::copy_from_slice(body); Ok(models::Response::new( body, request_data.dtype, - vec![], - count, + shape, + counts, )) } } @@ -187,12 +490,14 @@ impl NumOperation for Select { request_data: &models::RequestData, mut data: Vec, ) -> Result { + // NOTE(sd109): We explicitly ignore the request_data.axis field + // here because the 'select' operation is not a actually reduction let array = array::build_array::(request_data, &mut data)?; let slice_info = array::build_slice_info::(&request_data.selection, array.shape()); let sliced = array.slice(slice_info); let count = if let Some(missing) = &request_data.missing { let missing = Missing::::try_from(missing)?; - count_non_missing(&sliced, &missing)? + count_non_missing(&sliced, &missing) } else { sliced.len() }; @@ -212,7 +517,7 @@ impl NumOperation for Select { body, request_data.dtype, shape, - count, + vec![count], )) } } @@ -220,6 +525,60 @@ impl NumOperation for Select { /// Return the sum of selected elements in the array. pub struct Sum {} +/// Performs a sum over one or more axes of the provided array +fn sum_array_multi_axis( + array: ndarray::ArrayView, + axes: &[usize], + missing: Option>, + order: &Option, +) -> (Vec, Vec, Vec) { + let (result, shape) = match axes.first() { + None => { + // Emulate numpy behaviour of 'reduction over no axes' + let result = reduction_over_zero_axes(&array, missing, order); + (result, array.shape().to_owned()) + } + Some(first_axis) => { + // Sum over first axis and count elements operated on + let mut result = array + .fold_axis(Axis(*first_axis), (T::zero(), 0), |(sum, count), val| { + if let Some(missing) = &missing { + if !missing.is_missing(val) { + (*sum + *val, count + 1) + } else { + (*sum, *count) + } + } else { + (*sum + *val, count + 1) + } + }) + .into_dyn(); + // Sum over remaining axes (where total count is now sum of counts) + if let Some(remaining_axes) = axes.get(1..) { + for (n, axis) in remaining_axes.iter().enumerate() { + result = result + .fold_axis( + Axis(axis - n - 1), + (T::zero(), 0), + |(total_sum, total_count), (sum, count)| { + (*total_sum + *sum, total_count + count) + }, + ) + .into_dyn(); + } + } + let shape = result.shape().to_owned(); + (result, shape) + } + }; + + // Result is array of (sum, count) tuples so separate them here + let sums = result.iter().map(|(sum, _)| *sum).collect::>(); + let counts = result.iter().map(|(_, count)| *count).collect::>(); + + (sums, counts, shape) +} + impl NumOperation for Sum { fn execute_t( request_data: &models::RequestData, @@ -228,34 +587,68 @@ impl NumOperation for Sum { let array = array::build_array::(request_data, &mut data)?; let slice_info = array::build_slice_info::(&request_data.selection, array.shape()); let sliced = array.slice(slice_info); - let (sum, count) = if let Some(missing) = &request_data.missing { - let missing = Missing::::try_from(missing)?; - // Use a fold to simultaneously sum and count the non-missing data. - sliced - .iter() - .copied() - .filter(missing_filter(&missing)) - .fold((T::zero(), 0), |(a, count), b| (a + b, count + 1)) + + // Convert Missing to Missing + let typed_missing: Option> = if let Some(missing) = &request_data.missing { + let m = Missing::try_from(missing)?; + Some(m) } else { - (sliced.sum(), sliced.len()) + None }; - let count = i64::try_from(count)?; - let body = sum.as_bytes(); - // Need to copy to provide ownership to caller. - let body = Bytes::copy_from_slice(body); + + // Use ndarray::fold or ndarray::fold_axis depending on whether we're + // performing reduction over all axes or only a subset + let (body, counts, shape) = match &request_data.axis { + ReductionAxes::One(axis) => { + let (sums, counts, shape) = sum_array_multi_axis( + sliced.view(), + &[*axis], + typed_missing, + &request_data.order, + ); + let body = Bytes::copy_from_slice(sums.as_bytes()); + (body, counts, shape) + } + ReductionAxes::Multi(axes) => { + let (sums, counts, shape) = + sum_array_multi_axis(sliced, axes, typed_missing, &request_data.order); + let body = Bytes::copy_from_slice(sums.as_bytes()); + (body, counts, shape) + } + ReductionAxes::All => { + let (sum, count) = sliced.fold((T::zero(), 0_i64), |(sum, count), val| { + if let Some(missing) = &typed_missing { + if !missing.is_missing(val) { + (sum + *val, count + 1) + } else { + (sum, count) + } + } else { + (sum + *val, count + 1) + } + }); + + // Convert to generic owned Bytes instance + let body = Bytes::copy_from_slice(sum.as_bytes()); + (body, vec![count], vec![]) + } + }; + Ok(models::Response::new( body, request_data.dtype, - vec![], - count, + shape, + counts, )) } } #[cfg(test)] mod tests { + use super::*; + use crate::models::ReductionAxes; use crate::operation::Operation; use crate::test_utils; use crate::types::DValue; @@ -267,7 +660,7 @@ mod tests { let response = Count::execute(&request_data, data).unwrap(); // A Vec of 8 elements == a u32 slice with 2 elements // Count is always i64. - let expected: i64 = 2; + let expected = vec![2]; assert_eq!(expected.as_bytes(), response.body); assert_eq!(8, response.body.len()); // Assert that count value is 8 bytes (i.e. i64) assert_eq!(models::DType::Int64, response.dtype); @@ -284,7 +677,7 @@ mod tests { let response = Count::execute(&request_data, data).unwrap(); // A Vec of 8 elements == a u32 slice with 2 elements // Count is always i64. - let expected: i64 = 1; + let expected = vec![1]; assert_eq!(expected.as_bytes(), response.body); assert_eq!(8, response.body.len()); // Assert that count value is 8 bytes (i.e. i64) assert_eq!(models::DType::Int64, response.dtype); @@ -308,7 +701,7 @@ mod tests { assert_eq!(8, response.body.len()); assert_eq!(models::DType::Int64, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(1, response.count); + assert_eq!(vec![1], response.count); } #[test] @@ -328,7 +721,7 @@ mod tests { assert_eq!(8, response.body.len()); assert_eq!(models::DType::Int64, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(1, response.count); + assert_eq!(vec![1], response.count); } #[test] @@ -343,7 +736,7 @@ mod tests { assert_eq!(4, response.body.len()); assert_eq!(models::DType::Float32, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(2, response.count); + assert_eq!(vec![2], response.count); } #[test] @@ -358,7 +751,7 @@ mod tests { assert_eq!(4, response.body.len()); assert_eq!(models::DType::Float32, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(2, response.count); + assert_eq!(vec![2], response.count); } #[test] @@ -372,7 +765,7 @@ mod tests { assert_eq!(8, response.body.len()); assert_eq!(models::DType::Uint64, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(1, response.count); + assert_eq!(vec![1], response.count); } #[test] @@ -393,7 +786,7 @@ mod tests { assert_eq!(8, response.body.len()); assert_eq!(models::DType::Int64, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(1, response.count); + assert_eq!(vec![1], response.count); } #[test] @@ -408,7 +801,7 @@ mod tests { assert_eq!(4, response.body.len()); assert_eq!(models::DType::Float32, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(2, response.count); + assert_eq!(vec![2], response.count); } #[test] @@ -423,7 +816,7 @@ mod tests { assert_eq!(4, response.body.len()); assert_eq!(models::DType::Float32, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(2, response.count); + assert_eq!(vec![2], response.count); } #[test] @@ -439,7 +832,7 @@ mod tests { assert_eq!(4, response.body.len()); assert_eq!(models::DType::Float32, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(2, response.count); + assert_eq!(vec![2], response.count); } #[test] @@ -455,10 +848,11 @@ mod tests { assert_eq!(4, response.body.len()); assert_eq!(models::DType::Float32, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(2, response.count); + assert_eq!(vec![2], response.count); } #[test] + #[should_panic(expected = "unexpected undefined order error for min")] fn min_f32_1d_nan_missing_value() { let mut request_data = test_utils::get_test_request_data(); request_data.dtype = models::DType::Float32; @@ -471,10 +865,11 @@ mod tests { assert_eq!(4, response.body.len()); assert_eq!(models::DType::Float32, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(2, response.count); + assert_eq!(vec![2], response.count); } #[test] + #[should_panic(expected = "unexpected undefined order error for min")] fn min_f32_1d_nan_first_missing_value() { let mut request_data = test_utils::get_test_request_data(); request_data.dtype = models::DType::Float32; @@ -488,7 +883,7 @@ mod tests { assert_eq!(4, response.body.len()); assert_eq!(models::DType::Float32, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(2, response.count); + assert_eq!(vec![2], response.count); } #[test] @@ -502,7 +897,25 @@ mod tests { assert_eq!(8, response.body.len()); assert_eq!(models::DType::Float32, response.dtype); assert_eq!(vec![2], response.shape); - assert_eq!(2, response.count); + assert_eq!(vec![2], response.count); + } + + #[test] + fn select_f32_1d_1ax() { + // Arrange + let mut request_data = test_utils::get_test_request_data(); + request_data.dtype = models::DType::Float32; + request_data.axis = ReductionAxes::Multi(vec![0]); + let data = vec![1, 2, 3, 4, 5, 6, 7, 8]; + // Act + let response = Select::execute(&request_data, data).unwrap(); + // Assert (check that axis value is ignored) + let expected: [u8; 8] = [1, 2, 3, 4, 5, 6, 7, 8]; + assert_eq!(expected.as_bytes(), response.body); + assert_eq!(8, response.body.len()); + assert_eq!(models::DType::Float32, response.dtype); + assert_eq!(vec![2], response.shape); + assert_eq!(vec![2], response.count); } #[test] @@ -517,7 +930,7 @@ mod tests { assert_eq!(16, response.body.len()); assert_eq!(models::DType::Float64, response.dtype); assert_eq!(vec![2, 1], response.shape); - assert_eq!(2, response.count); + assert_eq!(vec![2], response.count); } #[test] @@ -539,7 +952,7 @@ mod tests { assert_eq!(8, response.body.len()); assert_eq!(models::DType::Float32, response.dtype); assert_eq!(vec![2, 1], response.shape); - assert_eq!(2, response.count); + assert_eq!(vec![2], response.count); } #[test] @@ -553,7 +966,7 @@ mod tests { assert_eq!(4, response.body.len()); assert_eq!(models::DType::Uint32, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(2, response.count); + assert_eq!(vec![2], response.count); } #[test] @@ -568,7 +981,7 @@ mod tests { assert_eq!(4, response.body.len()); assert_eq!(models::DType::Uint32, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(1, response.count); + assert_eq!(vec![1], response.count); } #[test] @@ -583,7 +996,7 @@ mod tests { assert_eq!(4, response.body.len()); assert_eq!(models::DType::Float32, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(2, response.count); + assert_eq!(vec![2], response.count); } #[test] @@ -598,7 +1011,82 @@ mod tests { assert_eq!(8, response.body.len()); assert_eq!(models::DType::Float64, response.dtype); assert_eq!(vec![0; 0], response.shape); - assert_eq!(2, response.count); + assert_eq!(vec![2], response.count); + } + + /// Test helper for converting response bytes back into a type + /// that's easier to assert on + fn vec_from_bytes(data: &Bytes) -> Vec { + let mut data = data.to_vec(); + let data = data.as_mut_slice(); + let layout = zerocopy::LayoutVerified::<_, [T]>::new_slice(&mut data[..]).unwrap(); + layout.into_mut_slice().to_vec() + } + + #[test] + fn sum_u32_1d_axis_0() { + // Arrange + let mut request_data = test_utils::get_test_request_data(); + request_data.dtype = models::DType::Uint32; + request_data.shape = Some(vec![2, 4]); + request_data.axis = ReductionAxes::One(0); + let data: Vec = vec![1, 2, 3, 4, 5, 6, 7, 8]; + // Act + let response = Sum::execute(&request_data, data.as_bytes().into()).unwrap(); + let result = vec_from_bytes::(&response.body); + // Assert + let arr = ndarray::Array::from_shape_vec((2, 4), data).unwrap(); + let expected = arr.sum_axis(Axis(0)).to_vec(); + assert_eq!(result, expected); + assert_eq!(models::DType::Uint32, response.dtype); + assert_eq!(16, response.body.len()); // 4 bytes in a u32 + assert_eq!(vec![4], response.shape); + assert_eq!(vec![2, 2, 2, 2], response.count); + } + + #[test] + fn sum_u32_1d_axis_1_missing() { + // Arrange + let mut request_data = test_utils::get_test_request_data(); + request_data.dtype = models::DType::Uint32; + request_data.shape = Some(vec![2, 4]); + request_data.axis = ReductionAxes::One(1); + request_data.missing = Some(Missing::MissingValue(0.into())); + let data: Vec = vec![0, 2, 3, 4, 5, 6, 7, 8]; + // Act + let response = Sum::execute(&request_data, data.as_bytes().into()).unwrap(); + let result = vec_from_bytes::(&response.body); + // Assert + let arr = ndarray::Array::from_shape_vec((2, 4), data).unwrap(); + let expected = arr.sum_axis(Axis(1)).to_vec(); + assert_eq!(result, expected); + assert_eq!(models::DType::Uint32, response.dtype); + assert_eq!(8, response.body.len()); // 4 bytes in a u32 + assert_eq!(vec![2], response.shape); + assert_eq!(vec![3, 4], response.count); // Expect a lower count due to 'missing' value + } + + #[test] + fn sum_f64_1d_axis_1_missing() { + // Arrange + let mut request_data = test_utils::get_test_request_data(); + request_data.dtype = models::DType::Float64; + request_data.shape = Some(vec![2, 2, 2]); + request_data.axis = ReductionAxes::One(1); + request_data.missing = Some(Missing::MissingValue(0.into())); + let data: Vec = vec![0.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]; + // Act + let response = Sum::execute(&request_data, data.as_bytes().into()).unwrap(); + let result = vec_from_bytes::(&response.body); + let result = ndarray::Array::from_shape_vec((2, 2), result).unwrap(); + // Assert + let arr = ndarray::Array::from_shape_vec((2, 2, 2), data).unwrap(); + let expected = arr.sum_axis(Axis(1)); + assert_eq!(result, expected); + assert_eq!(models::DType::Float64, response.dtype); + assert_eq!(32, response.body.len()); // 8 bytes in a f64 + assert_eq!(vec![2, 2], response.shape); + assert_eq!(vec![1, 2, 2, 2], response.count); // Expect a lower count due to 'missing' value } #[test] @@ -613,4 +1101,368 @@ mod tests { Some(std::cmp::Ordering::Greater) ); } + + #[test] + #[should_panic(expected = "assertion failed: axis.index() < self.ndim()")] + fn sum_multi_axis_2d_wrong_axis() { + let array = ndarray::Array::from_shape_vec((2, 2), (0..4).collect()) + .unwrap() + .into_dyn(); + let axes = vec![2]; + let _ = sum_array_multi_axis(array.view(), &axes, None, &None); + } + + #[test] + fn sum_multi_axis_2d_2ax() { + let array = ndarray::Array::from_shape_vec((2, 2), (0..4).collect()) + .unwrap() + .into_dyn(); + let axes = vec![0, 1]; + let (sum, count, shape) = sum_array_multi_axis(array.view(), &axes, None, &None); + assert_eq!(sum, vec![6]); + assert_eq!(count, vec![4]); + assert_eq!(shape, Vec::::new()); + } + + #[test] + fn sum_multi_axis_2d_2ax_missing() { + let array = ndarray::Array::from_shape_vec((2, 2), (0..4).collect()) + .unwrap() + .into_dyn(); + let axes = vec![0, 1]; + let missing = Missing::MissingValue(1); + let (sum, count, shape) = sum_array_multi_axis(array.view(), &axes, Some(missing), &None); + assert_eq!(sum, vec![5]); + assert_eq!(count, vec![3]); + assert_eq!(shape, Vec::::new()); + } + + #[test] + fn sum_multi_axis_2d_no_ax_some_missing() { + // Arrange + let axes = vec![]; + let missing = Some(Missing::ValidMax(2)); + let arr = ndarray::Array::from_shape_vec((2, 3), (0..6).collect()) + .unwrap() + .into_dyn(); + // Act + let (result, counts, shape) = sum_array_multi_axis(arr.view(), &axes, missing, &None); + // Assert - reduction should just return original array + assert_eq!(result, arr.iter().copied().collect::>()); + assert_eq!(counts, vec![1, 1, 1, 0, 0, 0]); + assert_eq!(shape, arr.shape()); + } + + #[test] + fn sum_multi_axis_2d_no_ax_some_missing_f_order() { + // Arrange + let axes = vec![]; + let missing = Some(Missing::ValidMax(2)); + let arr = ndarray::Array::from_shape_vec((2, 3), (0..6).collect()) + .unwrap() + .into_dyn(); + // Act + let (result, counts, shape) = + sum_array_multi_axis(arr.view(), &axes, missing, &Some(Order::F)); + // Assert - reduction should return transposed version of + // original array (due to F ordering) + assert_eq!(result, arr.t().iter().copied().collect::>()); + assert_eq!(counts, vec![1, 0, 1, 0, 1, 0]); + assert_eq!(shape, arr.shape()); + } + + #[test] + fn sum_multi_axis_4d_1ax() { + let array = ndarray::Array::from_shape_vec((2, 3, 2, 1), (0..12).collect()) + .unwrap() + .into_dyn(); + let axes = vec![2]; + let (sum, count, shape) = sum_array_multi_axis(array.view(), &axes, None, &None); + assert_eq!(sum, vec![1, 5, 9, 13, 17, 21]); + assert_eq!(count, vec![2, 2, 2, 2, 2, 2]); + assert_eq!(shape, vec![2, 3, 1]); + } + + #[test] + fn sum_multi_axis_4d_3ax() { + let array = ndarray::Array::from_shape_vec((2, 3, 2, 1), (0..12).collect()) + .unwrap() + .into_dyn(); + let axes = vec![0, 1, 3]; + let (sum, count, shape) = sum_array_multi_axis(array.view(), &axes, None, &None); + assert_eq!(sum, vec![30, 36]); + assert_eq!(count, vec![6, 6]); + assert_eq!(shape, vec![2]); + } + + #[test] + #[should_panic(expected = "assertion failed: axis.index() < self.ndim()")] + fn min_multi_axis_2d_wrong_axis() { + let array = ndarray::Array::from_shape_vec((2, 2), (0..4).collect()) + .unwrap() + .into_dyn(); + let axes = vec![2]; + let _ = min_array_multi_axis(array.view(), &axes, None, &None); + } + + #[test] + fn min_multi_axis_2d_2ax() { + // Arrrange + let axes = vec![0, 1]; + let missing = None; + let arr = ndarray::Array::from_shape_vec((2, 3), (0..6).collect()) + .unwrap() + .into_dyn(); + // Act + let (result, counts, shape) = min_array_multi_axis(arr.view(), &axes, missing, &None); + // Assert + assert_eq!(result, vec![0]); + assert_eq!(counts, vec![6]); + assert_eq!(shape, Vec::::new()); + } + + #[test] + fn min_multi_axis_2d_no_ax_some_missing() { + // Arrange + let axes = vec![]; + let missing = Some(Missing::ValidMax(2)); + let arr = ndarray::Array::from_shape_vec((2, 3), (0..6).collect()) + .unwrap() + .into_dyn(); + // Act + let (result, counts, shape) = min_array_multi_axis(arr.view(), &axes, missing, &None); + // Assert - reduction should just return original array + assert_eq!(result, arr.iter().copied().collect::>()); + assert_eq!(counts, vec![1, 1, 1, 0, 0, 0]); + assert_eq!(shape, arr.shape()); + } + + #[test] + fn min_multi_axis_2d_no_ax_some_missing_f_order() { + // Arrange + let axes = vec![]; + let missing = Some(Missing::ValidMax(2)); + let arr = ndarray::Array::from_shape_vec((2, 3), (0..6).collect()) + .unwrap() + .into_dyn(); + // Act + let (result, counts, shape) = + min_array_multi_axis(arr.view(), &axes, missing, &Some(Order::F)); + // Assert - reduction should return transposed version of + // original array (due to F ordering) + assert_eq!(result, arr.t().iter().copied().collect::>()); + assert_eq!(counts, vec![1, 0, 1, 0, 1, 0]); + assert_eq!(shape, arr.shape()); + } + + #[test] + fn min_multi_axis_2d_1ax_missing() { + // Arrange + let axes = vec![1]; + let missing = Missing::MissingValue(0); + let arr = ndarray::Array::from_shape_vec((2, 3), (0..6).collect()) + .unwrap() + .into_dyn(); + // Act + let (result, counts, shape) = min_array_multi_axis(arr.view(), &axes, Some(missing), &None); + // Assert + assert_eq!(result, vec![1, 3]); + assert_eq!(counts, vec![2, 3]); + assert_eq!(shape, vec![2]); + } + + #[test] + fn min_multi_axis_4d_3ax_missing() { + let arr = ndarray::Array::from_shape_vec((2, 3, 2, 1), (0..12).collect()) + .unwrap() + .into_dyn(); + let axes = vec![0, 1, 3]; + let missing = Missing::MissingValue(1); + let (result, counts, shape) = min_array_multi_axis(arr.view(), &axes, Some(missing), &None); + + assert_eq!(result, vec![0, 3]); + assert_eq!(counts, vec![6, 5]); + assert_eq!(shape, vec![2]); + } + + #[test] + #[should_panic(expected = "assertion failed: axis.index() < self.ndim()")] + fn max_multi_axis_2d_wrong_axis() { + // Arrange + let array = ndarray::Array::from_shape_vec((2, 2), (0..4).collect()) + .unwrap() + .into_dyn(); + let axes = vec![2]; + // Act + let _ = max_array_multi_axis(array.view(), &axes, None, &None); + } + + #[test] + fn max_multi_axis_2d_2ax() { + // Arrange + let axes = vec![0, 1]; + let missing = None; + let arr = ndarray::Array::from_shape_vec((2, 3), (0..6).collect()) + .unwrap() + .into_dyn(); + // Act + let (result, counts, shape) = max_array_multi_axis(arr.view(), &axes, missing, &None); + // Assert + assert_eq!(result, vec![5]); + assert_eq!(counts, vec![6]); + assert_eq!(shape, Vec::::new()); + } + + #[test] + fn max_multi_axis_2d_no_ax_some_missing() { + // Arrange + let axes = vec![]; + let missing = Some(Missing::ValidMax(2)); + let arr = ndarray::Array::from_shape_vec((2, 3), (0..6).collect()) + .unwrap() + .into_dyn(); + // Act + let (result, counts, shape) = max_array_multi_axis(arr.view(), &axes, missing, &None); + // Assert - reduction should just return original array + assert_eq!(result, arr.iter().copied().collect::>()); + assert_eq!(counts, vec![1, 1, 1, 0, 0, 0]); + assert_eq!(shape, arr.shape()); + } + + #[test] + fn max_multi_axis_2d_no_ax_some_missing_f_order() { + // Arrange + let axes = vec![]; + let missing = Some(Missing::ValidMax(2)); + let arr = ndarray::Array::from_shape_vec((2, 3), (0..6).collect()) + .unwrap() + .into_dyn(); + // Act + let (result, counts, shape) = + max_array_multi_axis(arr.view(), &axes, missing, &Some(Order::F)); + // Assert - reduction should return transposed version of + // original array (due to F ordering) + assert_eq!(result, arr.t().iter().copied().collect::>()); + assert_eq!(counts, vec![1, 0, 1, 0, 1, 0]); + assert_eq!(shape, arr.shape()); + } + + #[test] + fn max_multi_axis_2d_1ax_missing() { + // Arrange + let axes = vec![1]; + let missing = Missing::MissingValue(0); + let arr = ndarray::Array::from_shape_vec((2, 3), (0..6).collect()) + .unwrap() + .into_dyn(); + // Act + let (result, counts, shape) = max_array_multi_axis(arr.view(), &axes, Some(missing), &None); + // Assert + assert_eq!(result, vec![2, 5]); + assert_eq!(counts, vec![2, 3]); + assert_eq!(shape, vec![2]); + } + + #[test] + fn max_multi_axis_4d_3ax_missing() { + // Arrange + let arr = ndarray::Array::from_shape_vec((2, 3, 2, 1), (0..12).collect()) + .unwrap() + .into_dyn(); + let axes = vec![0, 1, 3]; + let missing = Missing::MissingValue(10); + // Act + let (result, counts, shape) = max_array_multi_axis(arr.view(), &axes, Some(missing), &None); + // Assert + assert_eq!(result, vec![8, 11]); + assert_eq!(counts, vec![5, 6]); + assert_eq!(shape, vec![2]); + } + + #[test] + #[should_panic(expected = "assertion failed: axis.index() < self.ndim()")] + fn count_multi_axis_2d_wrong_axis() { + // Arrange + let array = ndarray::Array::from_shape_vec((2, 2), (0..4).collect()) + .unwrap() + .into_dyn(); + let axes = vec![2]; + // Act + let _ = count_array_multi_axis(array.view(), &axes, None); + } + + #[test] + fn count_multi_axis_2d_2ax() { + // Arrange + let axes = vec![0, 1]; + let missing = None; + let arr = ndarray::Array::from_shape_vec((2, 3), (0..6).collect()) + .unwrap() + .into_dyn(); + // Act + let (counts, shape) = count_array_multi_axis(arr.view(), &axes, missing); + // Assert + assert_eq!(counts, vec![6]); + assert_eq!(shape, Vec::::new()); + } + + #[test] + fn count_multi_axis_2d_no_ax() { + // Arrange + let axes = vec![]; + let missing = None; + let arr = ndarray::Array::from_shape_vec((2, 3), (0..6).collect()) + .unwrap() + .into_dyn(); + // Act + let (counts, shape) = count_array_multi_axis(arr.view(), &axes, missing); + // Assert + assert_eq!(counts, vec![1, 1, 1, 1, 1, 1]); + assert_eq!(shape, arr.shape().to_vec()); + } + + #[test] + fn count_multi_axis_2d_1ax_missing() { + // Arrange + let axes = vec![1]; + let missing = Missing::MissingValue(0); + let arr = ndarray::Array::from_shape_vec((2, 3), (0..6).collect()) + .unwrap() + .into_dyn(); + // Act + let (counts, shape) = count_array_multi_axis(arr.view(), &axes, Some(missing)); + // Assert + assert_eq!(counts, vec![2, 3]); + assert_eq!(shape, vec![2]); + } + + #[test] + fn count_multi_axis_4d_3ax_multi_missing() { + // Arrange + let arr = ndarray::Array::from_shape_vec((2, 3, 2, 1), (0..12).collect()) + .unwrap() + .into_dyn(); + let axes = vec![0, 1, 3]; + let missing = Missing::MissingValues(vec![9, 10, 11]); + // Act + let (counts, shape) = count_array_multi_axis(arr.view(), &axes, Some(missing)); + // Assert + assert_eq!(counts, vec![5, 4]); + assert_eq!(shape, vec![2]); + } + + #[test] + fn count_multi_axis_4d_3ax_missing() { + // Arrange + let arr = ndarray::Array::from_shape_vec((2, 3, 2, 1), (0..12).collect()) + .unwrap() + .into_dyn(); + let axes = vec![0, 1, 3]; + let missing = Missing::MissingValue(10); + // Act + let (counts, shape) = count_array_multi_axis(arr.view(), &axes, Some(missing)); + // Assert + assert_eq!(counts, vec![5, 6]); + assert_eq!(shape, vec![2]); + } } diff --git a/src/test_utils.rs b/src/test_utils.rs index 99249cf..d6b5e29 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -13,6 +13,7 @@ pub(crate) fn get_test_request_data() -> RequestData { byte_order: None, offset: None, size: None, + axis: ReductionAxes::All, shape: None, order: None, selection: None, @@ -32,9 +33,14 @@ pub(crate) fn get_test_request_data_optional() -> RequestData { byte_order: Some(ByteOrder::Little), offset: Some(4), size: Some(8), - shape: Some(vec![2, 5]), + axis: ReductionAxes::Multi(vec![1, 2]), + shape: Some(vec![2, 5, 1]), order: Some(Order::C), - selection: Some(vec![Slice::new(1, 2, 3), Slice::new(4, 5, 6)]), + selection: Some(vec![ + Slice::new(1, 2, 3), + Slice::new(4, 5, 6), + Slice::new(1, 1, 1), + ]), compression: Some(Compression::Gzip), filters: Some(vec![Filter::Shuffle { element_size: 4 }]), missing: Some(Missing::MissingValue(42.into())), diff --git a/src/types/missing.rs b/src/types/missing.rs index e877c68..bc2d152 100644 --- a/src/types/missing.rs +++ b/src/types/missing.rs @@ -14,6 +14,7 @@ use validator::ValidationError; use crate::error::ActiveStorageError; use crate::models::DType; +use crate::operation::Element; use crate::types::dvalue::TryFromDValue; use crate::types::DValue; @@ -30,11 +31,11 @@ use crate::types::DValue; pub enum Missing { /// A single missing value MissingValue(T), - /// Multple missing values + /// Multiple missing values MissingValues(Vec), /// Valid minimum ValidMin(T), - /// Valid maxiumum + /// Valid maximum ValidMax(T), /// Valid range ValidRange(T, T), @@ -108,6 +109,19 @@ impl TryFrom<&Missing> for Missing { } } +impl Missing { + /// Filter function to check whether the provided value is a 'missing' value + pub fn is_missing(&self, x: &T) -> bool { + match self { + Missing::MissingValue(value) => x == value, + Missing::MissingValues(values) => values.contains(x), + Missing::ValidMin(min) => x < min, + Missing::ValidMax(max) => x > max, + Missing::ValidRange(min, max) => x < min || x > max, + } + } +} + #[cfg(test)] mod tests { use num_traits::Float; @@ -232,4 +246,46 @@ mod tests { )) .unwrap(); } + + #[test] + fn test_is_missing_value() { + let missing = Missing::MissingValue(1); + assert!(!missing.is_missing(&0)); + assert!(missing.is_missing(&1)); + assert!(!missing.is_missing(&2)); + } + + #[test] + fn test_is_missing_values() { + let missing = Missing::MissingValues(vec![1, 2]); + assert!(!missing.is_missing(&0)); + assert!(missing.is_missing(&1)); + assert!(missing.is_missing(&2)); + assert!(!missing.is_missing(&3)); + } + + #[test] + fn test_is_missing_valid_min() { + let missing = Missing::ValidMin(1); + assert!(missing.is_missing(&0)); + assert!(!missing.is_missing(&1)); + assert!(!missing.is_missing(&2)); + } + + #[test] + fn test_is_missing_valid_max() { + let missing = Missing::ValidMax(1); + assert!(!missing.is_missing(&0)); + assert!(!missing.is_missing(&1)); + assert!(missing.is_missing(&2)); + } + + #[test] + fn test_is_missing_valid_range() { + let missing = Missing::ValidRange(1, 2); + assert!(missing.is_missing(&0)); + assert!(!missing.is_missing(&1)); + assert!(!missing.is_missing(&2)); + assert!(missing.is_missing(&3)); + } }