Skip to content

Commit e14ef98

Browse files
authored
Raise exception if filtering recursively normalized data or numpy array (#2641)
#### Reference Issues/PRs <!--Example: Fixes #1234. See also #3456.--> https://man312219.monday.com/boards/7852509418/pulses/9927290197 https://man312219.monday.com/boards/7852509418/pulses/9921000181 #### What does this implement or fix? Raise exception for reading recursively normalized data or numpy array if query_builder is passed, as query_builder doesn't work on such data anyway #### Any other comments? #### Checklist <details> <summary> Checklist for code changes... </summary> - [ ] Have you updated the relevant docstrings, documentation and copyright notice? - [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)? - [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)? - [ ] Are API changes highlighted in the PR description? - [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes? </details> <!-- Thanks for contributing a Pull Request to ArcticDB! Please ensure you have taken a look at: - ArcticDB's Code of Conduct: https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md - ArcticDB's Contribution Licensing: https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing -->
1 parent a7e6cfe commit e14ef98

File tree

6 files changed

+85
-28
lines changed

6 files changed

+85
-28
lines changed

cpp/arcticdb/pipeline/index_segment_reader.cpp

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -91,26 +91,4 @@ bool IndexSegmentReader::is_pickled() const {
9191

9292
bool IndexSegmentReader::has_timestamp_index() const { return tsd().index().type_ == IndexDescriptor::Type::TIMESTAMP; }
9393

94-
void check_column_and_date_range_filterable(
95-
const pipelines::index::IndexSegmentReader& index_segment_reader, const ReadQuery& read_query
96-
) {
97-
util::check(
98-
!index_segment_reader.is_pickled() ||
99-
(!read_query.columns.has_value() && std::holds_alternative<std::monostate>(read_query.row_filter)),
100-
"The data for this symbol is pickled and does not support column stats, date_range, row_range, or column "
101-
"queries"
102-
);
103-
util::check(
104-
index_segment_reader.has_timestamp_index() || !std::holds_alternative<IndexRange>(read_query.row_filter),
105-
"Cannot apply date range filter to symbol with non-timestamp index"
106-
);
107-
sorting::check<ErrorCode::E_UNSORTED_DATA>(
108-
index_segment_reader.sorted() == SortedValue::UNKNOWN ||
109-
index_segment_reader.sorted() == SortedValue::ASCENDING ||
110-
!std::holds_alternative<IndexRange>(read_query.row_filter),
111-
"When filtering data using date_range, the symbol must be sorted in ascending order. ArcticDB believes it "
112-
"is not sorted in ascending order and cannot therefore filter the data using date_range."
113-
);
114-
}
115-
11694
} // namespace arcticdb::pipelines::index

cpp/arcticdb/pipeline/index_segment_reader.hpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,4 @@ folly::Future<IndexSegmentReader> async_get_index_reader(
122122

123123
IndexRange get_index_segment_range(const AtomKey& prev_index, const std::shared_ptr<Store>& store);
124124

125-
void check_column_and_date_range_filterable(
126-
const IndexSegmentReader& index_segment_reader, const ReadQuery& read_query
127-
);
128-
129125
} // namespace arcticdb::pipelines::index

cpp/arcticdb/util/error_code.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ inline std::unordered_map<ErrorCategory, const char*> get_error_category_names()
7474
ERROR_CODE(4002, E_UNSUPPORTED_COLUMN_TYPE) \
7575
ERROR_CODE(4003, E_UNSUPPORTED_INDEX_TYPE) \
7676
ERROR_CODE(4004, E_OPERATION_NOT_SUPPORTED_WITH_PICKLED_DATA) \
77+
ERROR_CODE(4005, E_OPERATION_NOT_SUPPORTED_WITH_RECURSIVE_NORMALIZED_DATA) \
78+
ERROR_CODE(4006, E_OPERATION_NOT_SUPPORTED_WITH_NUMPY_ARRAY) \
7779
ERROR_CODE(5000, E_KEY_NOT_FOUND) \
7880
ERROR_CODE(5001, E_DUPLICATE_KEY) \
7981
ERROR_CODE(5002, E_SYMBOL_NOT_FOUND) \

cpp/arcticdb/version/version_core.cpp

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1211,6 +1211,52 @@ void check_multi_key_is_not_index_only(const PipelineContext& pipeline_context,
12111211
);
12121212
}
12131213

