Skip to content

Commit 7799f08

Browse files
committed
Fix batch append and batch update bust version chain issue
1 parent d78b668 commit 7799f08

File tree

3 files changed

+39
-5
lines changed

3 files changed

+39
-5
lines changed

cpp/arcticdb/version/local_versioned_engine.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1476,7 +1476,11 @@ folly::Future<VersionedItem> LocalVersionedEngine::write_index_key_to_version_ma
14761476
if (add_new_symbol) {
14771477
write_version_fut =
14781478
std::move(write_version_fut)
1479-
.then([this, index_key_id = index_key.id(), reference_id = index_key.version_id()](auto&&) {
1479+
.thenValue([this,
1480+
index_key_id = index_key.id(),
1481+
reference_id = index_key.version_id(
1482+
)](auto&&) { // Using .then and not using validating result from previous future
1483+
// will swallow exception from previous future
14801484
return async::submit_io_task(
14811485
WriteSymbolTask(store(), symbol_list_ptr(), index_key_id, reference_id)
14821486
);
@@ -1722,10 +1726,9 @@ std::vector<std::variant<VersionedItem, DataError>> LocalVersionedEngine::batch_
17221726
missing_data::check<ErrorCode::E_NO_SUCH_VERSION>(
17231727
upsert, "Cannot append to non-existent symbol {}", stream_id
17241728
);
1725-
constexpr static auto version_id = 0;
17261729
index_key_fut = async_write_dataframe_impl(
17271730
store(),
1728-
version_id,
1731+
update_info.next_version_id_,
17291732
frame,
17301733
write_options,
17311734
std::make_shared<DeDupMap>(),
@@ -1815,7 +1818,7 @@ std::vector<std::variant<VersionedItem, DataError>> LocalVersionedEngine::batch_
18151818
);
18161819
index_key_fut = async_write_dataframe_impl(
18171820
store(),
1818-
0,
1821+
update_info.next_version_id_,
18191822
std::move(frame),
18201823
std::move(write_options),
18211824
std::make_shared<DeDupMap>(),

python/tests/integration/arcticdb/test_update.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from arcticdb.util.logger import get_logger
2626
from arcticdb.version_store._store import VersionedItem
2727
from arcticdb.version_store.library import Library, UpdatePayload, WritePayload
28-
from arcticdb.util.test import assert_frame_equal
28+
from arcticdb.util.test import assert_frame_equal, sample_dataframe
2929
from arcticdb_ext.version_store import DataError, NoSuchVersionException
3030
from tests.util.mark import LINUX
3131

@@ -555,3 +555,18 @@ def test_update_batch_different_updates_dynamic_schema(custom_library):
555555
ArcticSymbolSimulator.assert_frame_equal_rebuild_index_first(
556556
expected_results[result.symbol], read_data[result.symbol].data
557557
)
558+
559+
560+
def test_batch_update_after_delete_upsert(arctic_library_lmdb):
561+
lib = arctic_library_lmdb
562+
lib.write("sym", sample_dataframe())
563+
lib.write("sym1", sample_dataframe())
564+
lib.write("sym1", sample_dataframe())
565+
lib.delete_batch(["sym", "sym1"])
566+
df = sample_dataframe()
567+
df1 = sample_dataframe()
568+
results = lib.update_batch([UpdatePayload("sym", df), UpdatePayload("sym1", df1)], upsert=True)
569+
assert results[0].version == 1
570+
assert results[1].version == 2
571+
assert_frame_equal(lib.read("sym").data, df)
572+
assert_frame_equal(lib.read("sym1").data, df1)

python/tests/integration/arcticdb/version_store/test_basic_version_store.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,22 @@ def test_prune_previous_versions_append_batch(basic_store):
718718
assert len(lib_tool.find_keys(KeyType.SYMBOL_LIST)) == 6
719719

720720

721+
def test_batch_append_after_delete_upsert(lmdb_version_store_v1):
722+
lib = lmdb_version_store_v1
723+
lib.write("sym", 1)
724+
lib.write("sym1", 1)
725+
lib.write("sym1", 1)
726+
lib.batch_delete_symbols(["sym", "sym1"])
727+
728+
df = sample_dataframe()
729+
df1 = sample_dataframe()
730+
results = lib.batch_append(["sym", "sym1"], [df, df1])
731+
assert results[0].version == 1
732+
assert results[1].version == 2
733+
assert_frame_equal(lib.read("sym").data, df)
734+
assert_frame_equal(lib.read("sym1").data, df1)
735+
736+
721737
@pytest.mark.storage
722738
def test_deleting_unknown_symbol(basic_store, symbol):
723739
df = sample_dataframe()

0 commit comments

Comments
 (0)