Skip to content

Commit d6e7c85

Browse files
committed
Parametrize all pipeline tests to run with arrow output format
1 parent d21054d commit d6e7c85

20 files changed

+659
-334
lines changed

python/arcticdb/version_store/_store.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@
8888
from packaging.version import Version
8989
import arcticdb_ext as ae
9090

91+
from arcticdb.util.arrow import stringify_dictionary_encoded_columns
92+
9193
IS_WINDOWS = sys.platform == "win32"
9294

9395

@@ -348,12 +350,17 @@ def _initialize(self, library, env, lib_cfg, custom_normalizer, open_mode, nativ
348350
self._open_mode = open_mode
349351
self._native_cfg = native_cfg
350352
self._runtime_options=runtime_options
353+
self._test_convert_arrow_back_to_pandas = False
351354

352355
def set_output_format(self, output_format: Union[OutputFormat, str]):
353356
if self._runtime_options is None:
354357
self._runtime_options = RuntimeOptions()
355358
self._runtime_options.set_output_format(output_format)
356359

360+
def _set_output_format_for_pipeline_tests(self, output_format):
361+
self.set_output_format(output_format)
362+
self._test_convert_arrow_back_to_pandas = True
363+
357364
@classmethod
358365
def create_store_from_lib_config(cls, lib_cfg, env, open_mode=OpenMode.DELETE, native_cfg=None):
359366
lib = cls.create_lib_from_lib_config(lib_cfg, env, open_mode, native_cfg)
@@ -723,6 +730,9 @@ def _resolve_dynamic_strings(self, kwargs):
723730
"Windows only supports dynamic_strings=True, using dynamic strings despite configuration or kwarg"
724731
)
725732
dynamic_strings = True
733+
if self._test_convert_arrow_back_to_pandas:
734+
# TODO: Hack: dynamic_strings is forced to True while _test_convert_arrow_back_to_pandas is set; maybe better to skip instead
735+
dynamic_strings = True
726736
return dynamic_strings
727737

728738
last_mismatch_msg: Optional[str] = None
@@ -2400,6 +2410,8 @@ def _adapt_read_res(self, read_result: ReadResult) -> VersionedItem:
24002410
record_batches.append(pa.RecordBatch._import_from_c(record_batch.array(), record_batch.schema()))
24012411
table = pa.Table.from_batches(record_batches)
24022412
data = self._arrow_normalizer.denormalize(table, read_result.norm)
2413+
if self._test_convert_arrow_back_to_pandas:
2414+
data = stringify_dictionary_encoded_columns(data).to_pandas()
24032415
else:
24042416
data = self._normalizer.denormalize(read_result.frame_data, read_result.norm)
24052417
if read_result.norm.HasField("custom"):

python/tests/hypothesis/arcticdb/test_aggregation_hypothesis.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
)
2424

2525

26-
pytestmark = pytest.mark.pipeline
26+
pytestmark = pytest.mark.pipeline # Covered
2727

2828

2929
@use_of_function_scoped_fixtures_in_hypothesis_checked
@@ -36,9 +36,10 @@
3636
],
3737
),
3838
)
39-
def test_aggregation_numeric(lmdb_version_store_v1, df):
39+
def test_aggregation_numeric(lmdb_version_store_v1, any_output_format, df):
4040
assume(not df.empty)
4141
lib = lmdb_version_store_v1
42+
lib._set_output_format_for_pipeline_tests(any_output_format)
4243
symbol = "test_aggregation_numeric"
4344
lib.write(symbol, df)
4445

@@ -70,9 +71,10 @@ def test_aggregation_numeric(lmdb_version_store_v1, df):
7071
],
7172
),
7273
)
73-
def test_aggregation_strings(lmdb_version_store_v1, df):
74+
def test_aggregation_strings(lmdb_version_store_v1, any_output_format, df):
7475
assume(not df.empty)
7576
lib = lmdb_version_store_v1
77+
lib._set_output_format_for_pipeline_tests(any_output_format)
7678
symbol = "test_aggregation_strings"
7779
lib.write(symbol, df)
7880

