Skip to content

Commit 7a80398

Browse files
committed
Parametrize all pipeline tests to run with arrow output format
1 parent f935133 commit 7a80398

20 files changed

+721
-392
lines changed

python/arcticdb/version_store/_store.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@
9090
from packaging.version import Version
9191
import arcticdb_ext as ae
9292

93+
from arcticdb.util.arrow import stringify_dictionary_encoded_columns
94+
9395
IS_WINDOWS = sys.platform == "win32"
9496

9597
FlattenResult = namedtuple("FlattenResult", ["is_recursive_normalize_preferred", "metastruct", "to_write"])
@@ -354,12 +356,17 @@ def _initialize(self, library, env, lib_cfg, custom_normalizer, open_mode, nativ
354356
self._open_mode = open_mode
355357
self._native_cfg = native_cfg
356358
self._runtime_options = runtime_options
359+
self._test_convert_arrow_back_to_pandas = False
357360

358361
def set_output_format(self, output_format: Union[OutputFormat, str]):
359362
if self._runtime_options is None:
360363
self._runtime_options = RuntimeOptions()
361364
self._runtime_options.set_output_format(output_format)
362365

366+
def _set_output_format_for_pipeline_tests(self, output_format):
367+
self.set_output_format(output_format)
368+
self._test_convert_arrow_back_to_pandas = True
369+
363370
@classmethod
364371
def create_store_from_lib_config(cls, lib_cfg, env, open_mode=OpenMode.DELETE, native_cfg=None):
365372
lib = cls.create_lib_from_lib_config(lib_cfg, env, open_mode, native_cfg)
@@ -740,6 +747,9 @@ def _resolve_dynamic_strings(self, kwargs):
740747
"Windows only supports dynamic_strings=True, using dynamic strings despite configuration or kwarg"
741748
)
742749
dynamic_strings = True
750+
if self._test_convert_arrow_back_to_pandas:
751+
# TODO: Hackery, maybe better to skip
752+
dynamic_strings = True
743753
return dynamic_strings
744754

745755
last_mismatch_msg: Optional[str] = None
@@ -2434,6 +2444,14 @@ def _adapt_read_res(self, read_result: ReadResult) -> VersionedItem:
24342444
record_batches.append(pa.RecordBatch._import_from_c(record_batch.array(), record_batch.schema()))
24352445
table = pa.Table.from_batches(record_batches)
24362446
data = self._arrow_normalizer.denormalize(table, read_result.norm)
2447+
if self._test_convert_arrow_back_to_pandas:
2448+
# TODO: Deduplicate with convert_arrow_to_pandas_and_remove_categoricals
2449+
data = stringify_dictionary_encoded_columns(data)
2450+
for i, name in enumerate(data.column_names):
2451+
if pa.types.is_integer(data.column(i).type):
2452+
new_col = data.column(i).fill_null(0)
2453+
data = data.set_column(i, name, new_col)
2454+
data = data.to_pandas()
24372455
else:
24382456
data = self._normalizer.denormalize(read_result.frame_data, read_result.norm)
24392457
if read_result.norm.HasField("custom"):
@@ -2679,9 +2697,6 @@ def add_to_snapshot(
26792697
"""
26802698
Add items to a snapshot. Will replace if the snapshot already contains an entry for a particular symbol.
26812699
2682-
Note: attempt to add non-existing symbol or version to a snapshot will not fail, but will have no effect
2683-
on the snapshot.
2684-
26852700
Parameters
26862701
----------
26872702
snap_name : `str`
@@ -2699,9 +2714,6 @@ def remove_from_snapshot(self, snap_name: str, symbols: List[str], versions: Lis
26992714
"""
27002715
Remove items from a snapshot
27012716
2702-
Note: attempt to remove non-existing symbol or version from a snapshot will not fail, but will have no effect
2703-
on the snapshot.
2704-
27052717
Parameters
27062718
----------
27072719
snap_name : `str`

python/tests/hypothesis/arcticdb/test_aggregation_hypothesis.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
)
2525

