Merged
1 change: 1 addition & 0 deletions benches/byte_order.rs
@@ -14,6 +14,7 @@ fn get_test_request_data() -> RequestData {
offset: None,
size: None,
shape: None,
axis: reductionist::models::ReductionAxes::All,
order: None,
selection: None,
compression: None,
5 changes: 3 additions & 2 deletions benches/operations.rs
@@ -19,6 +19,7 @@ fn get_test_request_data() -> RequestData {
offset: None,
size: None,
shape: None,
axis: reductionist::models::ReductionAxes::All,
order: None,
selection: None,
compression: None,
@@ -34,7 +35,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let size = size_k * 1024;
let data: Vec<i64> = (0_i64..size).map(|i| i % 256).collect::<Vec<i64>>();
let data: Vec<u8> = data.as_bytes().into();
let missings = vec![
let missing_types = vec![
None,
Some(Missing::MissingValue(42.into())),
Some(Missing::MissingValues(vec![42.into()])),
@@ -50,7 +51,7 @@ fn criterion_benchmark(c: &mut Criterion) {
("sum", Box::new(operations::Sum::execute)),
];
for (op_name, execute) in operations {
for missing in missings.clone() {
for missing in missing_types.clone() {
let name = format!("{}({}, {:?})", op_name, size, missing);
c.bench_function(&name, |b| {
b.iter(|| {
13 changes: 9 additions & 4 deletions docs/api.md
@@ -37,6 +37,11 @@ The request body should be a JSON object of the form:
// - optional, defaults to a simple 1D array
"shape": [20, 5],

// The axis or axes over which to perform the reduction operation
// - optional, can be either a single axis or list of axes, defaults
// to a reduction over all axes
"axis": 0,

// Indicates whether the data is in C order (row major)
// or Fortran order (column major, indicated by 'F')
// - optional, defaults to 'C'
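
As a quick illustration of the new field, a minimal sketch of a request reducing over a single axis; the server address, endpoint layout, bucket and object names here are all assumptions for illustration:

```python
import requests

# Sum a 20x5 uint32 array over axis 0, yielding 5 column sums rather
# than a single scalar. Host, bucket and object names are made up.
request_data = {
    "source": "http://localhost:9000",  # assumed S3-compatible store URL
    "bucket": "sample-data",
    "object": "data-uint32.dat",
    "dtype": "uint32",
    "shape": [20, 5],
    "axis": 0,  # or a list of axes, e.g. [0, 1]
}
response = requests.post("http://localhost:8080/v1/sum/", json=request_data)
response.raise_for_status()
```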
@@ -78,10 +83,10 @@ Unauthenticated access to S3 is possible by omitting the basic auth header.
On success, all operations return HTTP 200 OK, with the response using the same data type as specified in the request, except for `count`, which always returns its result as `int64`.
The server returns the following headers with the HTTP response:

* `x-activestorage-dtype`: The data type of the data in the response payload. One of `int32`, `int64`, `uint32`, `uint64`, `float32` or `float64`.
* `x-activestorage-byte-order`: The byte order of the data in the response payload. Either `big` or `little`.
* `x-activestorage-shape`: A JSON-encoded list of numbers describing the shape of the data in the response payload. May be an empty list for a scalar result.
* `x-activestorage-count`: The number of non-missing array elements operated on while performing the requested reduction. This header is useful, for example, to calculate the mean over multiple requests where the number of items operated on may differ between chunks.
- `x-activestorage-dtype`: The data type of the data in the response payload. One of `int32`, `int64`, `uint32`, `uint64`, `float32` or `float64`.
- `x-activestorage-byte-order`: The byte order of the data in the response payload. Either `big` or `little`.
- `x-activestorage-shape`: A JSON-encoded list of numbers describing the shape of the data in the response payload. May be an empty list for a scalar result.
- `x-activestorage-count`: The number of non-missing array elements operated on while performing the requested reduction. This header is useful, for example, to calculate the mean over multiple requests where the number of items operated on may differ between chunks.
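
As a sketch of that mean calculation (a hypothetical helper; it assumes one successful `sum` response per chunk and headers exactly as documented above):

```python
import json
import numpy as np

def mean_over_chunks(responses):
    """Combine per-chunk `sum` responses into a single mean.

    Each response carries its own non-missing element count, so chunks
    of differing sizes combine correctly.
    """
    total = 0.0
    count = 0
    for response in responses:
        dtype = response.headers["x-activestorage-dtype"]
        shape = json.loads(response.headers["x-activestorage-shape"])
        counts = json.loads(response.headers["x-activestorage-count"])
        chunk_sum = np.frombuffer(response.content, dtype=dtype).reshape(shape)
        total += chunk_sum.sum()
        count += np.sum(counts)
    return total / count
```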

On error, an HTTP 4XX (client) or 5XX (server) response code will be returned, with the response body being a JSON object of the following format:

33 changes: 15 additions & 18 deletions docs/architecture.md
@@ -7,10 +7,10 @@ Reductionist is built on top of a number of popular open source components.

A few properties make it relatively easy to build a mental model of how Reductionist works.

* All operations share the same request processing pipeline.
* The request processing pipeline for each request is a fairly linear sequence of steps.
* There is no persistent state.
* The only external service that is interacted with is an S3-compatible object store.
- All operations share the same request processing pipeline.
- The request processing pipeline for each request is a fairly linear sequence of steps.
- There is no persistent state.
- The only external service that is interacted with is an S3-compatible object store.

The more challenging aspects of the system are the lower-level details: asynchronous programming, memory management, the Rust type system, and working with multi-dimensional arrays.

@@ -29,7 +29,6 @@ A diagram of this step for the sum operation is shown in Figure 2.
<figcaption>Figure 2: Sum operation flow diagram</figcaption>
</figure>


## Axum web server

[Axum](https://docs.rs/axum) is an asynchronous web framework that performs well in [various benchmarks](https://github.com/programatik29/rust-web-benchmarks/blob/master/result/hello-world.md) and is built on top of various popular components, including the [hyper](https://hyper.rs/) HTTP library.
@@ -110,13 +109,11 @@ Each operation is implemented by a struct that implements the `NumOperation` tra
For example, the sum operation is implemented by the `Sum` struct in `src/operations.rs`.
The `Sum` struct's `execute_t` method does the following:

* Zero copy conversion of the byte array to a multi-dimensional [ndarray::ArrayView](https://docs.rs/ndarray/latest/ndarray/type.ArrayView.html) object of the data type, shape and byte order specified in the request data
* If a selection was specified in the request data, create a sliced `ndarray::ArrayView` onto the original array view
* If missing data was specified in the request data:
* Create an iterator over the array view that filters out missing data, performs the sum operation and counts non-missing elements
* Otherwise:
* Use the array view's native `sum` and `len` methods to take the sum and element count
* Convert the sum to a byte array and return with the element count
- Zero copy conversion of the byte array to a multi-dimensional [ndarray::ArrayView](https://docs.rs/ndarray/latest/ndarray/type.ArrayView.html) object of the data type, shape and byte order specified in the request data
- If a selection was specified in the request data, create a sliced `ndarray::ArrayView` onto the original array view
- Check whether the reduction should be performed over all of the sliced data's axes or only a subset
- Perform a fold over each of the requested axes to calculate the required reduction, ignoring any specified missing data
- Convert the sum to a byte array and return with the element count

The procedure for other operations varies slightly but generally follows the same pattern; a rough numpy analogue of the axis handling is sketched below.
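
A minimal sketch, assuming numpy stands in for ndarray and a single comparison defines missing data; the real implementation is the Rust fold in `src/operations.rs`:

```python
import numpy as np

def masked_sum(arr, axis=None, missing_value=None):
    # Build a validity mask; with no missing value, everything is valid.
    if missing_value is None:
        mask = np.ones(arr.shape, dtype=bool)
    else:
        mask = arr != missing_value
    total = np.where(mask, arr, 0).sum(axis=axis)  # missing contributes 0
    count = mask.sum(axis=axis)                    # non-missing elements
    return total, count

data = np.arange(12).reshape(3, 4)
print(masked_sum(data))                # all axes -> scalar sum and count
print(masked_sum(data, axis=0))        # per-column sums, shape (4,)
print(masked_sum(data, axis=(0, 1), missing_value=5))
```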

@@ -136,9 +133,9 @@ Reductionist supports optional restriction of resource usage.
This is implemented in `src/resource_manager.rs` using [Tokio Semaphores](https://docs.rs/tokio/latest/tokio/sync/struct.Semaphore.html).
This allows Reductionist to limit the quantity of various resources used at any time:

* S3 connections
* memory used for numeric data (this is more of a rough guide than a perfect limit)
* threads used for CPU-bound work
- S3 connections
- memory used for numeric data (this is more of a rough guide than a perfect limit)
- threads used for CPU-bound work
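
As a toy analogue of this limiting (Reductionist itself uses Tokio semaphores in Rust; the asyncio version below and its limit of four are illustrative assumptions):

```python
import asyncio

s3_connections = asyncio.Semaphore(4)  # made-up limit of 4 concurrent "GETs"

async def fetch_chunk(chunk_id: int) -> bytes:
    async with s3_connections:         # waits once all 4 permits are taken
        await asyncio.sleep(0.1)       # stand-in for an S3 byte-range GET
        return f"chunk {chunk_id}".encode()

async def main():
    chunks = await asyncio.gather(*(fetch_chunk(i) for i in range(16)))
    print(f"fetched {len(chunks)} chunks")

asyncio.run(main())
```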

## CPU-bound work

@@ -158,9 +155,9 @@ The second approach may leave the server more responsive if more CPU-heavy opera
Prometheus metrics are implemented in `src/metrics.rs` and are exposed by the Reductionist API under the `/metrics` path.
These include:

* incoming requests (counter)
* outgoing response (counter)
* response time (histogram)
- incoming requests (counter)
- outgoing response (counter)
- response time (histogram)
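
These can be inspected directly by scraping the endpoint; the `/metrics` path is as documented above, while the host and port here are assumptions:

```python
import requests

# Fetch and print the Prometheus text-format metrics from a local server.
print(requests.get("http://localhost:8080/metrics").text)
```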

## Tracing and profiling

42 changes: 22 additions & 20 deletions docs/pyactivestorage.md
@@ -3,34 +3,36 @@
Reductionist has been integrated with the [PyActiveStorage](https://github.com/valeriupredoi/PyActiveStorage) library, and PyActiveStorage acts as a client of the Reductionist server.
PyActiveStorage currently works with data in netCDF4 format, and is able to perform reductions on a variable within such a dataset.
Numerical operations are performed on individual storage chunks, with the results later aggregated.
The original POSIX/NumPy storage chunk reduction in PyActiveStorage is implementated in a `reduce_chunk` Python function in `activestorage/storage.py`, and this interface was used as the basis for the integration of Reductionist.
The original POSIX/NumPy storage chunk reduction in PyActiveStorage is implemented in a `reduce_chunk` Python function in `activestorage/storage.py`, and this interface was used as the basis for the integration of Reductionist.
The following code snippet shows the `reduce_chunk` function signature.

```python
def reduce_chunk(rfile, offset, size, compression, filters, missing, dtype, shape, order, chunk_selection, method=None):
""" We do our own read of chunks and decoding etc
rfile - the actual file with the data
""" We do our own read of chunks and decoding etc

rfile - the actual file with the data
offset, size - where and what we want ...
compression - optional `numcodecs.abc.Codec` compression codec
filters - optional list of `numcodecs.abc.Codec` filter codecs
dtype - likely float32 in most cases.
shape - will be a tuple, something like (3,3,1), this is the dimensionality of the
dtype - likely float32 in most cases.
shape - will be a tuple, something like (3,3,1), this is the dimensionality of the
chunk itself
order - typically 'C' for c-type ordering
chunk_selection - python slice tuples for each dimension, e.g.
(slice(0, 2, 1), slice(1, 3, 1), slice(0, 1, 1))
this defines the part of the chunk which is to be obtained
or operated upon.
method - computation desired
(in this Python version it's an actual method, in
method - computation desired
(in this Python version it's an actual method, in
storage implementations we'll change to controlled vocabulary)

"""
```

For Reductionist, the `reduce_chunk` function signature in `activestorage/reductionist.py` is similar, but replaces the local file path with a `requests.Session` object, the Reductionist server URL, S3-compatible object store URL, and the bucket and object containing the data.

<!-- TODO: Update to include axis arg once integrated into PyActiveStorage -->

```python
def reduce_chunk(session, server, source, bucket, object,
offset, size, compression, filters, missing, dtype, shape,
@@ -64,11 +66,11 @@ def reduce_chunk(session, server, source, bucket, object,

Within the `reduce_chunk` implementation for Reductionist, the following steps are taken:

* build Reductionist API request data
* build Reductionist API URL
* perform an HTTP(S) POST request to Reductionist
* on success, return a NumPy array containing the data in the response payload, with data type, shape and count determined by response headers
* on failure, raise a `ReductionistError` with the response status code and JSON encoded error response
- build Reductionist API request data
- build Reductionist API URL
- perform an HTTP(S) POST request to Reductionist
- on success, return a NumPy array containing the data in the response payload, with data type, shape and count determined by response headers
- on failure, raise a `ReductionistError` with the response status code and JSON encoded error response

The use of a `requests.Session` object allows for TCP connection pooling, reducing connection overhead when multiple requests are made within a short timeframe.
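
Putting those steps together, a stripped-down sketch of the flow might look like the following; the function name and URL layout are simplified assumptions, not the actual `activestorage/reductionist.py` code:

```python
import json
import numpy as np
import requests

def reduce_chunk_sketch(session: requests.Session, server: str, source: str,
                        bucket: str, object: str, offset: int, size: int,
                        dtype: str, shape: list, operation: str = "sum"):
    # Build the Reductionist API request data (simplified subset of fields).
    request_data = {
        "source": source, "bucket": bucket, "object": object,
        "offset": offset, "size": size, "dtype": dtype, "shape": shape,
    }
    # Build the API URL and perform the POST (URL layout assumed here).
    response = session.post(f"{server}/v1/{operation}/", json=request_data)
    if not response.ok:
        # Stand-in for ReductionistError with the JSON-encoded error body.
        raise RuntimeError(f"{response.status_code}: {response.text}")
    # Decode the payload using the response headers.
    result_dtype = response.headers["x-activestorage-dtype"]
    result_shape = json.loads(response.headers["x-activestorage-shape"])
    count = json.loads(response.headers["x-activestorage-count"])
    result = np.frombuffer(response.content, dtype=result_dtype)
    return result.reshape(result_shape), count
```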

@@ -77,9 +79,9 @@ It should be possible to provide a unified interface to storage systems by abstr
Other changes to the main `activestorage.Active` class were necessary for integration of Reductionist.
These include:

* Support for reading netCDF metadata from files stored in S3 using the [s3fs](https://s3fs.readthedocs.io/) and [h5netcdf](https://pypi.org/project/h5netcdf/) libraries
* Configuration options in `activestorage/config.py` to specify the Reductionist API URL, S3-compatible object store URL, S3 access key, secret key and bucket
* Constructor `storage_type` argument for `activestorage.Active` to specify the storage backend
* Use of a thread pool to execute storage chunk reductions in parallel
* Unit tests to cover new and modified code
* Integration test changes to allow running against a POSIX or S3 storage backend
- Support for reading netCDF metadata from files stored in S3 using the [s3fs](https://s3fs.readthedocs.io/) and [h5netcdf](https://pypi.org/project/h5netcdf/) libraries
- Configuration options in `activestorage/config.py` to specify the Reductionist API URL, S3-compatible object store URL, S3 access key, secret key and bucket
- Constructor `storage_type` argument for `activestorage.Active` to specify the storage backend
- Use of a thread pool to execute storage chunk reductions in parallel
- Unit tests to cover new and modified code
- Integration test changes to allow running against a POSIX or S3 storage backend
14 changes: 11 additions & 3 deletions scripts/client.py
@@ -36,6 +36,7 @@ def get_args() -> argparse.Namespace:
parser.add_argument("--offset", type=int)
parser.add_argument("--size", type=int)
parser.add_argument("--shape", type=str)
parser.add_argument("--axis", type=str)
parser.add_argument("--order", default="C") #, choices=["C", "F"]) allow invalid for testing
parser.add_argument("--selection", type=str)
parser.add_argument("--compression", type=str)
@@ -72,6 +73,8 @@ def build_request_data(args: argparse.Namespace) -> dict:
request_data["byte_order"] = args.byte_order
if args.shape:
request_data["shape"] = json.loads(args.shape)
if args.axis is not None:
request_data["axis"] = json.loads(args.axis)
if args.selection:
request_data["selection"] = json.loads(args.selection)
if args.compression:
@@ -113,11 +116,16 @@ def display(response, verbose=False):
#print(response.content)
dtype = response.headers['x-activestorage-dtype']
shape = json.loads(response.headers['x-activestorage-shape'])
result = np.frombuffer(response.content, dtype=dtype)
result = result.reshape(shape)
counts = json.loads(response.headers['x-activestorage-count'])
counts = np.array(counts)
if len(counts) > 1:
counts = counts.reshape(shape)
result = np.frombuffer(response.content, dtype=dtype).reshape(shape)
if verbose:
sep = "\n" if len(counts.shape) > 1 else " "
print("\nResponse headers:", response.headers)
print("\nResult:", result)
print("\nNon-missing count(s):", counts, sep=sep)
print("\nResult:", result, sep=sep)
else:
print(result)

3 changes: 3 additions & 0 deletions scripts/parallel-client.py
@@ -48,6 +48,7 @@ def get_args() -> argparse.Namespace:
parser.add_argument("--offset", type=int)
parser.add_argument("--size", type=int)
parser.add_argument("--shape", type=str)
parser.add_argument("--axis", type=str)
parser.add_argument("--order", default="C") #, choices=["C", "F"]) allow invalid for testing
parser.add_argument("--selection", type=str)
parser.add_argument("--compression", type=str)
@@ -90,6 +91,8 @@ def build_request_data(args: argparse.Namespace) -> dict:
request_data["byte_order"] = args.byte_order
if args.shape:
request_data["shape"] = json.loads(args.shape)
if args.axis is not None:
request_data["axis"] = json.loads(args.axis)
if args.selection:
request_data["selection"] = json.loads(args.selection)
if args.compression:
6 changes: 3 additions & 3 deletions scripts/upload_sample_data.py
@@ -6,12 +6,12 @@
import s3fs
import zlib

NUM_ITEMS = 10
NUM_ITEMS = 12
OBJECT_PREFIX = "data"
COMPRESSION_ALGS = [None, "gzip", "zlib"]
FILTER_ALGS = [None, "shuffle"]

#Use enum which also subclasses string type so that
# Use enum which also subclasses string type so that
# auto-generated OpenAPI schema can determine allowed dtypes
class AllowedDatatypes(str, Enum):
""" Data types supported by active storage proxy """
@@ -31,7 +31,7 @@ def n_bytes(self):
s3_fs = s3fs.S3FileSystem(key='minioadmin', secret='minioadmin', client_kwargs={'endpoint_url': S3_URL})
bucket = pathlib.Path('sample-data')

#Make sure s3 bucket exists
# Make sure s3 bucket exists
try:
s3_fs.mkdir(bucket)
except FileExistsError: