Commit 052433f

Parametrize all pipeline tests to run with arrow output format

1 parent 3ebb24a
20 files changed: +725 −392 lines changed

python/arcticdb/version_store/_store.py

Lines changed: 22 additions & 6 deletions

@@ -90,6 +90,8 @@
 from packaging.version import Version
 import arcticdb_ext as ae

+from arcticdb.util.arrow import stringify_dictionary_encoded_columns
+
 IS_WINDOWS = sys.platform == "win32"

 FlattenResult = namedtuple("FlattenResult", ["is_recursive_normalize_preferred", "metastruct", "to_write"])
@@ -354,12 +356,17 @@ def _initialize(self, library, env, lib_cfg, custom_normalizer, open_mode, nativ
         self._open_mode = open_mode
         self._native_cfg = native_cfg
         self._runtime_options = runtime_options
+        self._test_convert_arrow_back_to_pandas = False

     def set_output_format(self, output_format: Union[OutputFormat, str]):
         if self._runtime_options is None:
             self._runtime_options = RuntimeOptions()
         self._runtime_options.set_output_format(output_format)

+    def _set_output_format_for_pipeline_tests(self, output_format):
+        self.set_output_format(output_format)
+        self._test_convert_arrow_back_to_pandas = True
+
     @classmethod
     def create_store_from_lib_config(cls, lib_cfg, env, open_mode=OpenMode.DELETE, native_cfg=None):
         lib = cls.create_lib_from_lib_config(lib_cfg, env, open_mode, native_cfg)
@@ -740,6 +747,9 @@ def _resolve_dynamic_strings(self, kwargs):
                 "Windows only supports dynamic_strings=True, using dynamic strings despite configuration or kwarg"
             )
             dynamic_strings = True
+        if self._test_convert_arrow_back_to_pandas:
+            # TODO: Hackery, maybe better to skip
+            dynamic_strings = True
         return dynamic_strings

     last_mismatch_msg: Optional[str] = None
@@ -2438,6 +2448,18 @@ def _adapt_read_res(self, read_result: ReadResult) -> VersionedItem:
             else:
                 table = pa.Table.from_batches(record_batches)
             data = self._arrow_normalizer.denormalize(table, read_result.norm)
+            if self._test_convert_arrow_back_to_pandas:
+                # TODO: Deduplicate with convert_arrow_to_pandas_and_remove_categoricals
+                data = stringify_dictionary_encoded_columns(data)
+                for i, name in enumerate(data.column_names):
+                    if pa.types.is_integer(data.column(i).type):
+                        new_col = data.column(i).fill_null(0)
+                        data = data.set_column(i, name, new_col)
+                    # TODO: Copy this to convert_arrow_to_pandas_and_remove_categoricals
+                    if pa.types.is_boolean(data.column(i).type):
+                        new_col = data.column(i).fill_null(False)
+                        data = data.set_column(i, name, new_col)
+                data = data.to_pandas()
         else:
             data = self._normalizer.denormalize(read_result.frame_data, read_result.norm)
         if read_result.norm.HasField("custom"):
@@ -2683,9 +2705,6 @@ def add_to_snapshot(
         """
         Add items to a snapshot. Will replace if the snapshot already contains an entry for a particular symbol.

-        Note: attempt to add non-existing symbol or version to a snapshot will not fail, but will have no effect
-        on the snapshot.
-
         Parameters
         ----------
         snap_name : `str`
@@ -2703,9 +2722,6 @@ def remove_from_snapshot(self, snap_name: str, symbols: List[str], versions: Lis
         """
         Remove items from a snapshot

-        Note: attempt to remove non-existing symbol or version from a snapshot will not fail, but will have no effect
-        on the snapshot.
-
         Parameters
        ----------
         snap_name : `str`
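
For context, the shim added to _adapt_read_res bridges two representational gaps between Arrow and pandas: ArcticDB's Arrow output uses dictionary-encoded string columns, and pandas' default int/bool dtypes cannot hold nulls. A minimal standalone sketch of the same transformation, using a hypothetical pyarrow table rather than ArcticDB's actual read path:

    import pyarrow as pa

    # Hypothetical table: dictionary-encoded strings, nullable ints and bools.
    table = pa.table({
        "s": pa.array(["a", None, "b"]).dictionary_encode(),
        "i": pa.array([1, None, 3], type=pa.int64()),
        "b": pa.array([True, None, False]),
    })

    for i, name in enumerate(table.column_names):
        col = table.column(i)
        if pa.types.is_dictionary(col.type):
            # Decode dictionary-encoded strings to plain strings; this stands
            # in for stringify_dictionary_encoded_columns over the whole table.
            table = table.set_column(i, name, col.cast(col.type.value_type))
        elif pa.types.is_integer(col.type):
            # Plain int64 in pandas cannot represent null, so fill with 0.
            table = table.set_column(i, name, col.fill_null(0))
        elif pa.types.is_boolean(col.type):
            table = table.set_column(i, name, col.fill_null(False))

    df = table.to_pandas()  # converts without upcasting null ints to float64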

python/tests/hypothesis/arcticdb/test_aggregation_hypothesis.py

Lines changed: 9 additions & 5 deletions

@@ -24,7 +24,7 @@
 )


-pytestmark = pytest.mark.pipeline
+pytestmark = pytest.mark.pipeline  # Covered


 @use_of_function_scoped_fixtures_in_hypothesis_checked
@@ -37,9 +37,10 @@
         ],
     ),
 )
-def test_aggregation_numeric(lmdb_version_store_v1, df):
+def test_aggregation_numeric(lmdb_version_store_v1, any_output_format, df):
     assume(not df.empty)
     lib = lmdb_version_store_v1
+    lib._set_output_format_for_pipeline_tests(any_output_format)
     symbol = "test_aggregation_numeric"
     lib.write(symbol, df)

@@ -71,9 +72,10 @@ def test_aggregation_numeric(lmdb_version_store_v1, df):
         ],
     ),
 )
-def test_aggregation_strings(lmdb_version_store_v1, df):
+def test_aggregation_strings(lmdb_version_store_v1, any_output_format, df):
     assume(not df.empty)
     lib = lmdb_version_store_v1
+    lib._set_output_format_for_pipeline_tests(any_output_format)
     symbol = "test_aggregation_strings"
     lib.write(symbol, df)

@@ -116,12 +118,13 @@ def aggregation_dataframe_list_strategy(draw):
 @use_of_function_scoped_fixtures_in_hypothesis_checked
 @settings(deadline=None)
 @given(dfs=aggregation_dataframe_list_strategy())
-def test_aggregation_numeric_dynamic(lmdb_version_store_dynamic_schema_v1, dfs):
+def test_aggregation_numeric_dynamic(lmdb_version_store_dynamic_schema_v1, any_output_format, dfs):
     agg_column_dtypes = [df["agg_column"].dtype for df in dfs if "agg_column" in df.columns]
     common_agg_type = functools.reduce(valid_common_type, agg_column_dtypes) if len(agg_column_dtypes) > 0 else None
     assume(any("grouping_column" in df.columns for df in dfs) and common_agg_type is not None)

     lib = lmdb_version_store_dynamic_schema_v1
+    lib._set_output_format_for_pipeline_tests(any_output_format)
     symbol = "test_aggregation_numeric_dynamic"
     lib.delete(symbol)
     for df in dfs:
@@ -160,9 +163,10 @@ def test_aggregation_numeric_dynamic(lmdb_version_store_dynamic_schema_v1, dfs):
         ],
     ),
 )
-def test_aggregation_strings_dynamic(lmdb_version_store_dynamic_schema_v1, df):
+def test_aggregation_strings_dynamic(lmdb_version_store_dynamic_schema_v1, any_output_format, df):
     assume(len(df) >= 3)
     lib = lmdb_version_store_dynamic_schema_v1
+    lib._set_output_format_for_pipeline_tests(any_output_format)
     symbol = "test_aggregation_strings_dynamic"
     lib.delete(symbol)
     slices = [
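
The pattern these hunks repeat is uniform: each test gains the any_output_format fixture and routes it through the new hook before any reads happen. A hedged sketch of the shape of such a test (symbol name and data are illustrative, not taken from this commit):

    import pandas as pd

    def test_example_roundtrip(lmdb_version_store_v1, any_output_format):
        lib = lmdb_version_store_v1
        # Reads now go through the requested output format (e.g. Arrow), and
        # the hook converts results back to pandas so the existing
        # pandas-based assertions keep working unchanged.
        lib._set_output_format_for_pipeline_tests(any_output_format)
        df = pd.DataFrame({"a": [1, 2, 3]})
        lib.write("sym", df)
        result = lib.read("sym").data  # pandas DataFrame for either format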

python/tests/hypothesis/arcticdb/test_resample.py

Lines changed: 12 additions & 23 deletions

@@ -18,19 +18,10 @@

 COLUMN_DTYPE = ["float", "int", "uint"]
 ALL_AGGREGATIONS = ["sum", "mean", "min", "max", "first", "last", "count"]
-# Make sure the start date is pre-epoch so that we can test pre-epoch dates. Not all C++ libraries handle pre-epoch well.
-MIN_DATE = np.datetime64("1960-01-01")
-MAX_DATE = np.datetime64("2025-01-01")
+MIN_DATE = np.datetime64("1969-06-01")
+MAX_DATE = np.datetime64("1970-06-01")

-pytestmark = pytest.mark.pipeline
-
-
-def dense_row_count_in_resampled_dataframe(df_list, rule):
-    """
-    The number of rows Arctic's resampling will produce after appending all dataframes in `df_list` and then resampling
-    with `rule`. Assumes df_list is sorted by start date and the indexes are not overlapping.
-    """
-    return (df_list[-1].index[-1] - df_list[0].index[0]).value // pd.Timedelta(rule).value
+pytestmark = pytest.mark.pipeline  # Covered


 @st.composite
@@ -111,22 +102,22 @@ def freq_fits_in_64_bits(count, unit):
     This is used to check if a frequency is usable by Arctic. ArcticDB converts the frequency to signed 64-bit integer.
     """
     billion = 1_000_000_000
-    mult = {"h": 3600 * billion, "min": 60 * billion, "s": billion, "ms": billion // 1000, "us": 1000, "ns": 1}
+    mult = {"h": 3600 * billion, "min": 60 * billion, "s": billion}
     return (mult[unit] * count).bit_length() <= 63


 @st.composite
 def rule(draw):
     count = draw(st.integers(min_value=1, max_value=10_000))
-    unit = draw(st.sampled_from(["min", "h", "s", "ms", "us", "ns"]))
+    unit = draw(st.sampled_from(["min", "h", "s"]))
     result = f"{count}{unit}"
     assume(freq_fits_in_64_bits(count=count, unit=unit))
     return result


 @st.composite
 def offset(draw):
-    unit = draw(st.sampled_from(["s", "min", "h", "ms", "us", "ns", None]))
+    unit = draw(st.sampled_from(["s", "min", "h", None]))
     if unit is None:
         return None
     count = draw(st.integers(min_value=1, max_value=100))
@@ -181,11 +172,9 @@ def dynamic_schema_column_list(draw):
     origin=origin(),
     offset=offset(),
 )
-def test_resample(lmdb_version_store_v1, df, rule, origin, offset):
-    # The assumption below is to avoid OOM-ing the GitHub runners.
-    assume(dense_row_count_in_resampled_dataframe([df], rule) < 150000)
-
+def test_resample(lmdb_version_store_v1, any_output_format, df, rule, origin, offset):
     lib = lmdb_version_store_v1
+    lib._set_output_format_for_pipeline_tests(any_output_format)
     sym = "sym"
     logger = get_logger()
     logger.info(f"Data frame generated has {df.shape[0]} rows")
@@ -231,12 +220,12 @@ def test_resample(lmdb_version_store_v1, df, rule, origin, offset):
 @use_of_function_scoped_fixtures_in_hypothesis_checked
 @given(df_list=dynamic_schema_column_list(), rule=rule(), origin=origin(), offset=offset())
 @settings(deadline=None, suppress_health_check=[HealthCheck.data_too_large])
-def test_resample_dynamic_schema(lmdb_version_store_dynamic_schema_v1, df_list, rule, origin, offset):
-    # The assumption below is to avoid OOM-ing the GitHub runners.
-    assume(dense_row_count_in_resampled_dataframe(df_list, rule) < 150000)
-
+def test_resample_dynamic_schema(
+    lmdb_version_store_dynamic_schema_v1, any_output_format, df_list, rule, origin, offset
+):
     common_column_types = compute_common_type_for_columns_in_df_list(df_list)
     lib = lmdb_version_store_dynamic_schema_v1
+    lib._set_output_format_for_pipeline_tests(any_output_format)
     lib.version_store.clear()
     sym = "sym"
     agg = {f"{name}_{op}": (name, op) for name in common_column_types for op in ALL_AGGREGATIONS}
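
The freq_fits_in_64_bits guard exists because ArcticDB converts a resampling rule into a signed 64-bit nanosecond count; with the sub-second units dropped from the strategies above, only the three remaining multipliers matter. A quick worked check, with values chosen purely for illustration:

    billion = 1_000_000_000
    mult = {"h": 3600 * billion, "min": 60 * billion, "s": billion}

    def freq_fits_in_64_bits(count, unit):
        # Usable only if count * unit-in-nanoseconds fits in 63 bits
        # (the 64th bit is the sign bit).
        return (mult[unit] * count).bit_length() <= 63

    assert freq_fits_in_64_bits(10_000, "h")      # 3.6e16 ns, well in range
    assert not freq_fits_in_64_bits(10**10, "h")  # 3.6e22 ns overflows int64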

python/tests/integration/arcticdb/version_store/test_basic_version_store.py

Lines changed: 22 additions & 41 deletions

@@ -27,7 +27,6 @@
     ArcticDbNotYetImplemented,
     InternalException,
     UserInputException,
-    ArcticException,
 )
 from arcticdb import QueryBuilder
 from arcticdb.flattener import Flattener
@@ -36,12 +35,7 @@
 from arcticdb.version_store._store import VersionedItem
 from arcticdb_ext.exceptions import _ArcticLegacyCompatibilityException, StorageException
 from arcticdb_ext.storage import KeyType, NoDataFoundException
-from arcticdb_ext.version_store import (
-    NoSuchVersionException,
-    StreamDescriptorMismatch,
-    ManualClockVersionStore,
-    DataError,
-)
+from arcticdb_ext.version_store import NoSuchVersionException, StreamDescriptorMismatch, ManualClockVersionStore
 from arcticdb.util.test import (
     sample_dataframe,
     sample_dataframe_only_strings,
@@ -51,12 +45,10 @@
     config_context,
     distinct_timestamps,
 )
-from tests.conftest import Marks
 from tests.util.date import DateRange
 from arcticdb.util.test import equals
 from arcticdb.version_store._store import resolve_defaults
 from tests.util.mark import MACOS, MACOS_WHEEL_BUILD, xfail_azure_chars
-from tests.util.marking import marks


 @pytest.fixture()
@@ -847,9 +839,11 @@ def test_range_index(basic_store, sym):
     assert_equal(expected, vit.data)


+@pytest.mark.pipeline  # Covered
 @pytest.mark.parametrize("use_date_range_clause", [True, False])
-@marks([Marks.pipeline, Marks.storage])
-def test_date_range(basic_store, use_date_range_clause):
+@pytest.mark.storage
+def test_date_range(basic_store, use_date_range_clause, any_output_format):
+    basic_store._set_output_format_for_pipeline_tests(any_output_format)
     initial_timestamp = pd.Timestamp("2019-01-01")
     df = pd.DataFrame(data=np.arange(100), index=pd.date_range(initial_timestamp, periods=100))
     sym = "date_test"
@@ -895,9 +889,11 @@ def test_date_range(basic_store, use_date_range_clause):
     assert data_closed[data_closed.columns[0]][-1] == end_offset


+@pytest.mark.pipeline  # Covered
 @pytest.mark.parametrize("use_date_range_clause", [True, False])
-@marks([Marks.pipeline, Marks.storage])
-def test_date_range_none(basic_store, use_date_range_clause):
+@pytest.mark.storage
+def test_date_range_none(basic_store, use_date_range_clause, any_output_format):
+    basic_store._set_output_format_for_pipeline_tests(any_output_format)
     sym = "date_test2"
     rows = 100
     initial_timestamp = pd.Timestamp("2019-01-01")
@@ -914,9 +910,11 @@ def test_date_range_none(basic_store, use_date_range_clause):
     assert len(data) == rows


+@pytest.mark.pipeline  # Covered
 @pytest.mark.parametrize("use_date_range_clause", [True, False])
-@marks([Marks.pipeline, Marks.storage])
-def test_date_range_start_equals_end(basic_store, use_date_range_clause):
+@pytest.mark.storage
+def test_date_range_start_equals_end(basic_store, use_date_range_clause, any_output_format):
+    basic_store._set_output_format_for_pipeline_tests(any_output_format)
     sym = "date_test2"
     rows = 100
     initial_timestamp = pd.Timestamp("2019-01-01")
@@ -936,10 +934,12 @@ def test_date_range_start_equals_end(basic_store, use_date_range_clause):
     assert data[data.columns[0]][0] == start_offset


+@pytest.mark.pipeline  # Covered
 @pytest.mark.parametrize("use_date_range_clause", [True, False])
-@marks([Marks.pipeline, Marks.storage])
-def test_date_range_row_sliced(basic_store_tiny_segment, use_date_range_clause):
+@pytest.mark.storage
+def test_date_range_row_sliced(basic_store_tiny_segment, use_date_range_clause, any_output_format):
     lib = basic_store_tiny_segment
+    lib._set_output_format_for_pipeline_tests(any_output_format)
     sym = "test_date_range_row_sliced"
     # basic_store_tiny_segment produces 2x2 segments
     num_rows = 6
@@ -1705,7 +1705,7 @@ def test_batch_write_then_list_symbol_without_cache(basic_store_factory):
     assert set(lib.list_symbols()) == set(symbols)


-@marks([Marks.storage, Marks.dedup])
+@pytest.mark.storage
 def test_batch_write_missing_keys_dedup(basic_store_factory):
     """When there is duplicate data to reuse for the current write, we need to access the index key of the previous
     versions in order to refer to the corresponding keys for the deduplicated data."""
@@ -2265,26 +2265,6 @@ def test_batch_read_meta_multiple_versions(object_version_store):
     assert results_dict["sym3"][0].metadata == {"meta3": 1}
     assert results_dict["sym2"][3].metadata == {"meta2": 4}

-    # We can supply only an array of symbols, including repeating symbols
-    results_dict = lib.batch_read_metadata_multi(["sym1", "sym2", "sym1", "sym3", "sym2", "sym1", "sym1"])
-    assert results_dict["sym1"][2].metadata == {"meta1": 3}
-    assert len(results_dict["sym1"]) == 1
-    assert results_dict["sym2"][3].metadata == {"meta2": 4}
-    assert results_dict["sym3"][0].metadata == {"meta3": 1}
-
-    # The lists are of different sizr
-    with pytest.raises(ArcticException):
-        results_dict = lib.batch_read_metadata_multi(["sym1", "sym2"], [0, 0, -2])
-
-    # With negative number we can go back from current versions
-    assert lib.batch_read_metadata_multi(["sym1", "sym1"], [-1, -2]) == lib.batch_read_metadata_multi(
-        ["sym1", "sym1"], [2, 1]
-    )
-
-    # Check DataError is thrown when requesting non-existing version
-    with pytest.raises(TypeError):  # Not a good error though - issue 10070002655
-        results_dict = lib.batch_read_metadata_multi(["sym1"], [10])
-

 @pytest.mark.storage
 def test_list_symbols(basic_store):
@@ -2830,10 +2810,12 @@ def test_batch_append_with_throw_exception(basic_store, three_col_df):
     )


+@pytest.mark.pipeline  # Covered
 @pytest.mark.parametrize("use_date_range_clause", [True, False])
-@marks([Marks.pipeline, Marks.storage])
-def test_batch_read_date_range(basic_store_tombstone_and_sync_passive, use_date_range_clause):
+@pytest.mark.storage
+def test_batch_read_date_range(basic_store_tombstone_and_sync_passive, use_date_range_clause, any_output_format):
     lmdb_version_store = basic_store_tombstone_and_sync_passive
+    lmdb_version_store._set_output_format_for_pipeline_tests(any_output_format)
     symbols = []
     for i in range(5):
         symbols.append("sym_{}".format(i))
@@ -2872,7 +2854,6 @@ def test_batch_read_date_range(basic_store_tombstone_and_sync_passive, use_date_


 @pytest.mark.parametrize("use_row_range_clause", [True, False])
-@marks([Marks.pipeline])
 def test_batch_read_row_range(lmdb_version_store_v1, use_row_range_clause):
     lib = lmdb_version_store_v1
     num_symbols = 5
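
The any_output_format fixture itself is not part of this diff (it presumably lives in a shared conftest). Since set_output_format accepts either an OutputFormat or a string, a fixture along these lines would suffice; the parameter values below are assumptions, not the repository's actual definition:

    import pytest

    @pytest.fixture(params=["pandas", "experimental_arrow"])  # assumed values
    def any_output_format(request):
        return request.param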
