From 9e7cdd063482a302352c0c9607f38caa217e6c9b Mon Sep 17 00:00:00 2001 From: Davide Faconti Date: Tue, 2 Jun 2026 00:04:55 +0200 Subject: [PATCH 1/2] feat(pj_datastore): in-place dataset replace primitives for reload (0.6.0) Add DataEngine::replaceDatasetFrom and ObjectStore::replaceDatasetFrom: swap a dataset's data in place while keeping its DatasetId/TopicId/ObjectTopicId stable, so a host reload can refresh data without orphaning curve keys or 2D-dock bindings. Topics are matched by name; matched topics have their chunks/entries replaced (each moved chunk's topic_id re-stamped to the primary id), staged-only topics are created/registered under the primary dataset, and primary-only topics are retired (scalars: chunks cleared + hidden from listTopics, storage kept so a cached reader pointer hits an empty deque; objects: removed). Returns the staged->primary id map so the host re-registers object parsers under stable ids. - engine: replaceDatasetFrom + retired_topic_ids (listTopics filter); factor the chunk-deque move shared with flushTo into adoptChunksFrom (re-stamps topic_id). - object_store: replaceDatasetFrom; factor find-and-erase into eraseTopicLocked. - replace_result.hpp: DatasetReplaceResult / ObjectDatasetReplaceResult. - tests: 7 new (engine + object store): chunk move/id rewrite, added/retired, id stability; entry move/id preserve, removed/added. Additive API only (pimpl, no vtable/struct changes) -> MINOR bump 0.5.1 -> 0.6.0; abi/baseline.abi unchanged (the additions live in pj_datastore, outside the pj_base ABI canary). Co-Authored-By: Claude Opus 4.8 --- CMakeLists.txt | 2 +- conanfile.py | 4 +- pj_datastore/include/pj_datastore/engine.hpp | 23 ++++ .../include/pj_datastore/object_store.hpp | 27 ++++ .../include/pj_datastore/replace_result.hpp | 31 +++++ pj_datastore/src/engine.cpp | 128 ++++++++++++++++-- pj_datastore/src/object_store.cpp | 77 +++++++++++ .../tests/engine_integration_test.cpp | 127 +++++++++++++++++ pj_datastore/tests/object_store_test.cpp | 84 ++++++++++++ 9 files changed, 488 insertions(+), 15 deletions(-) create mode 100644 pj_datastore/include/pj_datastore/replace_result.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index e0318488..936bd94f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -202,7 +202,7 @@ endif() if(PJ_INSTALL_SDK) include(CMakePackageConfigHelpers) - set(PJ_PACKAGE_VERSION "0.5.1") + set(PJ_PACKAGE_VERSION "0.6.0") set(PJ_PACKAGE_CMAKE_DIR ${CMAKE_INSTALL_LIBDIR}/cmake/plotjuggler_core) install(EXPORT plotjuggler_coreTargets diff --git a/conanfile.py b/conanfile.py index 68913376..6a7f4b9f 100644 --- a/conanfile.py +++ b/conanfile.py @@ -7,7 +7,7 @@ plugin_sdk — umbrella for plugin authors (base + dialog SDK + parser SDK) plugin_host — umbrella for host loaders (data_source/parser/toolbox/dialog) -A consuming Conan recipe declares e.g. `plotjuggler_core/0.5.1` and then: +A consuming Conan recipe declares e.g. `plotjuggler_core/0.6.0` and then: find_package(plotjuggler_core REQUIRED COMPONENTS plugin_sdk) target_link_libraries(my_plugin PRIVATE plotjuggler_core::plugin_sdk) @@ -27,7 +27,7 @@ class PlotjugglerCoreConan(ConanFile): name = "plotjuggler_core" - version = "0.5.1" + version = "0.6.0" # Apache-2.0 covers pj_base + pj_plugins (the plugin-facing SDK); # MPL-2.0 covers pj_datastore (the storage engine). See LICENSE. license = "Apache-2.0 AND MPL-2.0" diff --git a/pj_datastore/include/pj_datastore/engine.hpp b/pj_datastore/include/pj_datastore/engine.hpp index ac80546c..4a76d893 100644 --- a/pj_datastore/include/pj_datastore/engine.hpp +++ b/pj_datastore/include/pj_datastore/engine.hpp @@ -9,6 +9,7 @@ #include "pj_base/dataset.hpp" #include "pj_base/expected.hpp" #include "pj_base/types.hpp" +#include "pj_datastore/replace_result.hpp" #include "pj_datastore/topic_storage.hpp" #include "pj_datastore/type_registry.hpp" @@ -95,6 +96,22 @@ class DataEngine { /// lockstep with the source via parallel registration at startup. PJ::Status flushTo(DataEngine& dst); + /// In-place REPLACE of dataset `primary_id`'s data with the data ingested into + /// dataset `staged_id` of a throwaway `staged` engine — used for reload so the + /// primary DatasetId/TopicIds (and therefore all curve keys) stay stable. + /// Topics are matched by name. Matched primary topics have their chunks cleared + /// and replaced with the staged topic's chunks (each chunk's `topic_id` rewritten + /// to the primary id); their inline column layout is copied across. Staged-only + /// topics are created under `primary_id`. Primary-only topics are retired (chunks + /// cleared, id excluded from `listTopics`, storage kept so cached reader pointers + /// see an empty deque rather than dangling). + /// + /// Caller MUST invalidate every reader/adapter bound to `primary_id` BEFORE this + /// call (no live `TopicChunk*` may survive into the chunk clear) and run no event + /// loop between that invalidation and this call. The staged engine is drained. + [[nodiscard]] PJ::Expected + replaceDatasetFrom(DataEngine& staged, PJ::DatasetId staged_id, PJ::DatasetId primary_id); + // Writer/Reader factories /// Create a writer bound to this engine. [[nodiscard]] DataWriter createWriter(); @@ -110,6 +127,12 @@ class DataEngine { [[nodiscard]] std::vector listTopics(PJ::DatasetId dataset_id) const; private: + // Move src's sealed chunks into dst, re-stamping each chunk's topic_id to + // dst's id. Shared by flushTo (append between mirrored topics) and + // replaceDatasetFrom (after dst is cleared). Needs friend access to + // TopicStorage::sealed_chunks_. + static void adoptChunksFrom(TopicStorage& dst, TopicStorage& src); + struct Impl; std::unique_ptr impl_; }; diff --git a/pj_datastore/include/pj_datastore/object_store.hpp b/pj_datastore/include/pj_datastore/object_store.hpp index f0149e13..40b61d78 100644 --- a/pj_datastore/include/pj_datastore/object_store.hpp +++ b/pj_datastore/include/pj_datastore/object_store.hpp @@ -45,6 +45,19 @@ struct ObjectTopicDescriptor { std::string metadata_json; }; +/// Outcome of ObjectStore::replaceDatasetFrom() — the object-side parallel of +/// DatasetReplaceResult. The remapped pairs let the caller re-register object +/// parsers under the stable primary ObjectTopicId after the swap. +struct ObjectDatasetReplaceResult { + /// (staged ObjectTopicId, primary ObjectTopicId) for every adopted topic + /// (both matched and newly added), so a parser keyed by the staged id can be + /// re-registered under the primary id. A newly added topic is simply one whose + /// primary id did not exist before — callers needing only the remap use this. + std::vector> remapped; + /// Primary object topics removed (no staged match). + std::vector removed_topics; +}; + /// Eager payload: store-owned bytes, counted against the retention budget. using SharedBuffer = std::shared_ptr>; @@ -178,6 +191,16 @@ class ObjectStore { // Afterward, dst's retention budget is applied to each touched series. Status flushTo(ObjectStore& dst); + // In-place REPLACE of object dataset `primary_id` with the topics ingested + // into `staged` dataset `staged_id` — the object-store half of reload. Topics + // are matched by name. Matched topics keep their primary ObjectTopicId and have + // their entries replaced; staged-only topics are registered under `primary_id`; + // primary-only topics are removed. Returns the staged->primary id map so the + // caller re-registers parsers under the stable ids. Either fully applies or + // (on a validation error) mutates neither store. + [[nodiscard]] Expected + replaceDatasetFrom(ObjectStore& staged, DatasetId staged_id, DatasetId primary_id); + // --- Lifecycle --- void removeTopic(ObjectTopicId id); @@ -196,6 +219,10 @@ class ObjectStore { ObjectSeries* findSeries(ObjectTopicId id); const ObjectSeries* findSeries(ObjectTopicId id) const; + // Erase a topic from topics_. Caller must already hold store_mutex_ (used by + // removeTopic under its own lock and by replaceDatasetFrom under the dual lock). + void eraseTopicLocked(ObjectTopicId id); + static std::optional upperBoundIndex(const std::vector& timestamps, Timestamp ts); static ResolvedObjectEntry resolveEntry(const ObjectEntry& entry); diff --git a/pj_datastore/include/pj_datastore/replace_result.hpp b/pj_datastore/include/pj_datastore/replace_result.hpp new file mode 100644 index 00000000..54af6924 --- /dev/null +++ b/pj_datastore/include/pj_datastore/replace_result.hpp @@ -0,0 +1,31 @@ +#pragma once +// Copyright 2026 Davide Faconti +// SPDX-License-Identifier: MPL-2.0 + +#include + +#include "pj_base/types.hpp" + +namespace PJ { + +/// Outcome of DataEngine::replaceDatasetFrom(): which primary topics had their +/// data swapped, which were created fresh, and which were retired (their source +/// topic vanished from the new data). All ids are PRIMARY engine ids — the whole +/// point of replace is that they stay stable so curve keys do not change. +/// +/// (ObjectStore has the parallel ObjectDatasetReplaceResult, declared in +/// object_store.hpp next to ObjectTopicId.) +struct DatasetReplaceResult { + /// Primary topics whose chunks were replaced (topic name matched staged). + std::vector replaced_topics; + /// Primary topics newly created (name existed in staged but not primary). + std::vector added_topics; + /// Primary topics with no staged match: chunks cleared and the id retired + /// (excluded from listTopics; getTopicStorage still returns an empty storage + /// so any in-flight reader sees an empty deque rather than dangling). + /// Observability only — the engine self-hides these via listTopics, so callers + /// need not act on them (the catalog rebuild drops them on its own). + std::vector retired_topics; +}; + +} // namespace PJ diff --git a/pj_datastore/src/engine.cpp b/pj_datastore/src/engine.cpp index 693e1d26..5cc46837 100644 --- a/pj_datastore/src/engine.cpp +++ b/pj_datastore/src/engine.cpp @@ -7,6 +7,9 @@ #include #include +#include +#include +#include #include #include "pj_base/expected.hpp" @@ -23,6 +26,10 @@ struct DataEngine::Impl { tsl::robin_map datasets; tsl::robin_map topics; tsl::robin_map time_domains; + // Topics retired by replaceDatasetFrom(): their storage is kept (so any + // cached reader pointer dereferences an empty deque, not freed memory) but + // they are hidden from listTopics so the catalog drops them. + std::unordered_set retired_topic_ids; }; DataEngine::DataEngine() : impl_(std::make_unique()) {} @@ -222,23 +229,110 @@ Status DataEngine::flushTo(DataEngine& dst) { plan.push_back({&src_storage, dst_storage}); } - // Phase 2: execute. friend access lets us move sealed_chunks_ directly - // between TopicStorage instances of different engines — the deque move - // transfers chunk ownership without copying any column data or value - // buffers. Each chunk's TopicChunkStats (t_min/t_max/row_count) rides - // along inside the chunk by value, so dst's time_min/time_max queries - // reflect the new state immediately after the move. + // Phase 2: execute. adoptChunksFrom moves sealed_chunks_ directly between + // TopicStorage instances (no column/value copy); the chunks' stats ride along + // by value, so dst's time_min/time_max reflect the new state immediately. The + // topic-id rewrite is a no-op here because flushTo's mirrored dst shares the + // source's TopicId. for (auto& step : plan) { - auto drained = std::move(step.src->sealed_chunks_); - step.src->sealed_chunks_.clear(); // post-move state: deque is valid but empty. - for (auto& chunk : drained) { - step.dst->sealed_chunks_.push_back(std::move(chunk)); - } + adoptChunksFrom(*step.dst, *step.src); } return {}; } +void DataEngine::adoptChunksFrom(TopicStorage& dst, TopicStorage& src) { + // friend access: drain src's deque into dst, re-stamping each chunk's topic_id + // to dst's id (a no-op when the two storages already share one, e.g. flushTo). + // Appends — callers that need replace semantics clear dst first. + std::deque drained = std::move(src.sealed_chunks_); + src.sealed_chunks_.clear(); // post-move state: deque is valid but empty. + const TopicId dst_id = dst.topic_id(); + for (auto& chunk : drained) { + chunk.topic_id = dst_id; + dst.sealed_chunks_.push_back(std::move(chunk)); + } +} + +Expected DataEngine::replaceDatasetFrom( + DataEngine& staged, DatasetId staged_id, DatasetId primary_id) { + if (&staged == this) { + return PJ::unexpected("replaceDatasetFrom: staged and primary are the same engine"); + } + auto primary_it = impl_->datasets.find(primary_id); + if (primary_it == impl_->datasets.end()) { + return PJ::unexpected(fmt::format("replaceDatasetFrom: primary dataset {} not found", primary_id)); + } + if (staged.impl_->datasets.find(staged_id) == staged.impl_->datasets.end()) { + return PJ::unexpected(fmt::format("replaceDatasetFrom: staged dataset {} not found", staged_id)); + } + + // Snapshot primary topic name -> primary TopicId (includes ids retired by a + // previous replace, so a topic that comes back re-binds to its stable id). + std::unordered_map primary_by_name; + for (const TopicId tid : primary_it->second.topic_ids) { + if (const auto* storage = getTopicStorage(tid)) { + primary_by_name.emplace(storage->descriptor().name, tid); + } + } + + DatasetReplaceResult result; + std::unordered_set staged_names; + + // Adopt every staged topic into the primary dataset, by name. + for (const TopicId staged_tid : staged.listTopics(staged_id)) { + TopicStorage* staged_storage = staged.getTopicStorage(staged_tid); + if (staged_storage == nullptr) { + continue; + } + const std::string name = staged_storage->descriptor().name; + staged_names.insert(name); + + const auto match = primary_by_name.find(name); + const bool is_new = (match == primary_by_name.end()); + TopicId primary_tid = 0; + if (is_new) { + // Schema-less by design: post-replace reads resolve columns from the moved + // chunks' own descriptors (and the copied inline layout below), so a new + // topic needs no registry schema (which would not exist in this engine). + TopicDescriptor desc = staged_storage->descriptor(); + desc.dataset_id = primary_id; + desc.schema_id = 0; + auto created = createTopic(primary_id, std::move(desc)); + if (!created.has_value()) { + return PJ::unexpected("replaceDatasetFrom: createTopic failed for '" + name + "': " + created.error()); + } + primary_tid = *created; + result.added_topics.push_back(primary_tid); + } else { + primary_tid = match->second; + impl_->retired_topic_ids.erase(primary_tid); // un-retire if it had vanished + result.replaced_topics.push_back(primary_tid); + } + + // Re-fetch AFTER any createTopic above (emplace may have rehashed impl_->topics). + TopicStorage* primary_storage = getTopicStorage(primary_tid); + primary_storage->clearChunks(); + // Adopt the staged topic's inline layout (move, not copy — the staged engine + // is discarded next) then re-stamp + move its chunks onto the primary id. + primary_storage->column_descriptors_ = std::move(staged_storage->column_descriptors_); + adoptChunksFrom(*primary_storage, *staged_storage); + } + + // Retire primary topics the new data no longer provides. + for (const auto& [name, primary_tid] : primary_by_name) { + if (staged_names.count(name) == 0 && impl_->retired_topic_ids.count(primary_tid) == 0) { + if (auto* storage = getTopicStorage(primary_tid)) { + storage->clearChunks(); + } + impl_->retired_topic_ids.insert(primary_tid); + result.retired_topics.push_back(primary_tid); + } + } + + return result; +} + // --------------------------------------------------------------------------- // Listing helpers // --------------------------------------------------------------------------- @@ -257,7 +351,17 @@ std::vector DataEngine::listTopics(DatasetId dataset_id) const { if (it == impl_->datasets.end()) { return {}; } - return it->second.topic_ids; + if (impl_->retired_topic_ids.empty()) { + return it->second.topic_ids; + } + std::vector result; + result.reserve(it->second.topic_ids.size()); + for (const TopicId tid : it->second.topic_ids) { + if (impl_->retired_topic_ids.count(tid) == 0) { + result.push_back(tid); + } + } + return result; } // --------------------------------------------------------------------------- diff --git a/pj_datastore/src/object_store.cpp b/pj_datastore/src/object_store.cpp index f24fc0e9..0e8e0ca7 100644 --- a/pj_datastore/src/object_store.cpp +++ b/pj_datastore/src/object_store.cpp @@ -4,6 +4,9 @@ #include "pj_datastore/object_store.hpp" #include +#include +#include +#include namespace PJ { @@ -333,10 +336,84 @@ Status ObjectStore::flushTo(ObjectStore& dst) { return {}; } +Expected ObjectStore::replaceDatasetFrom( + ObjectStore& staged, DatasetId staged_id, DatasetId primary_id) { + if (&staged == this) { + return unexpected("replaceDatasetFrom: staged and primary are the same store"); + } + + // Deterministic lock order by address (same discipline as flushTo). + ObjectStore* first = this < &staged ? this : &staged; + ObjectStore* second = first == this ? &staged : this; + std::unique_lock first_lock(first->store_mutex_); + std::unique_lock second_lock(second->store_mutex_); + + ObjectDatasetReplaceResult result; + + // Index primary (this) series under primary_id, by name -> (series, id). + // Pointers stay valid across topics_.emplace_back below because the + // ObjectSeries are heap-owned by unique_ptr — a vector realloc moves the + // pointers, not the pointees. + std::unordered_map> primary_by_name; + for (auto& [pid, series] : topics_) { + if (series->descriptor.dataset_id == primary_id) { + primary_by_name.emplace(series->descriptor.topic_name, std::pair{series.get(), pid}); + } + } + + std::unordered_set staged_names; + + for (auto& [sid, staged_series] : staged.topics_) { + if (staged_series->descriptor.dataset_id != staged_id) { + continue; + } + const std::string& name = staged_series->descriptor.topic_name; + staged_names.insert(name); + + if (auto match = primary_by_name.find(name); match != primary_by_name.end()) { + // Matched: keep the primary ObjectTopicId, swap the entries in. + ObjectSeries* p = match->second.first; + p->entries = std::move(staged_series->entries); + p->entry_timestamps = std::move(staged_series->entry_timestamps); + p->memory_bytes = staged_series->memory_bytes; + p->descriptor.metadata_json = staged_series->descriptor.metadata_json; // adopt reloaded metadata + result.remapped.emplace_back(sid, match->second.second); + } else { + // Added: register a fresh primary series under primary_id, move entries in. + ObjectTopicId new_id{next_id_++}; + auto series = std::make_unique(); + series->descriptor = staged_series->descriptor; + series->descriptor.dataset_id = primary_id; + series->entries = std::move(staged_series->entries); + series->entry_timestamps = std::move(staged_series->entry_timestamps); + series->memory_bytes = staged_series->memory_bytes; + topics_.emplace_back(new_id, std::move(series)); + result.remapped.emplace_back(sid, new_id); + } + staged_series->memory_bytes = 0; // entries/timestamps already moved-from + } + + // Remove primary topics the reloaded dataset no longer provides. + for (const auto& [name, slot] : primary_by_name) { + if (staged_names.count(name) == 0) { + result.removed_topics.push_back(slot.second); + } + } + for (const ObjectTopicId rid : result.removed_topics) { + eraseTopicLocked(rid); + } + + return result; +} + // --- Lifecycle --- void ObjectStore::removeTopic(ObjectTopicId id) { std::unique_lock lock(store_mutex_); + eraseTopicLocked(id); +} + +void ObjectStore::eraseTopicLocked(ObjectTopicId id) { auto it = std::find_if(topics_.begin(), topics_.end(), [&](const auto& pair) { return pair.first == id; }); if (it != topics_.end()) { topics_.erase(it); diff --git a/pj_datastore/tests/engine_integration_test.cpp b/pj_datastore/tests/engine_integration_test.cpp index 3d771734..b2928b41 100644 --- a/pj_datastore/tests/engine_integration_test.cpp +++ b/pj_datastore/tests/engine_integration_test.cpp @@ -1119,5 +1119,132 @@ TEST(DataEngineFlushTest, PreservesTopicRegistrationOnSrc) { EXPECT_FALSE(src_storage->empty()); } +// =========================================================================== +// replaceDatasetFrom: in-place data replace that keeps DatasetId/TopicId stable +// (the engine half of reload). Match-by-name; staged chunks adopt the primary +// topic id; added/retired topics handled. +// =========================================================================== + +TopicId replaceWriteScalar(DataEngine& engine, DatasetId ds, const std::string& name, double base, int count) { + DataWriter writer = engine.createWriter(); + auto handle = writer.registerScalarSeries(ds, name, NumericType::kFloat64); + EXPECT_TRUE(handle.has_value()); + for (int i = 0; i < count; ++i) { + writer.appendScalar(*handle, static_cast(i) * 1000, base + static_cast(i)); + } + engine.commitChunks(writer.flushAll()); + return handle->topic_id; +} + +double firstValue(DataEngine& engine, TopicId topic) { + DataReader reader = engine.createReader(); + auto latest_or = reader.latestAt(QueryPoint{.topic_id = topic, .t = 0}); + EXPECT_TRUE(latest_or.has_value()); + auto& latest = *latest_or; + EXPECT_TRUE(latest.has_value()); + return latest->chunk->readNumericAsDouble(0, latest->row_index); +} + +std::size_t rowCount(DataEngine& engine, TopicId topic) { + DataReader reader = engine.createReader(); + auto cursor_or = reader.rangeQuery(QueryRange{.topic_id = topic, .t_min = 0, .t_max = static_cast(1) << 40}); + EXPECT_TRUE(cursor_or.has_value()); + std::size_t n = 0; + cursor_or->forEach([&n](const SampleRow&) { ++n; }); + return n; +} + +TEST(EngineReplaceTest, ChunkMoveAndTopicIdRewrite) { + DataEngine primary; + auto pds = primary.createDataset(DatasetDescriptor{.source_name = "f", .time_domain_id = 0}); + ASSERT_TRUE(pds.has_value()); + const TopicId a = replaceWriteScalar(primary, *pds, "a", /*base=*/0.0, /*count=*/3); + const TopicId b = replaceWriteScalar(primary, *pds, "b", 0.0, 3); + + DataEngine staged; + auto sds = staged.createDataset(DatasetDescriptor{.source_name = "f", .time_domain_id = 0}); + ASSERT_TRUE(sds.has_value()); + replaceWriteScalar(staged, *sds, "a", /*base=*/100.0, /*count=*/5); + replaceWriteScalar(staged, *sds, "b", 100.0, 5); + + auto res = primary.replaceDatasetFrom(staged, *sds, *pds); + ASSERT_TRUE(res.has_value()) << res.error(); + EXPECT_EQ(res->replaced_topics.size(), 2u); + EXPECT_TRUE(res->added_topics.empty()); + EXPECT_TRUE(res->retired_topics.empty()); + + // Topic ids stable; moved chunks re-stamped to the primary id; data is staged's. + const TopicStorage* sa = primary.getTopicStorage(a); + ASSERT_NE(sa, nullptr); + ASSERT_FALSE(sa->sealedChunks().empty()); + EXPECT_EQ(sa->sealedChunks().front().topic_id, a); + EXPECT_DOUBLE_EQ(firstValue(primary, a), 100.0); + EXPECT_EQ(rowCount(primary, a), 5u); + EXPECT_DOUBLE_EQ(firstValue(primary, b), 100.0); +} + +TEST(EngineReplaceTest, AddedTopicCreatedUnderPrimary) { + DataEngine primary; + auto pds = primary.createDataset(DatasetDescriptor{.source_name = "f", .time_domain_id = 0}); + ASSERT_TRUE(pds.has_value()); + replaceWriteScalar(primary, *pds, "a", 0.0, 3); + + DataEngine staged; + auto sds = staged.createDataset(DatasetDescriptor{.source_name = "f", .time_domain_id = 0}); + ASSERT_TRUE(sds.has_value()); + replaceWriteScalar(staged, *sds, "a", 0.0, 3); + replaceWriteScalar(staged, *sds, "b", 0.0, 3); // new topic + + auto res = primary.replaceDatasetFrom(staged, *sds, *pds); + ASSERT_TRUE(res.has_value()) << res.error(); + EXPECT_EQ(res->added_topics.size(), 1u); + EXPECT_EQ(primary.listTopics(*pds).size(), 2u); +} + +TEST(EngineReplaceTest, RetiredTopicHiddenButStorageKept) { + DataEngine primary; + auto pds = primary.createDataset(DatasetDescriptor{.source_name = "f", .time_domain_id = 0}); + ASSERT_TRUE(pds.has_value()); + const TopicId a = replaceWriteScalar(primary, *pds, "a", 0.0, 3); + const TopicId b = replaceWriteScalar(primary, *pds, "b", 0.0, 3); + + DataEngine staged; + auto sds = staged.createDataset(DatasetDescriptor{.source_name = "f", .time_domain_id = 0}); + ASSERT_TRUE(sds.has_value()); + replaceWriteScalar(staged, *sds, "a", 7.0, 3); // only "a" survives + + auto res = primary.replaceDatasetFrom(staged, *sds, *pds); + ASSERT_TRUE(res.has_value()) << res.error(); + ASSERT_EQ(res->retired_topics.size(), 1u); + EXPECT_EQ(res->retired_topics[0], b); + + // listTopics hides the retired id; getTopicStorage still returns an empty storage. + const auto topics = primary.listTopics(*pds); + ASSERT_EQ(topics.size(), 1u); + EXPECT_EQ(topics[0], a); + const TopicStorage* sb = primary.getTopicStorage(b); + ASSERT_NE(sb, nullptr); + EXPECT_TRUE(sb->empty()); +} + +TEST(EngineReplaceTest, IdStabilityReaderSeesNewData) { + DataEngine primary; + auto pds = primary.createDataset(DatasetDescriptor{.source_name = "f", .time_domain_id = 0}); + ASSERT_TRUE(pds.has_value()); + const TopicId a = replaceWriteScalar(primary, *pds, "a", 0.0, 3); + + DataEngine staged; + auto sds = staged.createDataset(DatasetDescriptor{.source_name = "f", .time_domain_id = 0}); + ASSERT_TRUE(sds.has_value()); + replaceWriteScalar(staged, *sds, "a", 50.0, 3); + + ASSERT_TRUE(primary.replaceDatasetFrom(staged, *sds, *pds).has_value()); + + const auto topics = primary.listTopics(*pds); + ASSERT_EQ(topics.size(), 1u); + EXPECT_EQ(topics[0], a) << "topic id must be stable across replace"; + EXPECT_DOUBLE_EQ(firstValue(primary, a), 50.0); +} + } // namespace } // namespace PJ diff --git a/pj_datastore/tests/object_store_test.cpp b/pj_datastore/tests/object_store_test.cpp index bd825073..ee6066f1 100644 --- a/pj_datastore/tests/object_store_test.cpp +++ b/pj_datastore/tests/object_store_test.cpp @@ -733,5 +733,89 @@ TEST(ObjectStoreFlushTest, ZeroCopyOwnershipChainSurvives) { EXPECT_EQ(post_handle->payload.anchor.get(), pre_ptr) << "shared_ptr identity must survive the flush"; } +// ========================================================================= +// replaceDatasetFrom: in-place object-data replace keeping ObjectTopicId stable +// (the object-store half of reload). +// ========================================================================= + +TEST(ObjectStoreReplaceTest, EntryMoveAndIdPreservation) { + ObjectStore primary; + ObjectStore staged; + auto primary_id = primary.registerTopic({.dataset_id = 1, .topic_name = "cam/image", .metadata_json = "{}"}); + ASSERT_TRUE(primary_id.has_value()); + ASSERT_TRUE(primary.pushOwned(*primary_id, 10, makePayload(4, 0x11)).has_value()); + + auto staged_id = staged.registerTopic({.dataset_id = 7, .topic_name = "cam/image", .metadata_json = R"({"k":1})"}); + ASSERT_TRUE(staged_id.has_value()); + for (Timestamp t : {Timestamp{100}, Timestamp{200}, Timestamp{300}}) { + ASSERT_TRUE(staged.pushOwned(*staged_id, t, makePayload(4, 0x22)).has_value()); + } + + auto res = primary.replaceDatasetFrom(staged, /*staged_id=*/7, /*primary_id=*/1); + ASSERT_TRUE(res.has_value()); + EXPECT_EQ(primary.entryCount(*primary_id), 3u) << "entries replaced with staged's"; + EXPECT_EQ(staged.entryCount(*staged_id), 0u) << "staged drained"; + EXPECT_EQ(primary.descriptor(*primary_id).metadata_json, R"({"k":1})") << "reloaded metadata adopted"; + ASSERT_EQ(res->remapped.size(), 1u); + EXPECT_EQ(res->remapped[0].first.id, staged_id->id); + EXPECT_EQ(res->remapped[0].second.id, primary_id->id) << "primary ObjectTopicId preserved"; + EXPECT_TRUE(res->removed_topics.empty()); +} + +TEST(ObjectStoreReplaceTest, RemovedTopic) { + ObjectStore primary; + ObjectStore staged; + auto keep = primary.registerTopic({.dataset_id = 1, .topic_name = "cam/image", .metadata_json = "{}"}); + auto drop = primary.registerTopic({.dataset_id = 1, .topic_name = "cam/depth", .metadata_json = "{}"}); + ASSERT_TRUE(keep.has_value()); + ASSERT_TRUE(drop.has_value()); + ASSERT_TRUE(primary.pushOwned(*keep, 10, makePayload(4)).has_value()); + ASSERT_TRUE(primary.pushOwned(*drop, 10, makePayload(4)).has_value()); + + auto staged_keep = staged.registerTopic({.dataset_id = 5, .topic_name = "cam/image", .metadata_json = "{}"}); + ASSERT_TRUE(staged_keep.has_value()); + ASSERT_TRUE(staged.pushOwned(*staged_keep, 100, makePayload(4)).has_value()); + + auto res = primary.replaceDatasetFrom(staged, 5, 1); + ASSERT_TRUE(res.has_value()); + ASSERT_EQ(res->removed_topics.size(), 1u); + EXPECT_EQ(res->removed_topics[0].id, drop->id); + EXPECT_TRUE(primary.descriptor(*drop).topic_name.empty()) << "removed topic gone from store"; + EXPECT_EQ(primary.descriptor(*keep).topic_name, "cam/image"); + EXPECT_EQ(primary.entryCount(*keep), 1u); +} + +TEST(ObjectStoreReplaceTest, AddedTopic) { + ObjectStore primary; + ObjectStore staged; + auto img = primary.registerTopic({.dataset_id = 1, .topic_name = "cam/image", .metadata_json = "{}"}); + ASSERT_TRUE(img.has_value()); + ASSERT_TRUE(primary.pushOwned(*img, 10, makePayload(4)).has_value()); + + auto s_img = staged.registerTopic({.dataset_id = 3, .topic_name = "cam/image", .metadata_json = "{}"}); + auto s_lidar = staged.registerTopic({.dataset_id = 3, .topic_name = "cam/lidar", .metadata_json = "{}"}); + ASSERT_TRUE(s_img.has_value()); + ASSERT_TRUE(s_lidar.has_value()); + ASSERT_TRUE(staged.pushOwned(*s_img, 100, makePayload(4)).has_value()); + ASSERT_TRUE(staged.pushOwned(*s_lidar, 100, makePayload(4)).has_value()); + + auto res = primary.replaceDatasetFrom(staged, 3, 1); + ASSERT_TRUE(res.has_value()); + // The new lidar topic is registered under the primary dataset and reported in + // the remap as (staged lidar id -> its fresh primary id). + auto lidar_in_primary = primary.findTopic(1, "cam/lidar"); + ASSERT_TRUE(lidar_in_primary.has_value()); + EXPECT_EQ(primary.entryCount(*lidar_in_primary), 1u); + bool lidar_remapped = false; + for (const auto& [staged_id, primary_topic_id] : res->remapped) { + if (staged_id.id == s_lidar->id) { + EXPECT_EQ(primary_topic_id.id, lidar_in_primary->id); + lidar_remapped = true; + } + } + EXPECT_TRUE(lidar_remapped) << "added topic must appear in the remap"; + EXPECT_EQ(primary.findTopic(1, "cam/image")->id, img->id) << "existing topic id preserved"; +} + } // namespace } // namespace PJ From 0664a926c02a67006e0f1ff7550b2deed8130dbd Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 2 Jun 2026 07:07:34 +0000 Subject: [PATCH 2/2] style(pj_datastore): apply pre-commit clang-format fixes --- pj_datastore/include/pj_datastore/engine.hpp | 4 ++-- pj_datastore/include/pj_datastore/object_store.hpp | 4 ++-- pj_datastore/tests/engine_integration_test.cpp | 3 ++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pj_datastore/include/pj_datastore/engine.hpp b/pj_datastore/include/pj_datastore/engine.hpp index 4a76d893..169cd30d 100644 --- a/pj_datastore/include/pj_datastore/engine.hpp +++ b/pj_datastore/include/pj_datastore/engine.hpp @@ -109,8 +109,8 @@ class DataEngine { /// Caller MUST invalidate every reader/adapter bound to `primary_id` BEFORE this /// call (no live `TopicChunk*` may survive into the chunk clear) and run no event /// loop between that invalidation and this call. The staged engine is drained. - [[nodiscard]] PJ::Expected - replaceDatasetFrom(DataEngine& staged, PJ::DatasetId staged_id, PJ::DatasetId primary_id); + [[nodiscard]] PJ::Expected replaceDatasetFrom( + DataEngine& staged, PJ::DatasetId staged_id, PJ::DatasetId primary_id); // Writer/Reader factories /// Create a writer bound to this engine. diff --git a/pj_datastore/include/pj_datastore/object_store.hpp b/pj_datastore/include/pj_datastore/object_store.hpp index 40b61d78..31235e2b 100644 --- a/pj_datastore/include/pj_datastore/object_store.hpp +++ b/pj_datastore/include/pj_datastore/object_store.hpp @@ -198,8 +198,8 @@ class ObjectStore { // primary-only topics are removed. Returns the staged->primary id map so the // caller re-registers parsers under the stable ids. Either fully applies or // (on a validation error) mutates neither store. - [[nodiscard]] Expected - replaceDatasetFrom(ObjectStore& staged, DatasetId staged_id, DatasetId primary_id); + [[nodiscard]] Expected replaceDatasetFrom( + ObjectStore& staged, DatasetId staged_id, DatasetId primary_id); // --- Lifecycle --- diff --git a/pj_datastore/tests/engine_integration_test.cpp b/pj_datastore/tests/engine_integration_test.cpp index b2928b41..b3a2a13f 100644 --- a/pj_datastore/tests/engine_integration_test.cpp +++ b/pj_datastore/tests/engine_integration_test.cpp @@ -1147,7 +1147,8 @@ double firstValue(DataEngine& engine, TopicId topic) { std::size_t rowCount(DataEngine& engine, TopicId topic) { DataReader reader = engine.createReader(); - auto cursor_or = reader.rangeQuery(QueryRange{.topic_id = topic, .t_min = 0, .t_max = static_cast(1) << 40}); + auto cursor_or = + reader.rangeQuery(QueryRange{.topic_id = topic, .t_min = 0, .t_max = static_cast(1) << 40}); EXPECT_TRUE(cursor_or.has_value()); std::size_t n = 0; cursor_or->forEach([&n](const SampleRow&) { ++n; });