Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ endif()
if(PJ_INSTALL_SDK)
include(CMakePackageConfigHelpers)

set(PJ_PACKAGE_VERSION "0.5.1")
set(PJ_PACKAGE_VERSION "0.6.0")
set(PJ_PACKAGE_CMAKE_DIR ${CMAKE_INSTALL_LIBDIR}/cmake/plotjuggler_core)

install(EXPORT plotjuggler_coreTargets
Expand Down
4 changes: 2 additions & 2 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
plugin_sdk — umbrella for plugin authors (base + dialog SDK + parser SDK)
plugin_host — umbrella for host loaders (data_source/parser/toolbox/dialog)

A consuming Conan recipe declares e.g. `plotjuggler_core/0.5.1` and then:
A consuming Conan recipe declares e.g. `plotjuggler_core/0.6.0` and then:

find_package(plotjuggler_core REQUIRED COMPONENTS plugin_sdk)
target_link_libraries(my_plugin PRIVATE plotjuggler_core::plugin_sdk)
Expand All @@ -27,7 +27,7 @@

class PlotjugglerCoreConan(ConanFile):
name = "plotjuggler_core"
version = "0.5.1"
version = "0.6.0"
# Apache-2.0 covers pj_base + pj_plugins (the plugin-facing SDK);
# MPL-2.0 covers pj_datastore (the storage engine). See LICENSE.
license = "Apache-2.0 AND MPL-2.0"
Expand Down
23 changes: 23 additions & 0 deletions pj_datastore/include/pj_datastore/engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "pj_base/dataset.hpp"
#include "pj_base/expected.hpp"
#include "pj_base/types.hpp"
#include "pj_datastore/replace_result.hpp"
#include "pj_datastore/topic_storage.hpp"
#include "pj_datastore/type_registry.hpp"

Expand Down Expand Up @@ -95,6 +96,22 @@ class DataEngine {
/// lockstep with the source via parallel registration at startup.
PJ::Status flushTo(DataEngine& dst);

/// In-place REPLACE of dataset `primary_id`'s data with the data ingested into
/// dataset `staged_id` of a throwaway `staged` engine — used for reload so the
/// primary DatasetId/TopicIds (and therefore all curve keys) stay stable.
/// Topics are matched by name. Matched primary topics have their chunks cleared
/// and replaced with the staged topic's chunks (each chunk's `topic_id` rewritten
/// to the primary id); their inline column layout is copied across. Staged-only
/// topics are created under `primary_id`. Primary-only topics are retired (chunks
/// cleared, id excluded from `listTopics`, storage kept so cached reader pointers
/// see an empty deque rather than dangling).
///
/// Caller MUST invalidate every reader/adapter bound to `primary_id` BEFORE this
/// call (no live `TopicChunk*` may survive into the chunk clear) and run no event
/// loop between that invalidation and this call. The staged engine is drained.
[[nodiscard]] PJ::Expected<DatasetReplaceResult> replaceDatasetFrom(
DataEngine& staged, PJ::DatasetId staged_id, PJ::DatasetId primary_id);

// Writer/Reader factories
/// Create a writer bound to this engine.
[[nodiscard]] DataWriter createWriter();
Expand All @@ -110,6 +127,12 @@ class DataEngine {
[[nodiscard]] std::vector<PJ::TopicId> listTopics(PJ::DatasetId dataset_id) const;

private:
// Move src's sealed chunks into dst, re-stamping each chunk's topic_id to
// dst's id. Shared by flushTo (append between mirrored topics) and
// replaceDatasetFrom (after dst is cleared). Needs friend access to
// TopicStorage::sealed_chunks_.
static void adoptChunksFrom(TopicStorage& dst, TopicStorage& src);

struct Impl;
std::unique_ptr<Impl> impl_;
};
Expand Down
27 changes: 27 additions & 0 deletions pj_datastore/include/pj_datastore/object_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ struct ObjectTopicDescriptor {
std::string metadata_json;
};

/// Outcome of ObjectStore::replaceDatasetFrom() — the object-side parallel of
/// DatasetReplaceResult. The remapped pairs let the caller re-register object
/// parsers under the stable primary ObjectTopicId after the swap.
struct ObjectDatasetReplaceResult {
/// (staged ObjectTopicId, primary ObjectTopicId) for every adopted topic
/// (both matched and newly added), so a parser keyed by the staged id can be
/// re-registered under the primary id. A newly added topic is simply one whose
/// primary id did not exist before — callers needing only the remap use this.
std::vector<std::pair<ObjectTopicId, ObjectTopicId>> remapped;
/// Primary object topics removed (no staged match).
std::vector<ObjectTopicId> removed_topics;
};

/// Eager payload: store-owned bytes, counted against the retention budget.
using SharedBuffer = std::shared_ptr<const std::vector<uint8_t>>;

Expand Down Expand Up @@ -178,6 +191,16 @@ class ObjectStore {
// Afterward, dst's retention budget is applied to each touched series.
Status flushTo(ObjectStore& dst);

// In-place REPLACE of object dataset `primary_id` with the topics ingested
// into `staged` dataset `staged_id` — the object-store half of reload. Topics
// are matched by name. Matched topics keep their primary ObjectTopicId and have
// their entries replaced; staged-only topics are registered under `primary_id`;
// primary-only topics are removed. Returns the staged->primary id map so the
// caller re-registers parsers under the stable ids. Either fully applies or
// (on a validation error) mutates neither store.
[[nodiscard]] Expected<ObjectDatasetReplaceResult> replaceDatasetFrom(
ObjectStore& staged, DatasetId staged_id, DatasetId primary_id);

// --- Lifecycle ---

void removeTopic(ObjectTopicId id);
Expand All @@ -196,6 +219,10 @@ class ObjectStore {
ObjectSeries* findSeries(ObjectTopicId id);
const ObjectSeries* findSeries(ObjectTopicId id) const;

// Erase a topic from topics_. Caller must already hold store_mutex_ (used by
// removeTopic under its own lock and by replaceDatasetFrom under the dual lock).
void eraseTopicLocked(ObjectTopicId id);

static std::optional<size_t> upperBoundIndex(const std::vector<Timestamp>& timestamps, Timestamp ts);
static ResolvedObjectEntry resolveEntry(const ObjectEntry& entry);

Expand Down
31 changes: 31 additions & 0 deletions pj_datastore/include/pj_datastore/replace_result.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once
// Copyright 2026 Davide Faconti
// SPDX-License-Identifier: MPL-2.0

#include <vector>

#include "pj_base/types.hpp"

namespace PJ {

/// Outcome of DataEngine::replaceDatasetFrom(): which primary topics had their
/// data swapped, which were created fresh, and which were retired (their source
/// topic vanished from the new data). All ids are PRIMARY engine ids — the whole
/// point of replace is that they stay stable so curve keys do not change.
///
/// (ObjectStore has the parallel ObjectDatasetReplaceResult, declared in
/// object_store.hpp next to ObjectTopicId.)
struct DatasetReplaceResult {
/// Primary topics whose chunks were replaced (topic name matched staged).
std::vector<TopicId> replaced_topics;
/// Primary topics newly created (name existed in staged but not primary).
std::vector<TopicId> added_topics;
/// Primary topics with no staged match: chunks cleared and the id retired
/// (excluded from listTopics; getTopicStorage still returns an empty storage
/// so any in-flight reader sees an empty deque rather than dangling).
/// Observability only — the engine self-hides these via listTopics, so callers
/// need not act on them (the catalog rebuild drops them on its own).
std::vector<TopicId> retired_topics;
};

} // namespace PJ
128 changes: 116 additions & 12 deletions pj_datastore/src/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include <tsl/robin_map.h>

#include <algorithm>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>

#include "pj_base/expected.hpp"
Expand All @@ -23,6 +26,10 @@ struct DataEngine::Impl {
tsl::robin_map<PJ::DatasetId, PJ::DatasetInfo> datasets;
tsl::robin_map<PJ::TopicId, TopicStorage> topics;
tsl::robin_map<PJ::TimeDomainId, PJ::TimeDomain> time_domains;
// Topics retired by replaceDatasetFrom(): their storage is kept (so any
// cached reader pointer dereferences an empty deque, not freed memory) but
// they are hidden from listTopics so the catalog drops them.
std::unordered_set<PJ::TopicId> retired_topic_ids;
};

DataEngine::DataEngine() : impl_(std::make_unique<Impl>()) {}
Expand Down Expand Up @@ -222,23 +229,110 @@ Status DataEngine::flushTo(DataEngine& dst) {
plan.push_back({&src_storage, dst_storage});
}

// Phase 2: execute. friend access lets us move sealed_chunks_ directly
// between TopicStorage instances of different engines — the deque move
// transfers chunk ownership without copying any column data or value
// buffers. Each chunk's TopicChunkStats (t_min/t_max/row_count) rides
// along inside the chunk by value, so dst's time_min/time_max queries
// reflect the new state immediately after the move.
// Phase 2: execute. adoptChunksFrom moves sealed_chunks_ directly between
// TopicStorage instances (no column/value copy); the chunks' stats ride along
// by value, so dst's time_min/time_max reflect the new state immediately. The
// topic-id rewrite is a no-op here because flushTo's mirrored dst shares the
// source's TopicId.
for (auto& step : plan) {
auto drained = std::move(step.src->sealed_chunks_);
step.src->sealed_chunks_.clear(); // post-move state: deque is valid but empty.
for (auto& chunk : drained) {
step.dst->sealed_chunks_.push_back(std::move(chunk));
}
adoptChunksFrom(*step.dst, *step.src);
}

return {};
}

void DataEngine::adoptChunksFrom(TopicStorage& dst, TopicStorage& src) {
// friend access: drain src's deque into dst, re-stamping each chunk's topic_id
// to dst's id (a no-op when the two storages already share one, e.g. flushTo).
// Appends — callers that need replace semantics clear dst first.
std::deque<TopicChunk> drained = std::move(src.sealed_chunks_);
src.sealed_chunks_.clear(); // post-move state: deque is valid but empty.
const TopicId dst_id = dst.topic_id();
for (auto& chunk : drained) {
chunk.topic_id = dst_id;
dst.sealed_chunks_.push_back(std::move(chunk));
}
}

Expected<DatasetReplaceResult> DataEngine::replaceDatasetFrom(
DataEngine& staged, DatasetId staged_id, DatasetId primary_id) {
if (&staged == this) {
return PJ::unexpected("replaceDatasetFrom: staged and primary are the same engine");
}
auto primary_it = impl_->datasets.find(primary_id);
if (primary_it == impl_->datasets.end()) {
return PJ::unexpected(fmt::format("replaceDatasetFrom: primary dataset {} not found", primary_id));
}
if (staged.impl_->datasets.find(staged_id) == staged.impl_->datasets.end()) {
return PJ::unexpected(fmt::format("replaceDatasetFrom: staged dataset {} not found", staged_id));
}

// Snapshot primary topic name -> primary TopicId (includes ids retired by a
// previous replace, so a topic that comes back re-binds to its stable id).
std::unordered_map<std::string, TopicId> primary_by_name;
for (const TopicId tid : primary_it->second.topic_ids) {
if (const auto* storage = getTopicStorage(tid)) {
primary_by_name.emplace(storage->descriptor().name, tid);
}
}

DatasetReplaceResult result;
std::unordered_set<std::string> staged_names;

// Adopt every staged topic into the primary dataset, by name.
for (const TopicId staged_tid : staged.listTopics(staged_id)) {
TopicStorage* staged_storage = staged.getTopicStorage(staged_tid);
if (staged_storage == nullptr) {
continue;
}
const std::string name = staged_storage->descriptor().name;
staged_names.insert(name);

const auto match = primary_by_name.find(name);
const bool is_new = (match == primary_by_name.end());
TopicId primary_tid = 0;
if (is_new) {
// Schema-less by design: post-replace reads resolve columns from the moved
// chunks' own descriptors (and the copied inline layout below), so a new
// topic needs no registry schema (which would not exist in this engine).
TopicDescriptor desc = staged_storage->descriptor();
desc.dataset_id = primary_id;
desc.schema_id = 0;
auto created = createTopic(primary_id, std::move(desc));
if (!created.has_value()) {
return PJ::unexpected("replaceDatasetFrom: createTopic failed for '" + name + "': " + created.error());
}
primary_tid = *created;
result.added_topics.push_back(primary_tid);
} else {
primary_tid = match->second;
impl_->retired_topic_ids.erase(primary_tid); // un-retire if it had vanished
result.replaced_topics.push_back(primary_tid);
}

// Re-fetch AFTER any createTopic above (emplace may have rehashed impl_->topics).
TopicStorage* primary_storage = getTopicStorage(primary_tid);
primary_storage->clearChunks();
// Adopt the staged topic's inline layout (move, not copy — the staged engine
// is discarded next) then re-stamp + move its chunks onto the primary id.
primary_storage->column_descriptors_ = std::move(staged_storage->column_descriptors_);
adoptChunksFrom(*primary_storage, *staged_storage);
}

// Retire primary topics the new data no longer provides.
for (const auto& [name, primary_tid] : primary_by_name) {
if (staged_names.count(name) == 0 && impl_->retired_topic_ids.count(primary_tid) == 0) {
if (auto* storage = getTopicStorage(primary_tid)) {
storage->clearChunks();
}
impl_->retired_topic_ids.insert(primary_tid);
result.retired_topics.push_back(primary_tid);
}
}

return result;
}

// ---------------------------------------------------------------------------
// Listing helpers
// ---------------------------------------------------------------------------
Expand All @@ -257,7 +351,17 @@ std::vector<TopicId> DataEngine::listTopics(DatasetId dataset_id) const {
if (it == impl_->datasets.end()) {
return {};
}
return it->second.topic_ids;
if (impl_->retired_topic_ids.empty()) {
return it->second.topic_ids;
}
std::vector<TopicId> result;
result.reserve(it->second.topic_ids.size());
for (const TopicId tid : it->second.topic_ids) {
if (impl_->retired_topic_ids.count(tid) == 0) {
result.push_back(tid);
}
}
return result;
}

// ---------------------------------------------------------------------------
Expand Down
Loading