From dcf09f362653fdca49973be13205e0eaa20fd03c Mon Sep 17 00:00:00 2001 From: phoebusm Date: Thu, 18 Sep 2025 18:36:56 +0100 Subject: [PATCH 1/2] Fix batch append and batch update bust version chain issue --- cpp/arcticdb/version/local_versioned_engine.cpp | 11 +++++++---- .../tests/integration/arcticdb/test_update.py | 17 ++++++++++++++++- .../version_store/test_basic_version_store.py | 16 ++++++++++++++++ 3 files changed, 39 insertions(+), 5 deletions(-) diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index 1a75251209..7edc3029f5 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -1476,7 +1476,11 @@ folly::Future LocalVersionedEngine::write_index_key_to_version_ma if (add_new_symbol) { write_version_fut = std::move(write_version_fut) - .then([this, index_key_id = index_key.id(), reference_id = index_key.version_id()](auto&&) { + .thenValue([this, + index_key_id = index_key.id(), + reference_id = index_key.version_id( + )](auto&&) { // Using .then and not using validating result from previous future + // will swallow exception from previous future return async::submit_io_task( WriteSymbolTask(store(), symbol_list_ptr(), index_key_id, reference_id) ); @@ -1722,10 +1726,9 @@ std::vector> LocalVersionedEngine::batch_ missing_data::check( upsert, "Cannot append to non-existent symbol {}", stream_id ); - constexpr static auto version_id = 0; index_key_fut = async_write_dataframe_impl( store(), - version_id, + update_info.next_version_id_, frame, write_options, std::make_shared(), @@ -1815,7 +1818,7 @@ std::vector> LocalVersionedEngine::batch_ ); index_key_fut = async_write_dataframe_impl( store(), - 0, + update_info.next_version_id_, std::move(frame), std::move(write_options), std::make_shared(), diff --git a/python/tests/integration/arcticdb/test_update.py b/python/tests/integration/arcticdb/test_update.py index 3452476ef5..1e67c7b75e 100644 --- a/python/tests/integration/arcticdb/test_update.py +++ b/python/tests/integration/arcticdb/test_update.py @@ -27,7 +27,7 @@ from arcticdb.util.logger import get_logger from arcticdb.version_store._store import VersionedItem from arcticdb.version_store.library import Library, UpdatePayload, WritePayload -from arcticdb.util.test import assert_frame_equal +from arcticdb.util.test import assert_frame_equal, sample_dataframe from arcticdb_ext.version_store import DataError, NoSuchVersionException from tests.util.mark import LINUX @@ -575,3 +575,18 @@ def test_update_bool_named_col(lmdb_version_store_dynamic_schema, idx): lmdb_version_store_dynamic_schema.update(symbol, bad_df) assert_frame_equal(lmdb_version_store_dynamic_schema.read(symbol).data, initial) + + +def test_batch_update_after_delete_upsert(arctic_library_lmdb): + lib = arctic_library_lmdb + lib.write("sym", sample_dataframe()) + lib.write("sym1", sample_dataframe()) + lib.write("sym1", sample_dataframe()) + lib.delete_batch(["sym", "sym1"]) + df = sample_dataframe() + df1 = sample_dataframe() + results = lib.update_batch([UpdatePayload("sym", df), UpdatePayload("sym1", df1)], upsert=True) + assert results[0].version == 1 + assert results[1].version == 2 + assert_frame_equal(lib.read("sym").data, df) + assert_frame_equal(lib.read("sym1").data, df1) diff --git a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py index 3876d3ca11..1f15bf6726 100644 --- a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py +++ b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py @@ -726,6 +726,22 @@ def test_prune_previous_versions_append_batch(basic_store): assert len(lib_tool.find_keys(KeyType.SYMBOL_LIST)) == 6 +def test_batch_append_after_delete_upsert(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + lib.write("sym", 1) + lib.write("sym1", 1) + lib.write("sym1", 1) + lib.batch_delete_symbols(["sym", "sym1"]) + + df = sample_dataframe() + df1 = sample_dataframe() + results = lib.batch_append(["sym", "sym1"], [df, df1]) + assert results[0].version == 1 + assert results[1].version == 2 + assert_frame_equal(lib.read("sym").data, df) + assert_frame_equal(lib.read("sym1").data, df1) + + @pytest.mark.storage def test_deleting_unknown_symbol(basic_store, symbol): df = sample_dataframe() From f5a1f20416d23a0dead1943ffd99ddd628c0bcc0 Mon Sep 17 00:00:00 2001 From: phoebusm Date: Fri, 26 Sep 2025 16:38:06 +0100 Subject: [PATCH 2/2] Format --- python/tests/integration/arcticdb/test_update.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/integration/arcticdb/test_update.py b/python/tests/integration/arcticdb/test_update.py index 1e67c7b75e..a70f3b42dc 100644 --- a/python/tests/integration/arcticdb/test_update.py +++ b/python/tests/integration/arcticdb/test_update.py @@ -576,7 +576,7 @@ def test_update_bool_named_col(lmdb_version_store_dynamic_schema, idx): assert_frame_equal(lmdb_version_store_dynamic_schema.read(symbol).data, initial) - + def test_batch_update_after_delete_upsert(arctic_library_lmdb): lib = arctic_library_lmdb lib.write("sym", sample_dataframe())