1214+
void check_can_be_filtered(const std::shared_ptr<PipelineContext>& pipeline_context, const ReadQuery& read_query) {
1215+
// To maintain backward compatibility, pending new major release to merge into below section
1216+
// Ticket: 18038782559
1217+
bool is_pickled = pipeline_context->norm_meta_ && pipeline_context->is_pickled();
1218+
util::check(
1219+
!is_pickled ||
1220+
(!read_query.columns.has_value() && std::holds_alternative<std::monostate>(read_query.row_filter)),
1221+
"The data for this symbol is pickled and does not support column stats, date_range, row_range, or column "
1222+
"queries"
1223+
);
1224+
if (pipeline_context->multi_key_) {
1225+
check_multi_key_is_not_index_only(*pipeline_context, read_query);
1226+
}
1227+
1228+
// To keep
1229+
if (pipeline_context->desc_) {
1230+
util::check(
1231+
pipeline_context->descriptor().index().type() == IndexDescriptor::Type::TIMESTAMP ||
1232+
!std::holds_alternative<IndexRange>(read_query.row_filter),
1233+
"Cannot apply date range filter to symbol with non-timestamp index"
1234+
);
1235+
auto sorted_value = pipeline_context->descriptor().sorted();
1236+
sorting::check<ErrorCode::E_UNSORTED_DATA>(
1237+
sorted_value == SortedValue::UNKNOWN || sorted_value == SortedValue::ASCENDING ||
1238+
!std::holds_alternative<IndexRange>(read_query.row_filter),
1239+
"When filtering data using date_range, the symbol must be sorted in ascending order. ArcticDB believes "
1240+
"it "
1241+
"is not sorted in ascending order and cannot therefore filter the data using date_range."
1242+
);
1243+
}
1244+
bool is_query_empty =
1245+
(!read_query.columns && !read_query.row_range &&
1246+
std::holds_alternative<std::monostate>(read_query.row_filter) && read_query.clauses_.empty());
1247+
bool is_numpy_array = pipeline_context->norm_meta_ && pipeline_context->norm_meta_->has_np();
1248+
if (!is_query_empty) {
1249+
// Exception for filtering pickled data is skipped for now for backward compatibility
1250+
if (pipeline_context->multi_key_) {
1251+
schema::raise<ErrorCode::E_OPERATION_NOT_SUPPORTED_WITH_RECURSIVE_NORMALIZED_DATA>(
1252+
"Cannot filter recursively normalized data"
1253+
);
1254+
} else if (is_numpy_array) {
1255+
schema::raise<ErrorCode::E_OPERATION_NOT_SUPPORTED_WITH_NUMPY_ARRAY>("Cannot filter numpy array");
1256+
}
1257+
}
1258+
}
1259+
12141260
static void read_indexed_keys_to_pipeline(
12151261
const std::shared_ptr<Store>& store, const std::shared_ptr<PipelineContext>& pipeline_context,
12161262
const VersionedItem& version_info, ReadQuery& read_query, const ReadOptions& read_options
@@ -1222,7 +1268,6 @@ static void read_indexed_keys_to_pipeline(
12221268
auto index_segment_reader = std::move(*maybe_reader);
12231269
ARCTICDB_DEBUG(log::version(), "Read index segment with {} keys", index_segment_reader.size());
12241270
check_can_read_index_only_if_required(index_segment_reader, read_query);
1225-
check_column_and_date_range_filterable(index_segment_reader, read_query);
12261271
add_index_columns_to_query(read_query, index_segment_reader.tsd());
12271272

12281273
const auto& tsd = index_segment_reader.tsd();
@@ -1245,6 +1290,7 @@ static void read_indexed_keys_to_pipeline(
12451290
std::move(*index_segment_reader.mutable_tsd().mutable_proto().mutable_user_meta())
12461291
);
12471292
pipeline_context->bucketize_dynamic_ = bucketize_dynamic;
1293+
check_can_be_filtered(pipeline_context, read_query);
12481294
ARCTICDB_DEBUG(
12491295
log::version(),
12501296
"read_indexed_keys_to_pipeline: Symbol {} Found {} keys with {} total rows",
@@ -2570,8 +2616,11 @@ folly::Future<ReadVersionOutput> read_frame_for_version(
25702616
) {
25712617
auto pipeline_context = setup_pipeline_context(store, version_info, *read_query, read_options);
25722618
auto res_versioned_item = generate_result_versioned_item(version_info);
2619+
25732620
if (pipeline_context->multi_key_) {
2574-
check_multi_key_is_not_index_only(*pipeline_context, *read_query);
2621+
if (read_query) {
2622+
check_can_be_filtered(pipeline_context, *read_query);
2623+
}
25752624
return read_multi_key(store, *pipeline_context->multi_key_, handler_data, std::move(res_versioned_item.key_));
25762625
}
25772626
ARCTICDB_DEBUG(log::version(), "Fetching data to frame");

python/arcticdb/util/test.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1067,6 +1067,9 @@ def equals(x, y):
10671067
elif isinstance(x, np.ndarray):
10681068
assert isinstance(y, np.ndarray)
10691069
assert np.allclose(x, y)
1070+
elif isinstance(x, pd.DataFrame):
1071+
assert isinstance(y, pd.DataFrame)
1072+
assert_frame_equal(x, y)
10701073
else:
10711074
assert x == y
10721075

python/tests/unit/arcticdb/version_store/test_filtering.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
generic_filter_test_strings,
3232
generic_filter_test_nans,
3333
unicode_symbols,
34+
equals,
3435
)
3536
from arcticdb.util._versions import IS_PANDAS_TWO, PANDAS_VERSION, IS_NUMPY_TWO
3637

@@ -1142,6 +1143,34 @@ def test_float32_binary_comparison(lmdb_version_store_v1):
11421143
generic_filter_test(lib, symbol, q, expected)
11431144

11441145

1146+
@pytest.mark.parametrize("data", ({"a": pd.DataFrame({"col": [0]})}, np.array([1, 2, 3, 4]), np.ndarray((3, 3))))
1147+
@pytest.mark.parametrize("empty", (True, False))
1148+
def test_filter_unfilterable_data(lmdb_version_store_v1, empty, data, sym):
1149+
lib = lmdb_version_store_v1
1150+
lib.write(sym, data, recursive_normalizers=True)
1151+
1152+
q = QueryBuilder()
1153+
if empty:
1154+
equals(lib.read(sym, query_builder=q).data, data)
1155+
else:
1156+
q = q[q["col"] == 0]
1157+
with pytest.raises(SchemaException):
1158+
lib.read(sym, query_builder=q)
1159+
1160+
1161+
@pytest.mark.parametrize("data", ({"a": pd.DataFrame({"col": [0]})}, np.array([1, 2, 3, 4]), np.ndarray((3, 3))))
1162+
@pytest.mark.parametrize("head", (True, False))
1163+
def test_head_tail_unfilterable_data(lmdb_version_store_v1, head, sym, data):
1164+
lib = lmdb_version_store_v1
1165+
lib.write(sym, data, recursive_normalizers=True)
1166+
1167+
with pytest.raises(SchemaException):
1168+
if head:
1169+
lib.head(sym)
1170+
else:
1171+
lib.tail(sym)
1172+
1173+
11451174
################################
11461175
# MIXED SCHEMA TESTS FROM HERE #
11471176
################################

0 commit comments

Comments
 (0)