5 changes: 5 additions & 0 deletions cpp/arcticdb/arrow/arrow_utils.cpp
@@ -63,6 +63,11 @@ std::vector<sparrow::array> arrow_arrays_from_column(const Column& column, std::
std::shared_ptr<std::vector<sparrow::record_batch>> segment_to_arrow_data(SegmentInMemory& segment) {
const auto total_blocks = segment.num_blocks();
const auto num_columns = segment.num_columns();
if (num_columns == 0) {
// We can't construct a record batch with no columns, so in this case we return an empty list of record batches,
// which needs special handling in python.
return {};
}
const auto column_blocks = segment.column(0).num_blocks();
util::check(total_blocks == column_blocks * num_columns, "Expected regular block size");

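The comment above notes that a zero-column result comes back as an empty list of record batches and needs special handling on the Python side. A minimal sketch of that handling (illustrative only, assuming standard pyarrow behavior where `Table.from_batches` cannot infer a schema from an empty list):

```python
import pyarrow as pa

def record_batches_to_table(record_batches):
    # Hypothetical helper: when the C++ layer returns no record batches (i.e. the
    # output has no columns), build a zero-column, zero-row table explicitly,
    # because from_batches() needs at least one batch or an explicit schema.
    if not record_batches:
        return pa.table({})
    return pa.Table.from_batches(record_batches)
```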
16 changes: 11 additions & 5 deletions cpp/arcticdb/pipeline/read_frame.cpp
@@ -107,11 +107,17 @@ SegmentInMemory allocate_chunked_frame(const std::shared_ptr<PipelineContext>& c
};
auto handlers = TypeHandlerRegistry::instance();

-    if (row_count > 0) {
-        for (auto& column : output.columns()) {
-            auto handler = handlers->get_handler(output_format, column->type());
-            const auto data_size = data_type_size(column->type(), output_format, DataTypeMode::EXTERNAL);
-            for (auto block_row_count : block_row_counts) {
+    for (auto& column : output.columns()) {
+        auto handler = handlers->get_handler(output_format, column->type());
+        const auto data_size = data_type_size(column->type(), output_format, DataTypeMode::EXTERNAL);
+        for (auto block_row_count : block_row_counts) {
+            if (block_row_count > 0) {
+                // We can end up with empty segments from the processing pipeline, e.g. when:
+                // - Filtering a data key down to the empty set (e.g. date_range = (3, 3) on a data key with no index=3)
+                // - Resampling with a date range where a bucket slice contains no index values
+                // Zero-sized memory blocks would break the offset assumptions in chunked buffers, and it is fine for the
+                // number of memory blocks to differ from the number of segments, because follow-up methods such as
+                // `copy_frame_data_to_buffer` rely on offsets rather than block indices.
                const auto bytes = block_row_count * data_size;
                column->allocate_data(bytes);
                column->advance_data(bytes);
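The point about offsets versus block indices can be illustrated with a small sketch (illustrative Python, not ArcticDB code): destination offsets are derived from cumulative row counts, so skipping the allocation of a zero-row block leaves every other block's offset unchanged.

```python
from itertools import accumulate

data_size = 8                     # bytes per value, e.g. a 64-bit column
block_row_counts = [100, 0, 50]   # the middle row-slice produced no rows

# Byte offset at which each block's data starts, derived from cumulative row counts.
offsets = [0] + list(accumulate(rc * data_size for rc in block_row_counts))
print(offsets)  # [0, 800, 800, 1200] - the empty slice contributes nothing

# Only non-empty blocks are materialised; the offsets used by offset-based
# copying (rather than block-index-based copying) are unaffected.
allocated_block_sizes = [rc * data_size for rc in block_row_counts if rc > 0]
print(allocated_block_sizes)  # [800, 400]
```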
4 changes: 3 additions & 1 deletion python/arcticdb/version_store/_normalization.py
@@ -742,7 +742,9 @@ def denormalize(self, item, norm_meta):
index_type = pandas_meta.WhichOneof("index_type")
if index_type == "index":
index_meta = pandas_meta.index
if index_meta.is_physically_stored:
# Empty tables don't have `is_physically_stored=True`, but we still output them with an empty DatetimeIndex.
is_empty_table_with_datetime_index = len(item) == 0 and not index_meta.step
if index_meta.is_physically_stored or is_empty_table_with_datetime_index:
pandas_indexes = 1
if index_meta.tz:
timezones[0] = index_meta.tz
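A rough sketch of what this check is getting at (illustrative only; `index_meta` here stands in for the protobuf index metadata, and the real denormalization code is more involved): a range-style index carries a non-zero `step`, so an empty frame whose index metadata has no `step` is rebuilt with an empty `DatetimeIndex` even though it was never physically stored.

```python
import pandas as pd

def rebuild_empty_index(index_meta):
    # Hypothetical helper mirroring the condition above: empty tables are not
    # flagged as physically stored, but they should still round-trip with an
    # (empty) DatetimeIndex rather than a default RangeIndex.
    if index_meta.step:
        return pd.RangeIndex(start=0, stop=0, step=index_meta.step)
    tz = index_meta.tz or None
    return pd.DatetimeIndex([], tz=tz)
```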
34 changes: 27 additions & 7 deletions python/arcticdb/version_store/_store.py
@@ -90,6 +90,8 @@
from packaging.version import Version
import arcticdb_ext as ae

from arcticdb.util.arrow import stringify_dictionary_encoded_columns

IS_WINDOWS = sys.platform == "win32"

FlattenResult = namedtuple("FlattenResult", ["is_recursive_normalize_preferred", "metastruct", "to_write"])
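`stringify_dictionary_encoded_columns` is imported above from the Arrow test utilities; a rough sketch of what such a helper might do (an assumption about its behavior, not the actual implementation) is to cast dictionary-encoded columns back to their value type so string data can be compared against the pandas output path:

```python
import pyarrow as pa

def stringify_dictionary_encoded_columns_sketch(table: pa.Table) -> pa.Table:
    # Hypothetical stand-in for arcticdb.util.arrow.stringify_dictionary_encoded_columns:
    # e.g. dictionary<values=string> columns become plain string columns.
    for i, field in enumerate(table.schema):
        if pa.types.is_dictionary(field.type):
            decoded = table.column(i).cast(field.type.value_type)
            table = table.set_column(i, field.name, decoded)
    return table
```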
@@ -354,12 +356,17 @@ def _initialize(self, library, env, lib_cfg, custom_normalizer, open_mode, nativ
self._open_mode = open_mode
self._native_cfg = native_cfg
self._runtime_options = runtime_options
self._test_convert_arrow_back_to_pandas = False

def set_output_format(self, output_format: Union[OutputFormat, str]):
if self._runtime_options is None:
self._runtime_options = RuntimeOptions()
self._runtime_options.set_output_format(output_format)

def _set_output_format_for_pipeline_tests(self, output_format):
self.set_output_format(output_format)
self._test_convert_arrow_back_to_pandas = True

@classmethod
def create_store_from_lib_config(cls, lib_cfg, env, open_mode=OpenMode.DELETE, native_cfg=None):
lib = cls.create_lib_from_lib_config(lib_cfg, env, open_mode, native_cfg)
@@ -740,6 +747,9 @@ def _resolve_dynamic_strings(self, kwargs):
"Windows only supports dynamic_strings=True, using dynamic strings despite configuration or kwarg"
)
dynamic_strings = True
if self._test_convert_arrow_back_to_pandas:
# TODO: Hackery, maybe better to skip
dynamic_strings = True
return dynamic_strings

last_mismatch_msg: Optional[str] = None
@@ -2432,8 +2442,24 @@ def _adapt_read_res(self, read_result: ReadResult) -> VersionedItem:
record_batches = []
for record_batch in frame_data.extract_record_batches():
record_batches.append(pa.RecordBatch._import_from_c(record_batch.array(), record_batch.schema()))
table = pa.Table.from_batches(record_batches)
if len(record_batches) == 0:
# We get an empty list of record batches when output has no columns
table = pa.Table.from_arrays([])
else:
table = pa.Table.from_batches(record_batches)
data = self._arrow_normalizer.denormalize(table, read_result.norm)
if self._test_convert_arrow_back_to_pandas:
# TODO: Deduplicate with convert_arrow_to_pandas_and_remove_categoricals
data = stringify_dictionary_encoded_columns(data)
for i, name in enumerate(data.column_names):
if pa.types.is_integer(data.column(i).type):
new_col = data.column(i).fill_null(0)
data = data.set_column(i, name, new_col)
# TODO: Copy this to convert_arrow_to_pandas_and_remove_categoricals
if pa.types.is_boolean(data.column(i).type):
new_col = data.column(i).fill_null(False)
data = data.set_column(i, name, new_col)
data = data.to_pandas()
else:
data = self._normalizer.denormalize(read_result.frame_data, read_result.norm)
if read_result.norm.HasField("custom"):
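The `fill_null` calls above compensate for how Arrow nulls map to pandas (illustrative snippet below, not ArcticDB code): an integer column containing nulls converts to float64 with NaN, and a boolean column containing nulls converts to object, so filling with 0/False first keeps dtypes comparable with the pandas output path in these tests.

```python
import pyarrow as pa

t = pa.table({
    "i": pa.array([1, None], type=pa.int64()),
    "b": pa.array([True, None], type=pa.bool_()),
})
print(t.to_pandas().dtypes)       # i -> float64, b -> object

filled = pa.table({
    "i": t.column("i").fill_null(0),
    "b": t.column("b").fill_null(False),
})
print(filled.to_pandas().dtypes)  # i -> int64, b -> bool
```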
@@ -2679,9 +2705,6 @@ def add_to_snapshot(
"""
Add items to a snapshot. Will replace if the snapshot already contains an entry for a particular symbol.

Note: attempt to add non-existing symbol or version to a snapshot will not fail, but will have no effect
on the snapshot.

Parameters
----------
snap_name : `str`
@@ -2699,9 +2722,6 @@ def remove_from_snapshot(self, snap_name: str, symbols: List[str], versions: Lis
"""
Remove items from a snapshot

Note: attempt to remove non-existing symbol or version from a snapshot will not fail, but will have no effect
on the snapshot.

Parameters
----------
snap_name : `str`
(next file's diff — file name not shown in the capture; hypothesis aggregation tests)
@@ -24,7 +24,7 @@
)


pytestmark = pytest.mark.pipeline
pytestmark = pytest.mark.pipeline # Covered


@use_of_function_scoped_fixtures_in_hypothesis_checked
@@ -37,9 +37,10 @@
],
),
)
def test_aggregation_numeric(lmdb_version_store_v1, df):
def test_aggregation_numeric(lmdb_version_store_v1, any_output_format, df):
assume(not df.empty)
lib = lmdb_version_store_v1
lib._set_output_format_for_pipeline_tests(any_output_format)
symbol = "test_aggregation_numeric"
lib.write(symbol, df)

@@ -71,9 +72,10 @@ def test_aggregation_numeric(lmdb_version_store_v1, df):
],
),
)
def test_aggregation_strings(lmdb_version_store_v1, df):
def test_aggregation_strings(lmdb_version_store_v1, any_output_format, df):
assume(not df.empty)
lib = lmdb_version_store_v1
lib._set_output_format_for_pipeline_tests(any_output_format)
symbol = "test_aggregation_strings"
lib.write(symbol, df)

@@ -116,12 +118,13 @@ def aggregation_dataframe_list_strategy(draw):
@use_of_function_scoped_fixtures_in_hypothesis_checked
@settings(deadline=None)
@given(dfs=aggregation_dataframe_list_strategy())
def test_aggregation_numeric_dynamic(lmdb_version_store_dynamic_schema_v1, dfs):
def test_aggregation_numeric_dynamic(lmdb_version_store_dynamic_schema_v1, any_output_format, dfs):
agg_column_dtypes = [df["agg_column"].dtype for df in dfs if "agg_column" in df.columns]
common_agg_type = functools.reduce(valid_common_type, agg_column_dtypes) if len(agg_column_dtypes) > 0 else None
assume(any("grouping_column" in df.columns for df in dfs) and common_agg_type is not None)

lib = lmdb_version_store_dynamic_schema_v1
lib._set_output_format_for_pipeline_tests(any_output_format)
symbol = "test_aggregation_numeric_dynamic"
lib.delete(symbol)
for df in dfs:
@@ -160,9 +163,10 @@ def test_aggregation_numeric_dynamic(lmdb_version_store_dynamic_schema_v1, dfs):
],
),
)
def test_aggregation_strings_dynamic(lmdb_version_store_dynamic_schema_v1, df):
def test_aggregation_strings_dynamic(lmdb_version_store_dynamic_schema_v1, any_output_format, df):
assume(len(df) >= 3)
lib = lmdb_version_store_dynamic_schema_v1
lib._set_output_format_for_pipeline_tests(any_output_format)
symbol = "test_aggregation_strings_dynamic"
lib.delete(symbol)
slices = [
35 changes: 12 additions & 23 deletions python/tests/hypothesis/arcticdb/test_resample.py
@@ -18,19 +18,10 @@

COLUMN_DTYPE = ["float", "int", "uint"]
ALL_AGGREGATIONS = ["sum", "mean", "min", "max", "first", "last", "count"]
# Make sure the start date is pre-epoch so that we can test pre-epoch dates. Not all C++ libraries handle pre-epoch well.
MIN_DATE = np.datetime64("1960-01-01")
MAX_DATE = np.datetime64("2025-01-01")
MIN_DATE = np.datetime64("1969-06-01")
MAX_DATE = np.datetime64("1970-06-01")

pytestmark = pytest.mark.pipeline


def dense_row_count_in_resampled_dataframe(df_list, rule):
"""
The number of rows Arctic's resampling will produce after appending all dataframes in `df_list` and then resampling
with `rule`. Assumes df_list is sorted by start date and the indexes are not overlapping.
"""
return (df_list[-1].index[-1] - df_list[0].index[0]).value // pd.Timedelta(rule).value
pytestmark = pytest.mark.pipeline # Covered


@st.composite
@@ -111,22 +102,22 @@ def freq_fits_in_64_bits(count, unit):
This is used to check if a frequency is usable by Arctic. ArcticDB converts the frequency to a signed 64-bit integer.
"""
billion = 1_000_000_000
mult = {"h": 3600 * billion, "min": 60 * billion, "s": billion, "ms": billion // 1000, "us": 1000, "ns": 1}
mult = {"h": 3600 * billion, "min": 60 * billion, "s": billion}
return (mult[unit] * count).bit_length() <= 63


@st.composite
def rule(draw):
count = draw(st.integers(min_value=1, max_value=10_000))
unit = draw(st.sampled_from(["min", "h", "s", "ms", "us", "ns"]))
unit = draw(st.sampled_from(["min", "h", "s"]))
result = f"{count}{unit}"
assume(freq_fits_in_64_bits(count=count, unit=unit))
return result


@st.composite
def offset(draw):
unit = draw(st.sampled_from(["s", "min", "h", "ms", "us", "ns", None]))
unit = draw(st.sampled_from(["s", "min", "h", None]))
if unit is None:
return None
count = draw(st.integers(min_value=1, max_value=100))
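As a quick check of the `freq_fits_in_64_bits` bound used by the strategies above (a worked example, not part of the test file): the largest rule `rule()` can now draw is 10,000 hours, i.e. 10,000 × 3,600 × 10⁹ = 3.6 × 10¹⁶ ns, whose bit length is 55 and therefore well within the 63 bits available to a signed 64-bit nanosecond value.

```python
largest_rule_ns = 10_000 * 3600 * 1_000_000_000  # "10000h" expressed in nanoseconds
print(largest_rule_ns.bit_length())  # 55, comfortably <= 63
```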
@@ -181,11 +172,9 @@ def dynamic_schema_column_list(draw):
origin=origin(),
offset=offset(),
)
def test_resample(lmdb_version_store_v1, df, rule, origin, offset):
# The assumption below is to avoid OOM-ing the GitHub runners.
assume(dense_row_count_in_resampled_dataframe([df], rule) < 150000)

def test_resample(lmdb_version_store_v1, any_output_format, df, rule, origin, offset):
lib = lmdb_version_store_v1
lib._set_output_format_for_pipeline_tests(any_output_format)
sym = "sym"
logger = get_logger()
logger.info(f"Data frame generated has {df.shape[0]} rows")
@@ -231,12 +220,12 @@ def test_resample(lmdb_version_store_v1, df, rule, origin, offset):
@use_of_function_scoped_fixtures_in_hypothesis_checked
@given(df_list=dynamic_schema_column_list(), rule=rule(), origin=origin(), offset=offset())
@settings(deadline=None, suppress_health_check=[HealthCheck.data_too_large])
def test_resample_dynamic_schema(lmdb_version_store_dynamic_schema_v1, df_list, rule, origin, offset):
# The assumption below is to avoid OOM-ing the GitHub runners.
assume(dense_row_count_in_resampled_dataframe(df_list, rule) < 150000)

def test_resample_dynamic_schema(
lmdb_version_store_dynamic_schema_v1, any_output_format, df_list, rule, origin, offset
):
common_column_types = compute_common_type_for_columns_in_df_list(df_list)
lib = lmdb_version_store_dynamic_schema_v1
lib._set_output_format_for_pipeline_tests(any_output_format)
lib.version_store.clear()
sym = "sym"
agg = {f"{name}_{op}": (name, op) for name in common_column_types for op in ALL_AGGREGATIONS}