Skip to content

Commit 7210ae0

Browse files
authored
Fix batch_append and batch_update bust version chain if upsert (#2661)
#### Reference Issues/PRs <!--Example: Fixes #1234. See also #3456.--> https://man312219.monday.com/boards/7852509418/pulses/9903341116 #### What does this implement or fix? Existing code incorrectly assumes when the symbol being upsert is not live, it has never existed. So the version being upserted has always been set to 0. But the symbol could exist but deleted at that moment. So the version being upserted has always been set to 0. If so, this will bust the version chain. This PR also fixes exception thrown in writing version keys being swallowed #### Any other comments? #### Checklist <details> <summary> Checklist for code changes... </summary> - [ ] Have you updated the relevant docstrings, documentation and copyright notice? - [ ] Is this contribution tested against [all ArcticDB's features](../docs/mkdocs/docs/technical/contributing.md)? - [ ] Do all exceptions introduced raise appropriate [error messages](https://docs.arcticdb.io/error_messages/)? - [ ] Are API changes highlighted in the PR description? - [ ] Is the PR labelled as enhancement or bug so it appears in autogenerated release notes? </details> <!-- Thanks for contributing a Pull Request to ArcticDB! Please ensure you have taken a look at: - ArcticDB's Code of Conduct: https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md - ArcticDB's Contribution Licensing: https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing -->
1 parent 810df8c commit 7210ae0

File tree

3 files changed

+39
-5
lines changed

3 files changed

+39
-5
lines changed

cpp/arcticdb/version/local_versioned_engine.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1476,7 +1476,11 @@ folly::Future<VersionedItem> LocalVersionedEngine::write_index_key_to_version_ma
14761476
if (add_new_symbol) {
14771477
write_version_fut =
14781478
std::move(write_version_fut)
1479-
.then([this, index_key_id = index_key.id(), reference_id = index_key.version_id()](auto&&) {
1479+
.thenValue([this,
1480+
index_key_id = index_key.id(),
1481+
reference_id = index_key.version_id(
1482+
)](auto&&) { // Using .then and not using validating result from previous future
1483+
// will swallow exception from previous future
14801484
return async::submit_io_task(
14811485
WriteSymbolTask(store(), symbol_list_ptr(), index_key_id, reference_id)
14821486
);
@@ -1722,10 +1726,9 @@ std::vector<std::variant<VersionedItem, DataError>> LocalVersionedEngine::batch_
17221726
missing_data::check<ErrorCode::E_NO_SUCH_VERSION>(
17231727
upsert, "Cannot append to non-existent symbol {}", stream_id
17241728
);
1725-
constexpr static auto version_id = 0;
17261729
index_key_fut = async_write_dataframe_impl(
17271730
store(),
1728-
version_id,
1731+
update_info.next_version_id_,
17291732
frame,
17301733
write_options,
17311734
std::make_shared<DeDupMap>(),
@@ -1815,7 +1818,7 @@ std::vector<std::variant<VersionedItem, DataError>> LocalVersionedEngine::batch_
18151818
);
18161819
index_key_fut = async_write_dataframe_impl(
18171820
store(),
1818-
0,
1821+
update_info.next_version_id_,
18191822
std::move(frame),
18201823
std::move(write_options),
18211824
std::make_shared<DeDupMap>(),

python/tests/integration/arcticdb/test_update.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from arcticdb.util.logger import get_logger
2828
from arcticdb.version_store._store import VersionedItem
2929
from arcticdb.version_store.library import Library, UpdatePayload, WritePayload
30-
from arcticdb.util.test import assert_frame_equal
30+
from arcticdb.util.test import assert_frame_equal, sample_dataframe
3131
from arcticdb_ext.version_store import DataError, NoSuchVersionException
3232
from tests.util.mark import LINUX
3333

@@ -575,3 +575,18 @@ def test_update_bool_named_col(lmdb_version_store_dynamic_schema, idx):
575575
lmdb_version_store_dynamic_schema.update(symbol, bad_df)
576576

577577
assert_frame_equal(lmdb_version_store_dynamic_schema.read(symbol).data, initial)
578+
579+
580+
def test_batch_update_after_delete_upsert(arctic_library_lmdb):
581+
lib = arctic_library_lmdb
582+
lib.write("sym", sample_dataframe())
583+
lib.write("sym1", sample_dataframe())
584+
lib.write("sym1", sample_dataframe())
585+
lib.delete_batch(["sym", "sym1"])
586+
df = sample_dataframe()
587+
df1 = sample_dataframe()
588+
results = lib.update_batch([UpdatePayload("sym", df), UpdatePayload("sym1", df1)], upsert=True)
589+
assert results[0].version == 1
590+
assert results[1].version == 2
591+
assert_frame_equal(lib.read("sym").data, df)
592+
assert_frame_equal(lib.read("sym1").data, df1)

python/tests/integration/arcticdb/version_store/test_basic_version_store.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,22 @@ def test_prune_previous_versions_append_batch(basic_store):
727727
assert len(lib_tool.find_keys(KeyType.SYMBOL_LIST)) == 6
728728

729729

730+
def test_batch_append_after_delete_upsert(lmdb_version_store_v1):
731+
lib = lmdb_version_store_v1
732+
lib.write("sym", 1)
733+
lib.write("sym1", 1)
734+
lib.write("sym1", 1)
735+
lib.batch_delete_symbols(["sym", "sym1"])
736+
737+
df = sample_dataframe()
738+
df1 = sample_dataframe()
739+
results = lib.batch_append(["sym", "sym1"], [df, df1])
740+
assert results[0].version == 1
741+
assert results[1].version == 2
742+
assert_frame_equal(lib.read("sym").data, df)
743+
assert_frame_equal(lib.read("sym1").data, df1)
744+
745+
730746
@pytest.mark.storage
731747
def test_deleting_unknown_symbol(basic_store, symbol):
732748
df = sample_dataframe()

0 commit comments

Comments
 (0)