2626

27-
pytestmark = pytest.mark.pipeline
27+
pytestmark = pytest.mark.pipeline # Covered
2828

2929

3030
@use_of_function_scoped_fixtures_in_hypothesis_checked
@@ -37,9 +37,10 @@
3737
],
3838
),
3939
)
40-
def test_aggregation_numeric(lmdb_version_store_v1, df):
40+
def test_aggregation_numeric(lmdb_version_store_v1, any_output_format, df):
4141
assume(not df.empty)
4242
lib = lmdb_version_store_v1
43+
lib._set_output_format_for_pipeline_tests(any_output_format)
4344
symbol = "test_aggregation_numeric"
4445
lib.write(symbol, df)
4546

@@ -71,9 +72,10 @@ def test_aggregation_numeric(lmdb_version_store_v1, df):
7172
],
7273
),
7374
)
74-
def test_aggregation_strings(lmdb_version_store_v1, df):
75+
def test_aggregation_strings(lmdb_version_store_v1, any_output_format, df):
7576
assume(not df.empty)
7677
lib = lmdb_version_store_v1
78+
lib._set_output_format_for_pipeline_tests(any_output_format)
7779
symbol = "test_aggregation_strings"
7880
lib.write(symbol, df)
7981

@@ -116,12 +118,13 @@ def aggregation_dataframe_list_strategy(draw):
116118
@use_of_function_scoped_fixtures_in_hypothesis_checked
117119
@settings(deadline=None)
118120
@given(dfs=aggregation_dataframe_list_strategy())
119-
def test_aggregation_numeric_dynamic(lmdb_version_store_dynamic_schema_v1, dfs):
121+
def test_aggregation_numeric_dynamic(lmdb_version_store_dynamic_schema_v1, any_output_format, dfs):
120122
agg_column_dtypes = [df["agg_column"].dtype for df in dfs if "agg_column" in df.columns]
121123
common_agg_type = functools.reduce(valid_common_type, agg_column_dtypes) if len(agg_column_dtypes) > 0 else None
122124
assume(any("grouping_column" in df.columns for df in dfs) and common_agg_type is not None)
123125

