Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions src/rojak/turbulence/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ def received_operating_characteristic(
>>> y = da.asarray(y[decrease_idx])
>>> roc = received_operating_characteristic(y, scores, positive_classification_label=2)
>>> roc
BinaryClassificationResult(false_positives=dask.array<truediv, shape=(5,), dtype=float64, chunksize=(3,),
chunktype=numpy.ndarray>, true_positives=dask.array<truediv, shape=(5,), dtype=float64, chunksize=(3,),
chunktype=numpy.ndarray>, thresholds=dask.array<concatenate, shape=(5,), dtype=float64, chunksize=(3,),
BinaryClassificationResult(false_positives=dask.array<truediv, shape=(5,), dtype=float64, chunksize=(4,),
chunktype=numpy.ndarray>, true_positives=dask.array<truediv, shape=(5,), dtype=float64, chunksize=(4,),
chunktype=numpy.ndarray>, thresholds=dask.array<concatenate, shape=(5,), dtype=float64, chunksize=(4,),
chunktype=numpy.ndarray>)
>>> roc.false_positives.compute()
array([0. , 0. , 0.5, 0.5, 1. ])
Expand Down Expand Up @@ -168,10 +168,10 @@ def binary_classification_curve(

>>> classification = binary_classification_curve(y, scores, positive_classification_label=2)
>>> classification
BinaryClassificationResult(false_positives=dask.array<sub, shape=(4,), dtype=int64, chunksize=(3,),
BinaryClassificationResult(false_positives=dask.array<sub, shape=(4,), dtype=int64, chunksize=(4,),
chunktype=numpy.ndarray>, true_positives=dask.array<slice_with_int_dask_array_aggregate, shape=(4,), dtype=int64,
chunksize=(3,), chunktype=numpy.ndarray>, thresholds=dask.array<slice_with_int_dask_array_aggregate, shape=(4,),
dtype=float64, chunksize=(3,), chunktype=numpy.ndarray>)
chunksize=(4,), chunktype=numpy.ndarray>, thresholds=dask.array<slice_with_int_dask_array_aggregate, shape=(4,),
dtype=float64, chunksize=(4,), chunktype=numpy.ndarray>)


The method returns a named tuple :py:class:`BinaryClassificationResult` containing `dask.array.Array`. To get the
Expand Down Expand Up @@ -213,7 +213,11 @@ def binary_classification_curve(
# To reduce the data, use step_size to determine the minimum difference between two data points
bucketed_value_indices = da.nonzero(diff_values > minimum_step_size)[0].compute_chunk_sizes()
# ^ computing chunks here means that the subsequent dask arrays have a known number of chunks
threshold_indices = da.hstack((bucketed_value_indices, da.asarray([sorted_truth.size - 1]))).persist()
threshold_indices = (
da.hstack((bucketed_value_indices, da.asarray([sorted_truth.size - 1])))
.rechunk(sorted_truth.chunksize[0]) # pyright: ignore [reportAttributeAccessIssue]
.persist()
)

true_positive = da.cumsum(sorted_truth)[threshold_indices].persist()
# Magical equation from scikit-learn which means another cumsum is avoided
Expand Down
13 changes: 9 additions & 4 deletions src/rojak/turbulence/verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,14 @@ def _create_diagnostic_value_series(
]
assert set(name_of_index_columns).issubset(observational_data)

# Compute chunksize of array we are indexing into so that when the indices array indexes into it
# the number of chunks does not increase significantly. This prevents a PerformanceWarning from being thrown
# e.g. PerformanceWarning: Increasing number of chunks by factor of 12
array_chunk_size: int = da.ravel(grid_prototype).chunks[0]
# Retrieves the index for each row of data and stores them as dask arrays
indexing_columns: list[da.Array] = [
observational_data[col_name].to_dask_array(lengths=True).persist() for col_name in name_of_index_columns
observational_data[col_name].to_dask_array(lengths=True).rechunk(array_chunk_size).persist()
for col_name in name_of_index_columns
]
# Order of coordinate axes are not known beforehand. Therefore, use the axis order so that the index
# values matches the dimension of the array
Expand All @@ -138,10 +143,10 @@ def _create_diagnostic_value_series(

return {
diagnostic_name: da.ravel(computed_diagnostic.data)[flattened_index]
# .compute_chunk_sizes()
.to_dask_dataframe(
meta=pd.DataFrame({diagnostic_name: pd.Series(dtype=float)}),
index=observational_data.index, # ensures it matches observational_data.index
# As the chunks now follow those of the array we are indexing into, npartitions doesn't match the index.
# Thus, index cannot be used as a kwarg to the conversion
columns=diagnostic_name,
)
.persist()
Expand Down Expand Up @@ -192,7 +197,7 @@ def nearest_diagnostic_value(self, time_window: "Limits[np.datetime64]") -> "dd.
observational_data,
as_column,
).persist()
return observational_data
return observational_data.optimize()


def _any_aggregation() -> dd.Aggregation:
Expand Down
Loading