
Commit 724dc79

Utility for writing segment_in_memory (#2515)
#### What does this implement or fix?

This pull request implements a feature in the local versioned engine for writing a SegmentInMemory directly, similarly to how an InputTensorFrame is written via write_versioned_dataframe_internal (the path used by the standard lib.write()).

#### Checklist

- [ ] Have you updated the relevant docstrings, documentation and copyright notice?
- [ ] Is this contribution tested against all ArcticDB's features (../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate error messages (https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes?
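For orientation, here is a minimal usage sketch of the new entry point. It is not part of the diff: it assumes an existing LocalVersionedEngine named `engine` and a pre-built SegmentInMemory whose descriptor id matches the symbol; `build_segment_for` is a hypothetical helper standing in for however that segment is produced.

```cpp
using namespace arcticdb;
using namespace arcticdb::version_store;

// Hypothetical helper producing a populated SegmentInMemory whose descriptor id
// is "my_symbol" (write_segment checks that it matches the stream_id argument).
SegmentInMemory segment = build_segment_for("my_symbol");

// RowSlicing splits the segment into slices of segment_row_size rows before
// writing; NoSlicing writes it as a single slice.
VersionedItem item = engine.write_segment(
    StreamId{"my_symbol"},
    std::move(segment),
    /*prune_previous_versions=*/false,
    Slicing::RowSlicing);
```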
1 parent fdbc93e, commit 724dc79

16 files changed (+214, -40 lines)

cpp/arcticdb/column_store/memory_segment_impl.cpp

Lines changed: 3 additions & 1 deletion
@@ -639,7 +639,9 @@ std::vector<std::shared_ptr<SegmentInMemoryImpl>> SegmentInMemoryImpl::split(siz
         util::BitSetSizeType end = std::min(start + rows, total_rows);
         // set_range is close interval on [left, right]
         bitset.set_range(start, end - 1, true);
-        output.emplace_back(filter(std::move(bitset), filter_down_stringpool));
+        auto output_segment = filter(std::move(bitset), filter_down_stringpool);
+        output_segment->set_offset(start);
+        output.emplace_back(std::move(output_segment));
     }
     return output;
 }
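This offset bookkeeping is what the new write path relies on downstream: each slice now reports where it starts in the parent segment. A hedged sketch of the resulting behaviour (assuming a populated SegmentInMemory named `segment`; `handle_slice` is a placeholder):

```cpp
// After this change, each slice produced by split() carries the row offset at
// which it starts within the parent segment (0, N, 2N, ...).
auto slices = segment.split(/*rows=*/100'000, /*filter_down_stringpool=*/true);
for (auto& slice : slices) {
    // [slice.offset(), slice.offset() + slice.row_count()) is the row range this
    // slice covers in the parent, which write_segment uses to build its RowRange.
    handle_slice(slice.offset(), slice.row_count());  // placeholder
}
```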

cpp/arcticdb/pipeline/slicing.hpp

Lines changed: 27 additions & 2 deletions
@@ -95,10 +95,10 @@ inline auto end_index_generator(T end_index){//works for both rawtype and rawtyp
     }
 }

-inline auto get_partial_key_gen(std::shared_ptr<InputTensorFrame> frame, TypedStreamVersion key) {
+inline auto get_partial_key_gen(std::shared_ptr<InputTensorFrame> frame, const TypedStreamVersion& key) {
     using PartialKey = stream::StreamSink::PartialKey;

-    return [frame=std::move(frame), key = std::move(key)](const FrameSlice& s) {
+    return [frame=std::move(frame), &key](const FrameSlice& s) {
         if (frame->has_index()) {
             util::check(static_cast<bool>(frame->index_tensor), "Got null index tensor in get_partial_key_gen");
             auto& idx = frame->index_tensor.value();
@@ -116,5 +116,30 @@ inline auto get_partial_key_gen(std::shared_ptr<InputTensorFrame> frame, TypedSt
     };
 }

+inline stream::StreamSink::PartialKey get_partial_key_for_segment_slice(const IndexDescriptorImpl& index, const TypedStreamVersion& key, const SegmentInMemory& slice) {
+    using PartialKey = stream::StreamSink::PartialKey;
+
+    if (index.field_count() != 0) {
+        util::check(static_cast<bool>(index.type() == IndexDescriptor::Type::TIMESTAMP), "Got unexpected index type in get_partial_key_for_segment_slice");
+        auto& idx = slice.column(0);
+        util::check(idx.scalar_at<timestamp>(0).has_value(), "First element of index column of slice does not contain a value");
+        util::check(idx.scalar_at<timestamp>(slice.row_count()-1).has_value(), "Last element of index column of slice does not contain a value");
+        auto start = idx.scalar_at<timestamp>(0).value();
+        auto end = idx.scalar_at<timestamp>(slice.row_count()-1).value();
+        return PartialKey{
+            key.type, key.version_id, key.id, start, end_index_generator(end)
+        };
+    }
+    else {
+        return PartialKey{
+            key.type,
+            key.version_id,
+            key.id,
+            entity::safe_convert_to_numeric_index(slice.offset(), "Rows"),
+            entity::safe_convert_to_numeric_index(slice.offset() + slice.row_count(), "Rows")
+        };
+    }
+}
+
 } //arcticdb::pipelines
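Concretely, the two branches produce different key ranges: a timestamp-indexed slice is keyed by its first and last index values (with the end passed through end_index_generator), while a non-indexed slice is keyed by [offset, offset + row_count). An illustrative call, with made-up values:

```cpp
// Row-count (non-indexed) case, e.g. a slice holding rows 100..149 of the parent:
//   slice.offset()    == 100
//   slice.row_count() == 50
//   => PartialKey{KeyType::TABLE_DATA, key.version_id, key.id, 100, 150}
auto pkey = get_partial_key_for_segment_slice(index_descriptor, typed_stream_version, slice);
```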

cpp/arcticdb/toolbox/library_tool.cpp

Lines changed: 12 additions & 2 deletions
@@ -75,6 +75,17 @@ void LibraryTool::overwrite_segment_in_memory(VariantKey key, SegmentInMemory& s
     write(key, segment);
 }

+SegmentInMemory LibraryTool::item_to_segment_in_memory(
+        const StreamId &stream_id,
+        const py::tuple &item,
+        const py::object &norm,
+        const py::object &user_meta,
+        std::optional<AtomKey> next_key) {
+    auto frame = convert::py_ndf_to_frame(stream_id, item, norm, user_meta, engine_.cfg().write_options().empty_types());
+    auto segment_in_memory = incomplete_segment_from_frame(frame, 0, std::move(next_key), engine_.cfg().write_options().allow_sparse());
+    return segment_in_memory;
+}
+
 SegmentInMemory LibraryTool::overwrite_append_data(
         VariantKey key,
         const py::tuple &item,
@@ -92,8 +103,7 @@
     }

     auto stream_id = util::variant_match(key, [](const auto& key){return key.id();});
-    auto frame = convert::py_ndf_to_frame(stream_id, item, norm, user_meta, engine_.cfg().write_options().empty_types());
-    auto segment_in_memory = incomplete_segment_from_frame(frame, 0, std::move(next_key), engine_.cfg().write_options().allow_sparse());
+    auto segment_in_memory = item_to_segment_in_memory(stream_id, item, norm, user_meta, next_key);
     overwrite_segment_in_memory(key, segment_in_memory);
     return old_segment_in_memory;
 }

cpp/arcticdb/toolbox/library_tool.hpp

Lines changed: 2 additions & 0 deletions
@@ -51,6 +51,8 @@ class LibraryTool {

     void overwrite_segment_in_memory(VariantKey key, SegmentInMemory& segment_in_memory);

+    SegmentInMemory item_to_segment_in_memory(const StreamId &stream_id, const py::tuple &item, const py::object &norm, const py::object & user_meta, std::optional<AtomKey> next_key = std::nullopt);
+
     SegmentInMemory overwrite_append_data(VariantKey key, const py::tuple &item, const py::object &norm, const py::object & user_meta);

     void remove(VariantKey key);
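A hedged sketch of how the extracted helper composes with the existing LibraryTool methods, mirroring the overwrite_append_data flow above (`library_tool` is an existing LibraryTool instance; `key`, `item`, `norm` and `user_meta` are assumed to come from the Python caller):

```cpp
// Normalise the Python item into a SegmentInMemory, then overwrite whatever is
// currently stored at `key`.
auto stream_id = util::variant_match(key, [](const auto& k) { return k.id(); });
auto segment = library_tool.item_to_segment_in_memory(stream_id, item, norm, user_meta);
library_tool.overwrite_segment_in_memory(key, segment);
```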

cpp/arcticdb/toolbox/python_bindings.cpp

Lines changed: 1 addition & 0 deletions
@@ -54,6 +54,7 @@ void register_bindings(py::module &m, py::exception<arcticdb::ArcticException>&
         )pbdoc")
         .def("write", &LibraryTool::write)
         .def("overwrite_segment_in_memory", &LibraryTool::overwrite_segment_in_memory)
+        .def("item_to_segment_in_memory", &LibraryTool::item_to_segment_in_memory)
         .def("overwrite_append_data", &LibraryTool::overwrite_append_data)
         .def("remove", &LibraryTool::remove)
         .def("find_keys", &LibraryTool::find_keys)

cpp/arcticdb/version/local_versioned_engine.cpp

Lines changed: 70 additions & 17 deletions
@@ -757,30 +757,83 @@ std::pair<VersionedItem, TimeseriesDescriptor> LocalVersionedEngine::restore_ver
     return res.at(0);
 }

-VersionedItem LocalVersionedEngine::write_individual_segment(
-    const StreamId& stream_id,
-    SegmentInMemory&& segment,
-    bool prune_previous_versions
-    ) {
+VersionedItem LocalVersionedEngine::write_segment(
+    const StreamId& stream_id,
+    SegmentInMemory&& segment,
+    bool prune_previous_versions,
+    Slicing slicing
+    ) {
     ARCTICDB_SAMPLE(WriteVersionedDataFrame, 0)
-
+    util::check(segment.descriptor().id() == stream_id, "Stream_id does not match the one in the SegmentInMemory. Stream_id was {}, but SegmentInMemory had {}", stream_id, segment.descriptor().id());
     ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: write individual segment");
     auto [maybe_prev, deleted] = ::arcticdb::get_latest_version(store(), version_map(), stream_id);
     auto version_id = get_next_version_from_key(maybe_prev);
     ARCTICDB_DEBUG(log::version(), "write individual segment for stream_id: {} , version_id = {}", stream_id, version_id);
-    auto index = index_type_from_descriptor(segment.descriptor());
-    auto range = get_range_from_segment(index, segment);

-    stream::StreamSink::PartialKey pk{
-        KeyType::TABLE_DATA, version_id, stream_id, range.start_, range.end_
-    };
+    auto write_options = get_write_options();
+    auto de_dup_map = get_de_dup_map(stream_id, maybe_prev, write_options);

-    auto frame_slice = FrameSlice{segment};
-    auto descriptor = make_timeseries_descriptor(segment.row_count(),segment.descriptor().clone(), {}, std::nullopt,std::nullopt, std::nullopt, false);
-    auto key = store_->write(pk, std::move(segment)).get();
-    std::vector sk{SliceAndKey{frame_slice, to_atom(key)}};
-    auto index_key_fut = index::write_index(index, std::move(descriptor), std::move(sk), IndexPartialKey{stream_id, version_id}, store_);
-    auto versioned_item = VersionedItem{std::move(index_key_fut).get()};
+    if(version_id == 0){
+        auto check_outcome = verify_symbol_key(stream_id);
+        if (std::holds_alternative<Error>(check_outcome)) {
+            std::get<Error>(check_outcome).throw_error();
+        }
+    }
+    auto partial_key = IndexPartialKey(stream_id, version_id);
+
+    // TODO: do the segment splitting in parallel
+    std::vector<SegmentInMemory> slices;
+    switch (slicing) {
+        case Slicing::NoSlicing:
+            slices = std::vector<SegmentInMemory>({segment});
+            break;
+        case Slicing::RowSlicing:
+            slices = segment.split(get_write_options().segment_row_size, true);
+            break;
+    }
+
+    TypedStreamVersion tsv{partial_key.id, partial_key.version_id, KeyType::TABLE_DATA};
+    int64_t write_window = write_window_size();
+    auto fut_slice_keys = folly::collect(folly::window(std::move(slices), [sink = store(), de_dup_map, index_desc = segment.descriptor().index(), tsv=std::move(tsv)](auto&& slice) {
+        auto descriptor = std::make_shared<entity::StreamDescriptor>(slice.descriptor());
+        ColRange column_slice = {arcticdb::pipelines::get_index_field_count(slice), slice.descriptor().field_count()};
+        RowRange row_slice = {slice.offset(), slice.offset() + slice.row_count()};
+        auto frame_slice = FrameSlice{descriptor, column_slice, row_slice};
+        auto pkey = get_partial_key_for_segment_slice(index_desc, tsv, slice);
+        auto ks = std::make_tuple(
+            std::move(pkey), std::move(slice), std::move(frame_slice)
+        );
+        return sink->async_write(std::move(ks), de_dup_map);
+    },write_window)).via(&async::io_executor());
+
+    auto index = stream::index_type_from_descriptor(segment.descriptor());
+
+    // Create a TimeseriesDescriptor needed for the index key if segment doesn't already have one
+    auto tsd = [&] () {
+        if(!segment.has_index_descriptor()) {
+            auto tsd = TimeseriesDescriptor();
+            tsd.set_stream_descriptor(segment.descriptor());
+            tsd.set_total_rows(segment.row_count());
+            arcticdb::proto::descriptors::NormalizationMetadata norm_meta;
+            norm_meta.mutable_df()->mutable_common()->mutable_index()->set_is_physically_stored(false);
+            norm_meta.mutable_df()->mutable_common()->mutable_index()->set_start(0);
+            norm_meta.mutable_df()->mutable_common()->mutable_index()->set_step(1);
+            tsd.set_normalization_metadata(std::move(norm_meta));
+            return tsd;
+        }
+        else {
+            return segment.index_descriptor();
+        }
+    }();
+
+    auto atom_key_fut = std::move(fut_slice_keys).thenValue([partial_key = std::move(partial_key), sink = store(), tsd = std::move(tsd), index = std::move(index)](auto&& slice_keys) {
+        return index::write_index(index, tsd, std::forward<decltype(slice_keys)>(slice_keys), partial_key, sink);
+    });
+
+    auto versioned_item = VersionedItem(std::move(atom_key_fut).get());
+
+    if(cfg().symbol_list())
+        symbol_list().add_symbol(store(), stream_id, versioned_item.key_.version_id());

     write_version_and_prune_previous(prune_previous_versions, versioned_item.key_, deleted ? std::nullopt : maybe_prev);
     return versioned_item;
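The slice writes above use folly's windowed-futures pattern: folly::window() keeps at most write_window writes in flight at a time, and folly::collect() gathers the resulting slice keys before the index key is written. A stripped-down, self-contained illustration of that pattern (not ArcticDB code; squaring integers stands in for the async write):

```cpp
#include <folly/futures/Future.h>
#include <cstdio>
#include <vector>

int main() {
    std::vector<int> work = {1, 2, 3, 4, 5};
    const size_t concurrency = 2;  // at most two "writes" in flight at once

    // window() maps each element to a Future while keeping at most `concurrency`
    // futures outstanding; collect() resolves once every future has completed.
    auto results = folly::collect(
        folly::window(std::move(work), [](int x) {
            return folly::makeFuture(x * x);  // stand-in for an async write
        }, concurrency)).get();

    for (int r : results)
        std::printf("%d\n", r);
    return 0;
}
```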

cpp/arcticdb/version/local_versioned_engine.hpp

Lines changed: 7 additions & 6 deletions
@@ -186,6 +186,13 @@ class LocalVersionedEngine : public VersionedEngine {
         bool validate_index
         ) override;

+    VersionedItem write_segment(
+        const StreamId& stream_id,
+        SegmentInMemory&& segment,
+        bool prune_previous_versions,
+        Slicing slicing
+        ) override;
+
     VersionedItem write_versioned_metadata_internal(
         const StreamId& stream_id,
         bool prune_previous_versions,
@@ -242,12 +249,6 @@
         const StreamId& stream_id,
         const VersionQuery& version_query);

-    VersionedItem write_individual_segment(
-        const StreamId& stream_id,
-        SegmentInMemory&& segment,
-        bool prune_previous_versions
-    ) override;
-
     std::set<StreamId> get_incomplete_symbols() override;
     std::set<StreamId> get_incomplete_refs() override;
     std::set<StreamId> get_active_incomplete_refs() override;

cpp/arcticdb/version/python_bindings.cpp

Lines changed: 7 additions & 0 deletions
@@ -570,6 +570,10 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
         .value("SPEED", PipelineOptimisation::SPEED)
         .value("MEMORY", PipelineOptimisation::MEMORY);

+    py::enum_<Slicing>(version, "Slicing")
+        .value("NoSlicing", Slicing::NoSlicing)
+        .value("RowSlicing", Slicing::RowSlicing);
+
     py::class_<ExpressionContext, std::shared_ptr<ExpressionContext>>(version, "ExpressionContext")
         .def(py::init())
         .def("add_expression_node", &ExpressionContext::add_expression_node)
@@ -758,6 +762,9 @@
         .def("write_versioned_dataframe",
             &PythonVersionStore::write_versioned_dataframe,
             py::call_guard<SingleThreadMutexHolder>(), "Write the most recent version of this dataframe to the store")
+        .def("_test_write_versioned_segment",
+            &PythonVersionStore::test_write_versioned_segment,
+            py::call_guard<SingleThreadMutexHolder>(), "Write the most recent version of this segment to the store")
         .def("write_versioned_composite_data",
             &PythonVersionStore::write_versioned_composite_data,
             py::call_guard<SingleThreadMutexHolder>(), "Allows the user to write multiple dataframes in a batch with one version entity")

cpp/arcticdb/version/test/test_sparse.cpp

Lines changed: 3 additions & 3 deletions
@@ -419,7 +419,7 @@ TEST_F(SparseTestStore, SegmentWithExistingIndex) {
     bool written = false;
     DynamicAggregator aggregator(std::move(schema), [&](SegmentInMemory &&segment) {
         if(!written) {
-            test_store_->write_individual_segment(stream_id, std::move(segment), false);
+            test_store_->write_segment(stream_id, std::move(segment), false, arcticdb::version_store::Slicing::NoSlicing);
            written = true;
        }
        else {
@@ -479,7 +479,7 @@ TEST_F(SparseTestStore, SegmentAndFilterColumn) {
     bool written = false;
     DynamicAggregator aggregator(std::move(schema), [&](SegmentInMemory &&segment) {
         if(!written) {
-            test_store_->write_individual_segment(stream_id, std::move(segment), false);
+            test_store_->write_segment(stream_id, std::move(segment), false, arcticdb::version_store::Slicing::NoSlicing);
            written = true;
        }
        else {
@@ -536,7 +536,7 @@ TEST_F(SparseTestStore, SegmentWithRangeFilter) {
     bool written = false;
     DynamicAggregator aggregator(std::move(schema), [&](SegmentInMemory &&segment) {
         if(!written) {
-            test_store_->write_individual_segment(stream_id, std::move(segment), false);
+            test_store_->write_segment(stream_id, std::move(segment), false, arcticdb::version_store::Slicing::NoSlicing);
            written = true;
        }
        else {

cpp/arcticdb/version/version_core.cpp

Lines changed: 2 additions & 2 deletions
@@ -82,7 +82,7 @@ static void modify_descriptor(const std::shared_ptr<pipelines::PipelineContext>&
 VersionedItem write_dataframe_impl(
     const std::shared_ptr<Store>& store,
     VersionId version_id,
-    const std::shared_ptr<pipelines::InputTensorFrame>& frame,
+    const std::shared_ptr<InputTensorFrame>& frame,
     const WriteOptions& options,
     const std::shared_ptr<DeDupMap>& de_dup_map,
     bool sparsify_floats,
@@ -99,7 +99,7 @@ folly::Future<entity::AtomKey> async_write_dataframe_impl(
     VersionId version_id,
     const std::shared_ptr<InputTensorFrame>& frame,
     const WriteOptions& options,
-    const std::shared_ptr<DeDupMap> &de_dup_map,
+    const std::shared_ptr<DeDupMap>& de_dup_map,
     bool sparsify_floats,
     bool validate_index
 ) {