124126
lib = lmdb_version_store_dynamic_schema_v1
127+
lib._set_output_format_for_pipeline_tests(any_output_format)
125128
symbol = "test_aggregation_numeric_dynamic"
126129
lib.delete(symbol)
127130
for df in dfs:
@@ -160,9 +163,10 @@ def test_aggregation_numeric_dynamic(lmdb_version_store_dynamic_schema_v1, dfs):
160163
],
161164
),
162165
)
163-
def test_aggregation_strings_dynamic(lmdb_version_store_dynamic_schema_v1, df):
166+
def test_aggregation_strings_dynamic(lmdb_version_store_dynamic_schema_v1, any_output_format, df):
164167
assume(len(df) >= 3)
165168
lib = lmdb_version_store_dynamic_schema_v1
169+
lib._set_output_format_for_pipeline_tests(any_output_format)
166170
symbol = "test_aggregation_strings_dynamic"
167171
lib.delete(symbol)
168172
slices = [

python/tests/hypothesis/arcticdb/test_resample.py

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,10 @@
1818

1919
COLUMN_DTYPE = ["float", "int", "uint"]
2020
ALL_AGGREGATIONS = ["sum", "mean", "min", "max", "first", "last", "count"]
21-
# Make sure the start date is pre-epoch so that we can test pre-epoch dates. Not all C++ libraries handle pre-epoch well.
22-
MIN_DATE = np.datetime64("1960-01-01")
23-
MAX_DATE = np.datetime64("2025-01-01")
21+
MIN_DATE = np.datetime64("1969-06-01")
22+
MAX_DATE = np.datetime64("1970-06-01")
2423

25-
pytestmark = pytest.mark.pipeline
26-
27-
28-
def dense_row_count_in_resampled_dataframe(df_list, rule):
29-
"""
30-
The number of rows Arctic's resampling will produce after appending all dataframes in `df_list` and then resampling
31-
with `rule`. Assumes df_list is sorted by start date and the indexes are not overlapping.
32-
"""
33-
return (df_list[-1].index[-1] - df_list[0].index[0]).value // pd.Timedelta(rule).value
24+
pytestmark = pytest.mark.pipeline # Covered
3425

3526

3627
@st.composite
@@ -111,22 +102,22 @@ def freq_fits_in_64_bits(count, unit):
111102
This is used to check if a frequency is usable by Arctic. ArcticDB converts the frequency to signed 64-bit integer.
112103
"""
113104
billion = 1_000_000_000
114-
mult = {"h": 3600 * billion, "min": 60 * billion, "s": billion, "ms": billion // 1000, "us": 1000, "ns": 1}
105+
mult = {"h": 3600 * billion, "min": 60 * billion, "s": billion}
115106
return (mult[unit] * count).bit_length() <= 63
116107

117108

118109
@st.composite
119110
def rule(draw):
120111
count = draw(st.integers(min_value=1, max_value=10_000))
121-
unit = draw(st.sampled_from(["min", "h", "s", "ms", "us", "ns"]))
112+
unit = draw(st.sampled_from(["min", "h", "s"]))
122113
result = f"{count}{unit}"
123114
assume(freq_fits_in_64_bits(count=count, unit=unit))
124115
return result
125116

126117

127118
@st.composite
128119
def offset(draw):
129-
unit = draw(st.sampled_from(["s", "min", "h", "ms", "us", "ns", None]))
120+
unit = draw(st.sampled_from(["s", "min", "h", None]))
130121
if unit is None:
131122
return None
132123
count = draw(st.integers(min_value=1, max_value=100))
@@ -181,11 +172,9 @@ def dynamic_schema_column_list(draw):
181172
origin=origin(),
182173
offset=offset(),
183174
)
184-
def test_resample(lmdb_version_store_v1, df, rule, origin, offset):
185-
# The assumption below is to avoid OOM-ing the GitHub runners.
186-
assume(dense_row_count_in_resampled_dataframe([df], rule) < 150000)
187-
175+
def test_resample(lmdb_version_store_v1, any_output_format, df, rule, origin, offset):
188176
lib = lmdb_version_store_v1
177+
lib._set_output_format_for_pipeline_tests(any_output_format)
189178
sym = "sym"
190179
logger = get_logger()
191180
logger.info(f"Data frame generated has {df.shape[0]} rows")
@@ -231,12 +220,12 @@ def test_resample(lmdb_version_store_v1, df, rule, origin, offset):
231220
@use_of_function_scoped_fixtures_in_hypothesis_checked
232221
@given(df_list=dynamic_schema_column_list(), rule=rule(), origin=origin(), offset=offset())
233222
@settings(deadline=None, suppress_health_check=[HealthCheck.data_too_large])
234-
def test_resample_dynamic_schema(lmdb_version_store_dynamic_schema_v1, df_list, rule, origin, offset):
235-
# The assumption below is to avoid OOM-ing the GitHub runners.
236-
assume(dense_row_count_in_resampled_dataframe(df_list, rule) < 150000)
237-
223+
def test_resample_dynamic_schema(
224+
lmdb_version_store_dynamic_schema_v1, any_output_format, df_list, rule, origin, offset
225+
):
238226
common_column_types = compute_common_type_for_columns_in_df_list(df_list)
239227
lib = lmdb_version_store_dynamic_schema_v1
228+
lib._set_output_format_for_pipeline_tests(any_output_format)
240229
lib.version_store.clear()
241230
sym = "sym"
242231
agg = {f"{name}_{op}": (name, op) for name in common_column_types for op in ALL_AGGREGATIONS}

python/tests/integration/arcticdb/version_store/test_basic_version_store.py

Lines changed: 22 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,14 @@
2727
ArcticDbNotYetImplemented,
2828
InternalException,
2929
UserInputException,
30-
ArcticException,
3130
)
3231
from arcticdb import QueryBuilder
3332
from arcticdb.flattener import Flattener
3433
from arcticdb.version_store import NativeVersionStore
3534
from arcticdb.version_store._store import VersionedItem
3635
from arcticdb_ext.exceptions import _ArcticLegacyCompatibilityException, StorageException
3736
from arcticdb_ext.storage import KeyType, NoDataFoundException
38-
from arcticdb_ext.version_store import (
39-
NoSuchVersionException,
40-
StreamDescriptorMismatch,
41-
ManualClockVersionStore,
42-
DataError,
43-
)
37+
from arcticdb_ext.version_store import NoSuchVersionException, StreamDescriptorMismatch, ManualClockVersionStore
4438
from arcticdb.util.test import (
4539
sample_dataframe,
4640
sample_dataframe_only_strings,
@@ -50,12 +44,10 @@
5044
config_context,
5145
distinct_timestamps,
5246
)
53-
from tests.conftest import Marks
5447
from tests.util.date import DateRange
5548
from arcticdb.util.test import equals
5649
from arcticdb.version_store._store import resolve_defaults
5750
from tests.util.mark import MACOS, MACOS_WHEEL_BUILD, xfail_azure_chars
58-
from tests.util.marking import marks
5951

6052

6153
@pytest.fixture()
@@ -830,9 +822,11 @@ def test_range_index(basic_store, sym):
830822
assert_equal(expected, vit.data)
831823

832824

825+
@pytest.mark.pipeline # Covered
833826
@pytest.mark.parametrize("use_date_range_clause", [True, False])
834-
@marks([Marks.pipeline, Marks.storage])
835-
def test_date_range(basic_store, use_date_range_clause):
827+
@pytest.mark.storage
828+
def test_date_range(basic_store, use_date_range_clause, any_output_format):
829+
basic_store._set_output_format_for_pipeline_tests(any_output_format)
836830
initial_timestamp = pd.Timestamp("2019-01-01")
837831
df = pd.DataFrame(data=np.arange(100), index=pd.date_range(initial_timestamp, periods=100))
838832
sym = "date_test"
@@ -878,9 +872,11 @@ def test_date_range(basic_store, use_date_range_clause):
878872
assert data_closed[data_closed.columns[0]][-1] == end_offset
879873

880874

875+
@pytest.mark.pipeline # Covered
881876
@pytest.mark.parametrize("use_date_range_clause", [True, False])
882-
@marks([Marks.pipeline, Marks.storage])
883-
def test_date_range_none(basic_store, use_date_range_clause):
877+
@pytest.mark.storage
878+
def test_date_range_none(basic_store, use_date_range_clause, any_output_format):
879+
basic_store._set_output_format_for_pipeline_tests(any_output_format)
884880
sym = "date_test2"
885881
rows = 100
886882
initial_timestamp = pd.Timestamp("2019-01-01")
@@ -897,9 +893,11 @@ def test_date_range_none(basic_store, use_date_range_clause):
897893
assert len(data) == rows
898894

899895

896+
@pytest.mark.pipeline # Covered
900897
@pytest.mark.parametrize("use_date_range_clause", [True, False])
901-
@marks([Marks.pipeline, Marks.storage])
902-
def test_date_range_start_equals_end(basic_store, use_date_range_clause):
898+
@pytest.mark.storage
899+
def test_date_range_start_equals_end(basic_store, use_date_range_clause, any_output_format):
900+
basic_store._set_output_format_for_pipeline_tests(any_output_format)
903901
sym = "date_test2"
904902
rows = 100
905903
initial_timestamp = pd.Timestamp("2019-01-01")
@@ -919,10 +917,12 @@ def test_date_range_start_equals_end(basic_store, use_date_range_clause):
919917
assert data[data.columns[0]][0] == start_offset
920918

921919

920+
@pytest.mark.pipeline # Covered
922921
@pytest.mark.parametrize("use_date_range_clause", [True, False])
923-
@marks([Marks.pipeline, Marks.storage])
924-
def test_date_range_row_sliced(basic_store_tiny_segment, use_date_range_clause):
922+
@pytest.mark.storage
923+
def test_date_range_row_sliced(basic_store_tiny_segment, use_date_range_clause, any_output_format):
925924
lib = basic_store_tiny_segment
925+
lib._set_output_format_for_pipeline_tests(any_output_format)
926926
sym = "test_date_range_row_sliced"
927927
# basic_store_tiny_segment produces 2x2 segments
928928
num_rows = 6
@@ -1657,7 +1657,7 @@ def test_batch_write_then_list_symbol_without_cache(basic_store_factory):
16571657
assert set(lib.list_symbols()) == set(symbols)
16581658

16591659

1660-
@marks([Marks.storage, Marks.dedup])
1660+
@pytest.mark.storage
16611661
def test_batch_write_missing_keys_dedup(basic_store_factory):
16621662
"""When there is duplicate data to reuse for the current write, we need to access the index key of the previous
16631663
versions in order to refer to the corresponding keys for the deduplicated data."""
@@ -2217,26 +2217,6 @@ def test_batch_read_meta_multiple_versions(object_version_store):
22172217
assert results_dict["sym3"][0].metadata == {"meta3": 1}
22182218
assert results_dict["sym2"][3].metadata == {"meta2": 4}
22192219

2220-
# We can supply only an array of symbols, including repeating symbols
2221-
results_dict = lib.batch_read_metadata_multi(["sym1", "sym2", "sym1", "sym3", "sym2", "sym1", "sym1"])
2222-
assert results_dict["sym1"][2].metadata == {"meta1": 3}
2223-
assert len(results_dict["sym1"]) == 1
2224-
assert results_dict["sym2"][3].metadata == {"meta2": 4}
2225-
assert results_dict["sym3"][0].metadata == {"meta3": 1}
2226-
2227-
# The lists are of different sizr
2228-
with pytest.raises(ArcticException):
2229-
results_dict = lib.batch_read_metadata_multi(["sym1", "sym2"], [0, 0, -2])
2230-
2231-
# With negative number we can go back from current versions
2232-
assert lib.batch_read_metadata_multi(["sym1", "sym1"], [-1, -2]) == lib.batch_read_metadata_multi(
2233-
["sym1", "sym1"], [2, 1]
2234-
)
2235-
2236-
# Check DataError is thrown when requesting non-existing version
2237-
with pytest.raises(TypeError): # Not a good error though - issue 10070002655
2238-
results_dict = lib.batch_read_metadata_multi(["sym1"], [10])
2239-
22402220

22412221
@pytest.mark.storage
22422222
def test_list_symbols(basic_store):
@@ -2746,10 +2726,12 @@ def test_batch_append_with_throw_exception(basic_store, three_col_df):
27462726
)
27472727

27482728

2729+
@pytest.mark.pipeline # Covered
27492730
@pytest.mark.parametrize("use_date_range_clause", [True, False])
2750-
@marks([Marks.pipeline, Marks.storage])
2751-
def test_batch_read_date_range(basic_store_tombstone_and_sync_passive, use_date_range_clause):
2731+
@pytest.mark.storage
2732+
def test_batch_read_date_range(basic_store_tombstone_and_sync_passive, use_date_range_clause, any_output_format):
27522733
lmdb_version_store = basic_store_tombstone_and_sync_passive
2734+
lmdb_version_store._set_output_format_for_pipeline_tests(any_output_format)
27532735
symbols = []
27542736
for i in range(5):
27552737
symbols.append("sym_{}".format(i))
@@ -2788,7 +2770,6 @@ def test_batch_read_date_range(basic_store_tombstone_and_sync_passive, use_date_
27882770

27892771

27902772
@pytest.mark.parametrize("use_row_range_clause", [True, False])
2791-
@marks([Marks.pipeline])
27922773
def test_batch_read_row_range(lmdb_version_store_v1, use_row_range_clause):
27932774
lib = lmdb_version_store_v1
27942775
num_symbols = 5

0 commit comments

Comments
 (0)