@@ -112,12 +114,13 @@ def aggregation_dataframe_list_strategy(draw):
112114
@use_of_function_scoped_fixtures_in_hypothesis_checked
113115
@settings(deadline=None)
114116
@given(dfs=aggregation_dataframe_list_strategy())
115-
def test_aggregation_numeric_dynamic(lmdb_version_store_dynamic_schema_v1, dfs):
117+
def test_aggregation_numeric_dynamic(lmdb_version_store_dynamic_schema_v1, any_output_format, dfs):
116118
agg_column_dtypes = [df['agg_column'].dtype for df in dfs if 'agg_column' in df.columns]
117119
common_agg_type = functools.reduce(valid_common_type, agg_column_dtypes) if len(agg_column_dtypes) > 0 else None
118120
assume(any('grouping_column' in df.columns for df in dfs) and common_agg_type is not None)
119121

120122
lib = lmdb_version_store_dynamic_schema_v1
123+
lib._set_output_format_for_pipeline_tests(any_output_format)
121124
symbol = "test_aggregation_numeric_dynamic"
122125
lib.delete(symbol)
123126
for df in dfs:
@@ -155,9 +158,10 @@ def test_aggregation_numeric_dynamic(lmdb_version_store_dynamic_schema_v1, dfs):
155158
],
156159
),
157160
)
158-
def test_aggregation_strings_dynamic(lmdb_version_store_dynamic_schema_v1, df):
161+
def test_aggregation_strings_dynamic(lmdb_version_store_dynamic_schema_v1, any_output_format, df):
159162
assume(len(df) >= 3)
160163
lib = lmdb_version_store_dynamic_schema_v1
164+
lib._set_output_format_for_pipeline_tests(any_output_format)
161165
symbol = "test_aggregation_strings_dynamic"
162166
lib.delete(symbol)
163167
slices = [

python/tests/hypothesis/arcticdb/test_resample.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
MIN_DATE = np.datetime64('1969-06-01')
2222
MAX_DATE = np.datetime64('1970-06-01')
2323

24-
pytestmark = pytest.mark.pipeline
24+
pytestmark = pytest.mark.pipeline # Covered
2525

2626

2727
@st.composite
@@ -149,8 +149,9 @@ def dynamic_schema_column_list(draw):
149149
origin=origin(),
150150
offset=offset()
151151
)
152-
def test_resample(lmdb_version_store_v1, df, rule, origin, offset):
152+
def test_resample(lmdb_version_store_v1, any_output_format, df, rule, origin, offset):
153153
lib = lmdb_version_store_v1
154+
lib._set_output_format_for_pipeline_tests(any_output_format)
154155
sym = "sym"
155156
logger = get_logger()
156157
logger.info(f"Data frame generated has {df.shape[0]} rows")
@@ -197,9 +198,10 @@ def test_resample(lmdb_version_store_v1, df, rule, origin, offset):
197198
offset=offset()
198199
)
199200
@settings(deadline=None, suppress_health_check=[HealthCheck.data_too_large])
200-
def test_resample_dynamic_schema(lmdb_version_store_dynamic_schema_v1, df_list, rule, origin, offset):
201+
def test_resample_dynamic_schema(lmdb_version_store_dynamic_schema_v1, any_output_format, df_list, rule, origin, offset):
201202
common_column_types = compute_common_type_for_columns_in_df_list(df_list)
202203
lib = lmdb_version_store_dynamic_schema_v1
204+
lib._set_output_format_for_pipeline_tests(any_output_format)
203205
lib.version_store.clear()
204206
sym = "sym"
205207
agg = {f"{name}_{op}": (name, op) for name in common_column_types for op in ALL_AGGREGATIONS}

python/tests/integration/arcticdb/version_store/test_basic_version_store.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -822,10 +822,11 @@ def test_range_index(basic_store, sym):
822822
assert_equal(expected, vit.data)
823823

