Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1476,7 +1476,11 @@ folly::Future<VersionedItem> LocalVersionedEngine::write_index_key_to_version_ma
if (add_new_symbol) {
write_version_fut =
std::move(write_version_fut)
.then([this, index_key_id = index_key.id(), reference_id = index_key.version_id()](auto&&) {
.thenValue([this,
index_key_id = index_key.id(),
reference_id = index_key.version_id(
)](auto&&) { // Using .then and not using validating result from previous future
// will swallow exception from previous future
return async::submit_io_task(
WriteSymbolTask(store(), symbol_list_ptr(), index_key_id, reference_id)
);
Expand Down Expand Up @@ -1722,10 +1726,9 @@ std::vector<std::variant<VersionedItem, DataError>> LocalVersionedEngine::batch_
missing_data::check<ErrorCode::E_NO_SUCH_VERSION>(
upsert, "Cannot append to non-existent symbol {}", stream_id
);
constexpr static auto version_id = 0;
index_key_fut = async_write_dataframe_impl(
store(),
version_id,
update_info.next_version_id_,
frame,
write_options,
std::make_shared<DeDupMap>(),
Expand Down Expand Up @@ -1815,7 +1818,7 @@ std::vector<std::variant<VersionedItem, DataError>> LocalVersionedEngine::batch_
);
index_key_fut = async_write_dataframe_impl(
store(),
0,
update_info.next_version_id_,
std::move(frame),
std::move(write_options),
std::make_shared<DeDupMap>(),
Expand Down
17 changes: 16 additions & 1 deletion python/tests/integration/arcticdb/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from arcticdb.util.logger import get_logger
from arcticdb.version_store._store import VersionedItem
from arcticdb.version_store.library import Library, UpdatePayload, WritePayload
from arcticdb.util.test import assert_frame_equal
from arcticdb.util.test import assert_frame_equal, sample_dataframe
from arcticdb_ext.version_store import DataError, NoSuchVersionException
from tests.util.mark import LINUX

Expand Down Expand Up @@ -575,3 +575,18 @@ def test_update_bool_named_col(lmdb_version_store_dynamic_schema, idx):
lmdb_version_store_dynamic_schema.update(symbol, bad_df)

assert_frame_equal(lmdb_version_store_dynamic_schema.read(symbol).data, initial)


def test_batch_update_after_delete_upsert(arctic_library_lmdb):
lib = arctic_library_lmdb
lib.write("sym", sample_dataframe())
lib.write("sym1", sample_dataframe())
lib.write("sym1", sample_dataframe())
lib.delete_batch(["sym", "sym1"])
df = sample_dataframe()
df1 = sample_dataframe()
results = lib.update_batch([UpdatePayload("sym", df), UpdatePayload("sym1", df1)], upsert=True)
assert results[0].version == 1
assert results[1].version == 2
assert_frame_equal(lib.read("sym").data, df)
assert_frame_equal(lib.read("sym1").data, df1)
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,22 @@ def test_prune_previous_versions_append_batch(basic_store):
assert len(lib_tool.find_keys(KeyType.SYMBOL_LIST)) == 6


def test_batch_append_after_delete_upsert(lmdb_version_store_v1):
lib = lmdb_version_store_v1
lib.write("sym", 1)
lib.write("sym1", 1)
lib.write("sym1", 1)
lib.batch_delete_symbols(["sym", "sym1"])

df = sample_dataframe()
df1 = sample_dataframe()
results = lib.batch_append(["sym", "sym1"], [df, df1])
assert results[0].version == 1
assert results[1].version == 2
assert_frame_equal(lib.read("sym").data, df)
assert_frame_equal(lib.read("sym1").data, df1)


@pytest.mark.storage
def test_deleting_unknown_symbol(basic_store, symbol):
df = sample_dataframe()
Expand Down
Loading