Commit 2785b43

Arrow read support (experimental release) (#2489)
#### Reference Issues/PRs

#### What does this implement or fix?

Adds a frontend API for using Arrow. The API looks like this:

```
from arcticdb import Arctic, OutputFormat

ac = Arctic(uri, output_format=OutputFormat.EXPERIMENTAL_ARROW)  # Sets a runtime output format option so all read operations return arrow tables.
lib = ac["lib_name"]
lib.read(sym).data  # This returns a pyarrow.Table
lib.read(sym, output_format=OutputFormat.PANDAS)  # The output format can also be overridden for any specific read operation
```

All read operations (`read`, `read_batch`, `read_batch_and_join` and their `lazy` equivalents) honour the `output_format` argument; a fuller sketch follows the checklist below.

The changes in this PR are:

- Renames the `_output_format` argument to `output_format`.
- Separates the internal C++ `OutputFormat` from the Python one. For Python we use a `StrEnum`, which lets users pass either the enum or its string value.
- Adds a `RuntimeOptions` class stored inside the `Arctic` and `NativeVersionStore` instances. `RuntimeOptions` currently contains only the `output_format`, but will later include things like `arrow_string_column_encoding` and other layout configuration.
- Allows passing `output_format` in all V2 APIs and makes it work for `lazy=True` cases.
- Runs all query builder tests with `output_format=ARROW` as well.
- Cleans up `test_arrow` and `test_arrow_normalization` to follow the new API.
- Adds tests for all user-facing arrow APIs in `test_arrow_api.py`.
- Fixes an issue where `read_batch_and_join` didn't respect the input `ReadOptions`.
- Introduces the `arcticdb.dependencies` module for handling optional dependencies.

#### Any other comments?

Writing automated tests for the optional dependencies is difficult, so I ran a manual test instead.

In a venv with pyarrow:

```
>>> import arcticdb as adb
>>> ac = adb.Arctic("lmdb:///tmp/test-arrow")
>>> lib = ac["test"]
>>> lib.read("test", output_format="pandas").data
   x
0  5
1  6
2  7
>>> lib.read("test", output_format="experimental_arrow").data
pyarrow.Table
x: int64
----
x: [[5,6,7]]
```

And in a venv without pyarrow:

```
>>> import arcticdb as adb
>>> ac = adb.Arctic("lmdb:///tmp/test-arrow")
>>> lib = ac["test"]
>>> lib.read("test", output_format="pandas").data
   x
0  5
1  6
2  7
>>> lib.read("test", output_format="experimental_arrow").data
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ivo/source/read_as_arrow/python/arcticdb/version_store/library.py", line 1887, in read
    return self._nvs.read(
           ^^^^^^^^^^^^^^^
  File "/home/ivo/source/read_as_arrow/python/arcticdb/version_store/_store.py", line 2063, in read
    version_query, read_options, read_query = self._get_queries(
                                              ^^^^^^^^^^^^^^^^^^
  File "/home/ivo/source/read_as_arrow/python/arcticdb/version_store/_store.py", line 1970, in _get_queries
    read_options = self._get_read_options(**kwargs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ivo/source/read_as_arrow/python/arcticdb/version_store/_store.py", line 1958, in _get_read_options
    output_format_to_internal(
  File "/home/ivo/source/read_as_arrow/python/arcticdb/options.py", line 162, in output_format_to_internal
    raise ModuleNotFoundError("ArcticDB's pyarrow optional dependency missing but is required to use arrow output format.")
ModuleNotFoundError: ArcticDB's pyarrow optional dependency missing but is required to use arrow output format.
```

#### Checklist

<details>
<summary>Checklist for code changes...</summary>

- [ ] Have you updated the relevant docstrings, documentation and copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?

</details>
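Putting the new surface together, here is a minimal end-to-end sketch of the default-plus-override flow described above. The URI and symbol names are illustrative and pyarrow is assumed to be installed:

```python
import pandas as pd
from arcticdb import Arctic, OutputFormat

# Illustrative URI and names only.
ac = Arctic("lmdb:///tmp/arrow-demo", output_format=OutputFormat.EXPERIMENTAL_ARROW)
lib = ac.get_library("demo", create_if_missing=True)
lib.write("sym", pd.DataFrame({"x": [5, 6, 7]}))

table = lib.read("sym").data                       # pyarrow.Table, from the instance-level default
df = lib.read("sym", output_format="pandas").data  # per-call override; strings are case-insensitive
batch = lib.read_batch(["sym"])                    # batch reads honour the same output_format
```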
1 parent a88815a commit 2785b43

19 files changed: +710 -305 lines changed


cpp/arcticdb/pipeline/read_options.hpp

Lines changed: 4 additions & 0 deletions
@@ -90,5 +90,9 @@ struct ReadOptions {
     [[nodiscard]] OutputFormat output_format() const {
         return data_->output_format_;
     }
+
+    [[nodiscard]] ReadOptions clone() const {
+        return ReadOptions(std::make_shared<ReadOptionsData>(*data_));
+    }
 };
 } //namespace arcticdb

cpp/arcticdb/version/local_versioned_engine.cpp

Lines changed: 7 additions & 7 deletions
@@ -1293,22 +1293,22 @@ MultiSymbolReadOutput LocalVersionedEngine::batch_read_and_join_internal(
     }
     auto clauses_ptr = std::make_shared<std::vector<std::shared_ptr<Clause>>>(std::move(clauses));
     return folly::collect(symbol_processing_result_futs).via(&async::io_executor())
-    .thenValueInline([this, &handler_data, clauses_ptr, component_manager](std::vector<SymbolProcessingResult>&& symbol_processing_results) mutable {
+    .thenValueInline([this, &handler_data, clauses_ptr, component_manager, read_options](std::vector<SymbolProcessingResult>&& symbol_processing_results) mutable {
         auto [input_schemas, entity_ids, res_versioned_items, res_metadatas] = unpack_symbol_processing_results(std::move(symbol_processing_results));
         auto pipeline_context = setup_join_pipeline_context(std::move(input_schemas), *clauses_ptr);
         return schedule_remaining_iterations(std::move(entity_ids), clauses_ptr)
         .thenValueInline([component_manager](std::vector<EntityId>&& processed_entity_ids) {
             auto proc = gather_entities<std::shared_ptr<SegmentInMemory>, std::shared_ptr<RowRange>, std::shared_ptr<ColRange>>(*component_manager, std::move(processed_entity_ids));
             return collect_segments(std::move(proc));
         })
-        .thenValueInline([store=store(), &handler_data, pipeline_context](std::vector<SliceAndKey>&& slice_and_keys) mutable {
-            return prepare_output_frame(std::move(slice_and_keys), pipeline_context, store, ReadOptions{}, handler_data);
+        .thenValueInline([store=store(), &handler_data, pipeline_context, read_options](std::vector<SliceAndKey>&& slice_and_keys) mutable {
+            return prepare_output_frame(std::move(slice_and_keys), pipeline_context, store, read_options, handler_data);
         })
-        .thenValueInline([&handler_data, pipeline_context, res_versioned_items, res_metadatas](SegmentInMemory&& frame) mutable {
+        .thenValueInline([&handler_data, pipeline_context, res_versioned_items, res_metadatas, read_options](SegmentInMemory&& frame) mutable {
             // Needed to force our usual backfilling behaviour when columns have been outer-joined and some are not present in all input symbols
-            ReadOptions read_options;
-            read_options.set_dynamic_schema(true);
-            return reduce_and_fix_columns(pipeline_context, frame, read_options, handler_data)
+            ReadOptions read_options_with_dynamic_schema = read_options.clone();
+            read_options_with_dynamic_schema.set_dynamic_schema(true);
+            return reduce_and_fix_columns(pipeline_context, frame, read_options_with_dynamic_schema, handler_data)
             .thenValueInline([pipeline_context, frame, res_versioned_items, res_metadatas](auto&&) mutable {
                 return MultiSymbolReadOutput{
                     std::move(*res_versioned_items),

cpp/arcticdb/version/python_bindings.cpp

Lines changed: 1 addition & 1 deletion
@@ -230,7 +230,7 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
         .def("set_timestamp", &VersionQuery::set_timestamp)
         .def("set_version", &VersionQuery::set_version);

-    py::enum_<OutputFormat>(version, "OutputFormat")
+    py::enum_<OutputFormat>(version, "InternalOutputFormat")
         .value("PANDAS", OutputFormat::PANDAS)
         .value("ARROW", OutputFormat::ARROW);

python/arcticdb/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@
 import sys as _sys

 from arcticdb.arctic import Arctic
-from arcticdb.options import LibraryOptions
+from arcticdb.options import LibraryOptions, OutputFormat, RuntimeOptions
 from arcticdb.version_store.processing import QueryBuilder, where
 from arcticdb.version_store._store import VersionedItem
 import arcticdb.version_store.library as library
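With this re-export the new types are importable from the package root; a small sketch of what that enables:

```python
from arcticdb import OutputFormat, RuntimeOptions

# RuntimeOptions takes keyword-only arguments; OutputFormat is a str-backed enum
# whose values double as plain strings ("PANDAS" / "EXPERIMENTAL_ARROW").
opts = RuntimeOptions(output_format=OutputFormat.EXPERIMENTAL_ARROW)
print(opts.output_format)
```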

python/arcticdb/arctic.py

Lines changed: 45 additions & 10 deletions
@@ -8,7 +8,7 @@
 import logging
 from typing import List, Optional, Any, Union

-from arcticdb.options import DEFAULT_ENCODING_VERSION, LibraryOptions, EnterpriseLibraryOptions
+from arcticdb.options import DEFAULT_ENCODING_VERSION, LibraryOptions, EnterpriseLibraryOptions, RuntimeOptions, OutputFormat
 from arcticdb_ext.storage import LibraryManager
 from arcticdb.exceptions import LibraryNotFound, MismatchingLibraryOptions
 from arcticdb.version_store.library import ArcticInvalidApiUsageException, Library
@@ -46,7 +46,7 @@ class Arctic:
     # It is set by the LmdbStorageFixture
     _accessed_libs: Optional[List[NativeVersionStore]] = None

-    def __init__(self, uri: str, encoding_version: EncodingVersion = DEFAULT_ENCODING_VERSION):
+    def __init__(self, uri: str, encoding_version: EncodingVersion = DEFAULT_ENCODING_VERSION, output_format: Union[OutputFormat, str] = OutputFormat.PANDAS):
         """
         Initializes a top-level Arctic library management instance.

@@ -59,10 +59,19 @@ def __init__(self, uri: str, encoding_version: EncodingVersion = DEFAULT_ENCODIN
             URI specifying the backing store used to access, configure, and create Arctic libraries.
             For more details about the parameters, please refer to the [Arctic URI Documentation](./arctic_uri.md).

-        encoding_version: EncodingVersion, default DEFAULT_ENCODING_VERSION
+        encoding_version: EncodingVersion, default = EncodingVersion.V1
             When creating new libraries with this Arctic instance, the default encoding version to use.
             Can be overridden by specifying the encoding version in the LibraryOptions argument to create_library.

+        output_format: Union[OutputFormat, str], default = OutputFormat.PANDAS
+            Controls the default output format of all operations returning a dataframe.
+            The default behavior (OutputFormat.PANDAS) is to return `pandas.DataFrame`s or `pandas.Series` backed by
+            numpy arrays.
+            OutputFormat.EXPERIMENTAL_ARROW will return all dataframes as `pyarrow.Table`s. The arrow API is still
+            experimental and the arrow layout might change in a minor release.
+            Accepts the OutputFormat as either OutputFormat enum values or as case-insensitive strings like "pandas"
+            and "experimental_arrow".
+
         Examples
         --------
@@ -89,31 +98,46 @@ def __init__(self, uri: str, encoding_version: EncodingVersion = DEFAULT_ENCODIN
         self._library_adapter: ArcticLibraryAdapter = _cls(uri, self._encoding_version)
         self._library_manager = LibraryManager(self._library_adapter.config_library)
         self._uri = uri
+        self._runtime_options = RuntimeOptions(output_format=output_format)

-    def __getitem__(self, name: str) -> Library:
+    def _get_library(self, name: str, output_format: Optional[Union[OutputFormat, str]] = None) -> Library:
         lib_mgr_name = self._library_adapter.get_name_for_library_manager(name)
         if not self._library_manager.has_library(lib_mgr_name):
             raise LibraryNotFound(name)

         storage_override = self._library_adapter.get_storage_override()
+
+        if output_format is not None:
+            runtime_options = RuntimeOptions(output_format=output_format)
+        else:
+            runtime_options = self._runtime_options
+
         lib = NativeVersionStore(
             self._library_manager.get_library(lib_mgr_name, storage_override, native_storage_config=self._library_adapter.native_config()),
             repr(self._library_adapter),
             lib_cfg=self._library_manager.get_library_config(lib_mgr_name, storage_override),
-            native_cfg=self._library_adapter.native_config()
+            native_cfg=self._library_adapter.native_config(),
+            runtime_options=runtime_options
         )
         if self._accessed_libs is not None:
             self._accessed_libs.append(lib)
         return Library(repr(self), lib)

+    def __getitem__(self, name: str):
+        return self._get_library(name)
+
     def __repr__(self):
         return "Arctic(config=%r)" % self._library_adapter

     def __contains__(self, name: str):
         return self.has_library(name)

     def get_library(
-        self, name: str, create_if_missing: Optional[bool] = False, library_options: Optional[LibraryOptions] = None
+        self,
+        name: str,
+        create_if_missing: Optional[bool] = False,
+        library_options: Optional[LibraryOptions] = None,
+        output_format: Optional[Union[OutputFormat, str]] = None,
     ) -> Library:
         """
         Returns the library named ``name``.
@@ -136,6 +160,11 @@ def get_library(
             match these.
             Unused if create_if_missing is False.

+        output_format: Optional[Union[OutputFormat, str]], default = None
+            Controls the default output format of all operations on the library returning a dataframe.
+            For more information see documentation of `Arctic.__init__`.
+            If `None` uses the output format from the Arctic instance.
+
         Examples
         --------
         >>> arctic = adb.Arctic('s3://MY_ENDPOINT:MY_BUCKET')
@@ -152,7 +181,7 @@ def get_library(
                 "In get_library, library_options must be falsey if create_if_missing is falsey"
             )
         try:
-            lib = self[name]
+            lib = self._get_library(name, output_format)
             if create_if_missing and library_options:
                 if library_options.encoding_version is None:
                     library_options.encoding_version = self._encoding_version
@@ -161,14 +190,15 @@ def get_library(
             return lib
         except LibraryNotFound as e:
             if create_if_missing:
-                return self.create_library(name, library_options)
+                return self.create_library(name, library_options, output_format)
             else:
                 raise e

     def create_library(self,
                        name: str,
                        library_options: Optional[LibraryOptions] = None,
-                       enterprise_library_options: Optional[EnterpriseLibraryOptions] = None) -> Library:
+                       enterprise_library_options: Optional[EnterpriseLibraryOptions] = None,
+                       output_format: Optional[Union[OutputFormat, str]] = None) -> Library:
         """
         Creates the library named ``name``.

@@ -192,6 +222,11 @@ def create_library(self,
             Enterprise options to use in configuring the library. Defaults if not provided are the same as documented in
             EnterpriseLibraryOptions. These options are only relevant to ArcticDB enterprise users.

+        output_format: Optional[Union[OutputFormat, str]], default = None
+            Controls the default output format of all operations on the library returning a dataframe.
+            For more information see documentation of `Arctic.__init__`.
+            If `None` uses the output format from the Arctic instance.
+
         Examples
         --------
         >>> arctic = adb.Arctic('s3://MY_ENDPOINT:MY_BUCKET')
@@ -214,7 +249,7 @@ def create_library(self,
         cfg = self._library_adapter.get_library_config(name, library_options, enterprise_library_options)
         lib_mgr_name = self._library_adapter.get_name_for_library_manager(name)
         self._library_manager.write_library_config(cfg, lib_mgr_name, self._library_adapter.get_masking_override())
-        return self.get_library(name)
+        return self.get_library(name, output_format=output_format)

     def delete_library(self, name: str) -> None:
         """

python/arcticdb/dependencies.py

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+from types import ModuleType
+from typing import Any, Tuple
+from importlib import import_module
+
+
+class MissingModule(ModuleType):
+    """
+    A dummy module used to represent a missing optional dependency.
+    Raises a meaningful error message when trying to get any attribute.
+    """
+
+    def __init__(
+        self,
+        module_name: str,
+    ) -> None:
+        self._module_name = module_name
+        super().__init__(module_name)
+
+    def __getattr__(self, name: str) -> Any:
+        msg = f"ArcticDB's {self._module_name!r} optional dependency is missing but should be installed to use this feature."
+        raise ModuleNotFoundError(msg)
+
+
+def _import_optional_dependency(module_name: str) -> Tuple[ModuleType, bool]:
+    try:
+        module = import_module(module_name)
+        return module, True
+    except ImportError:
+        module = MissingModule(module_name)
+        return module, False
+
+
+pyarrow, _PYARROW_AVAILABLE = _import_optional_dependency("pyarrow")
+
+
+__all__ = [
+    "pyarrow",
+    "_PYARROW_AVAILABLE",
+]
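A sketch of the intended failure mode, using a deliberately non-existent module name for illustration:

```python
from arcticdb.dependencies import _import_optional_dependency, _PYARROW_AVAILABLE

module, available = _import_optional_dependency("module_that_does_not_exist")
assert not available

try:
    module.anything  # any attribute access on the MissingModule stand-in raises
except ModuleNotFoundError as exc:
    print(exc)

print(_PYARROW_AVAILABLE)  # resolved once, at arcticdb.dependencies import time
```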

python/arcticdb/options.py

Lines changed: 32 additions & 1 deletion
@@ -6,11 +6,13 @@
 As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
 """

-from typing import Optional
+from typing import Optional, Union
 from enum import Enum

+from arcticdb.dependencies import _PYARROW_AVAILABLE
 from arcticdb.encoding_version import EncodingVersion
 from arcticdb_ext.storage import ModifiableLibraryOption, ModifiableEnterpriseLibraryOption
+from arcticdb_ext.version_store import InternalOutputFormat


 DEFAULT_ENCODING_VERSION = EncodingVersion.V1
@@ -146,6 +148,35 @@ def __repr__(self):
         )


+# TODO: Use enum.StrEnum when we no longer need to support python 3.9
+class OutputFormat(str, Enum):
+    PANDAS = "PANDAS"
+    EXPERIMENTAL_ARROW = "EXPERIMENTAL_ARROW"
+
+
+def output_format_to_internal(output_format: Union[OutputFormat, str]) -> InternalOutputFormat:
+    if output_format.lower() == OutputFormat.PANDAS.lower():
+        return InternalOutputFormat.PANDAS
+    elif output_format.lower() == OutputFormat.EXPERIMENTAL_ARROW.lower():
+        if not _PYARROW_AVAILABLE:
+            raise ModuleNotFoundError("ArcticDB's pyarrow optional dependency missing but is required to use arrow output format.")
+        return InternalOutputFormat.ARROW
+    else:
+        raise ValueError(f"Unknown OutputFormat: {output_format}")
+
+class RuntimeOptions:
+    def __init__(
+        self,
+        *,
+        output_format: Union[OutputFormat, str] = OutputFormat.PANDAS,
+    ):
+        self.output_format = output_format
+
+    def set_output_format(self, output_format: Union[OutputFormat, str]):
+        self.output_format = output_format
+
+
 class EnterpriseLibraryOptions:
     """
     Configuration options for ArcticDB libraries, that should only be used when you are using the ArcticDB enterprise
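A short sketch of how the mapping above behaves; the arrow branch assumes pyarrow is installed:

```python
from arcticdb.options import OutputFormat, output_format_to_internal

# The enum and case-insensitive strings map to the same internal value.
assert output_format_to_internal(OutputFormat.PANDAS) == output_format_to_internal("pandas")
internal = output_format_to_internal("experimental_arrow")  # InternalOutputFormat.ARROW; needs pyarrow

try:
    output_format_to_internal("parquet")  # anything else is rejected
except ValueError as exc:
    print(exc)
```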

python/arcticdb/util/arrow.py

Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
+from arcticdb.dependencies import pyarrow as pa
+
+def stringify_dictionary_encoded_columns(table, string_type=None):
+    """
+    Converts all pyarrow.Table dictionary encoded columns to strings.
+
+    ArcticDB currently returns string columns in dictionary encoded arrow arrays when using
+    OutputFormat.EXPERIMENTAL_ARROW.
+
+    Useful for testing when comparing to the source dataframe where we want regular large_string columns instead of
+    categorical columns.
+    """
+    if string_type is None:
+        string_type = pa.large_string()
+    for i, name in enumerate(table.column_names):
+        if pa.types.is_dictionary(table.column(i).type):
+            table = table.set_column(i, name, table.column(name).cast(string_type))
+    return table
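Typical usage, sketched with a hand-built dictionary-encoded table (requires pyarrow):

```python
import pyarrow as pa
from arcticdb.util.arrow import stringify_dictionary_encoded_columns

symbols = pa.array(["a", "b", "a"]).dictionary_encode()
table = pa.table({"sym": symbols, "price": [1.0, 2.0, 3.0]})

flat = stringify_dictionary_encoded_columns(table)
print(flat.schema)  # "sym" is now large_string; "price" is untouched
```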

python/arcticdb/util/test.py

Lines changed: 5 additions & 26 deletions
@@ -19,6 +19,7 @@
 import time
 import attr
 from functools import wraps, reduce
+from arcticdb.dependencies import pyarrow as pa

 from arcticdb.util.marks import SHORTER_LOGS

@@ -29,6 +30,7 @@

 from arcticdb import QueryBuilder
 from arcticdb.util._versions import IS_PANDAS_ONE, PANDAS_VERSION, CHECK_FREQ_VERSION
+from arcticdb.util.arrow import stringify_dictionary_encoded_columns
 from arcticdb.version_store import NativeVersionStore
 from arcticdb.version_store._custom_normalizers import CustomNormalizer
 from arcticc.pb2.descriptors_pb2 import NormalizationMetadata
@@ -253,38 +255,15 @@ def assert_frame_equal_rebuild_index_first(expected: pd.DataFrame, actual: pd.Da


 def convert_arrow_to_pandas_and_remove_categoricals(table):
-    """
-    Converts a pyarrow.Table to a pandas.DataFrame and unwinds all dictionary encoded columns.
-
-    Useful for testing because ArcticDB currently returns string columns in arrow as dictionary encoded, but when
-    comparing to the source pandas dataframes we want regular string columns instead of categoricals.
-    """
-    import pyarrow as pa
-    new_columns = []
-    new_fields = []
-    metadata = table.schema.metadata
-
-    for i, col in enumerate(table.columns):
-        field = table.field(i)
-        if isinstance(field.type, pa.DictionaryType):
-            col = pa.compute.cast(col, field.type.value_type)
-            field = field.with_type(field.type.value_type)
-        new_columns.append(col)
-        new_fields.append(field)
-
-    new_table = pa.Table.from_arrays(
-        new_columns,
-        schema=pa.schema(new_fields).with_metadata(metadata)
-    )
+    new_table = stringify_dictionary_encoded_columns(table)
     return new_table.to_pandas()

-def assert_frame_equal_with_arrow(left, right):
-    import pyarrow as pa
+def assert_frame_equal_with_arrow(left, right, **kwargs):
     if isinstance(left, pa.Table):
         left = convert_arrow_to_pandas_and_remove_categoricals(left)
     if isinstance(right, pa.Table):
         right = convert_arrow_to_pandas_and_remove_categoricals(right)
-    assert_frame_equal(left, right)
+    assert_frame_equal(left, right, **kwargs)


 unicode_symbol = "\u00A0" # start of latin extensions
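With the refactor the helper accepts either representation on either side and forwards keyword arguments to `assert_frame_equal`; a sketch (requires pandas and pyarrow):

```python
import pandas as pd
import pyarrow as pa
from arcticdb.util.test import assert_frame_equal_with_arrow

expected = pd.DataFrame({"x": [5, 6, 7]})
actual = pa.table({"x": [5, 6, 7]})  # e.g. the result of an arrow-format read

# The arrow side is converted to pandas (dictionary columns stringified) before comparison.
assert_frame_equal_with_arrow(expected, actual, check_dtype=False)
```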