824824

825-
@pytest.mark.pipeline
825+
@pytest.mark.pipeline # Covered
826826
@pytest.mark.parametrize("use_date_range_clause", [True, False])
827827
@pytest.mark.storage
828-
def test_date_range(basic_store, use_date_range_clause):
828+
def test_date_range(basic_store, use_date_range_clause, any_output_format):
829+
basic_store._set_output_format_for_pipeline_tests(any_output_format)
829830
initial_timestamp = pd.Timestamp("2019-01-01")
830831
df = pd.DataFrame(data=np.arange(100), index=pd.date_range(initial_timestamp, periods=100))
831832
sym = "date_test"
@@ -871,10 +872,11 @@ def test_date_range(basic_store, use_date_range_clause):
871872
assert data_closed[data_closed.columns[0]][-1] == end_offset
872873

873874

874-
@pytest.mark.pipeline
875+
@pytest.mark.pipeline # Covered
875876
@pytest.mark.parametrize("use_date_range_clause", [True, False])
876877
@pytest.mark.storage
877-
def test_date_range_none(basic_store, use_date_range_clause):
878+
def test_date_range_none(basic_store, use_date_range_clause, any_output_format):
879+
basic_store._set_output_format_for_pipeline_tests(any_output_format)
878880
sym = "date_test2"
879881
rows = 100
880882
initial_timestamp = pd.Timestamp("2019-01-01")
@@ -891,10 +893,11 @@ def test_date_range_none(basic_store, use_date_range_clause):
891893
assert len(data) == rows
892894

893895

894-
@pytest.mark.pipeline
896+
@pytest.mark.pipeline # Covered
895897
@pytest.mark.parametrize("use_date_range_clause", [True, False])
896898
@pytest.mark.storage
897-
def test_date_range_start_equals_end(basic_store, use_date_range_clause):
899+
def test_date_range_start_equals_end(basic_store, use_date_range_clause, any_output_format):
900+
basic_store._set_output_format_for_pipeline_tests(any_output_format)
898901
sym = "date_test2"
899902
rows = 100
900903
initial_timestamp = pd.Timestamp("2019-01-01")
@@ -914,11 +917,12 @@ def test_date_range_start_equals_end(basic_store, use_date_range_clause):
914917
assert data[data.columns[0]][0] == start_offset
915918

916919

917-
@pytest.mark.pipeline
920+
@pytest.mark.pipeline # Covered
918921
@pytest.mark.parametrize("use_date_range_clause", [True, False])
919922
@pytest.mark.storage
920-
def test_date_range_row_sliced(basic_store_tiny_segment, use_date_range_clause):
923+
def test_date_range_row_sliced(basic_store_tiny_segment, use_date_range_clause, any_output_format):
921924
lib = basic_store_tiny_segment
925+
lib._set_output_format_for_pipeline_tests(any_output_format)
922926
sym = "test_date_range_row_sliced"
923927
# basic_store_tiny_segment produces 2x2 segments
924928
num_rows = 6
@@ -2719,11 +2723,12 @@ def test_batch_append_with_throw_exception(basic_store, three_col_df):
27192723
)
27202724

27212725

2722-
@pytest.mark.pipeline
2726+
@pytest.mark.pipeline # Covered
27232727
@pytest.mark.parametrize("use_date_range_clause", [True, False])
27242728
@pytest.mark.storage
2725-
def test_batch_read_date_range(basic_store_tombstone_and_sync_passive, use_date_range_clause):
2729+
def test_batch_read_date_range(basic_store_tombstone_and_sync_passive, use_date_range_clause, any_output_format):
27262730
lmdb_version_store = basic_store_tombstone_and_sync_passive
2731+
lmdb_version_store._set_output_format_for_pipeline_tests(any_output_format)
27272732
symbols = []
27282733
for i in range(5):
27292734
symbols.append("sym_{}".format(i))

0 commit comments

Comments
 (0)