Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions extensions/rocksdb-repos/DatabaseContentRepository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,38 @@ bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configu

setCompactionPeriod(configuration);

auto set_db_opts = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
const auto cache_size = configuration->get(Configure::nifi_dbcontent_optimize_for_small_db_cache_size)
| utils::andThen([](const auto& cache_size_str) -> std::optional<uint64_t> {
return parsing::parseDataSize(cache_size_str) | utils::toOptional();
});

std::shared_ptr<rocksdb::Cache> cache = nullptr;
if (cache_size) {
cache = rocksdb::NewLRUCache(*cache_size);
}

auto set_db_opts = [encrypted_env, &cache] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
minifi::internal::setCommonRocksDbOptions(db_opts);
if (encrypted_env) {
db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{});
} else {
db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
}
if (cache) {
db_opts.call(&rocksdb::DBOptions::OptimizeForSmallDb, &cache);
}
};
auto set_cf_opts = [&configuration] (rocksdb::ColumnFamilyOptions& cf_opts) {
auto set_cf_opts = [&configuration, &cache] (rocksdb::ColumnFamilyOptions& cf_opts) {
cf_opts.OptimizeForPointLookup(4);
cf_opts.merge_operator = std::make_shared<StringAppender>();
cf_opts.max_successive_merges = 0;
if (auto compression_type = minifi::internal::readConfiguredCompressionType(configuration, Configure::nifi_content_repository_rocksdb_compression)) {
cf_opts.compression = *compression_type;
}
if (cache) {
cf_opts.OptimizeForSmallDb(&cache);
cf_opts.compression_opts.max_dict_bytes = 0;
}
};
db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, directory_,
minifi::internal::getRocksDbOptionsToOverride(configuration, Configure::nifi_content_repository_rocksdb_options));
Expand Down
26 changes: 13 additions & 13 deletions extensions/rocksdb-repos/database/OpenRocksDb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,23 +135,23 @@ std::optional<uint64_t> OpenRocksDb::getApproximateSizes() const {
return std::nullopt;
}

minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() {
minifi::core::RepositoryMetricsSource::RocksDbStats stats;
std::string table_readers;
GetProperty("rocksdb.estimate-table-readers-mem", &table_readers);
void OpenRocksDb::fillU64FromProperty(uint64_t& member, std::string_view property_name) {
std::string property_value;
GetProperty(property_name, &property_value);
try {
stats.table_readers_size = std::stoull(table_readers);
member = std::stoull(property_value);
} catch (const std::exception&) {
logger_->log_warn("Could not retrieve valid 'rocksdb.estimate-table-readers-mem' property value from rocksdb content repository!");
logger_->log_warn("Could not retrieve valid '{}' property value from rocksdb content repository!", property_name);
}
}

std::string all_memtables;
GetProperty("rocksdb.cur-size-all-mem-tables", &all_memtables);
try {
stats.all_memory_tables_size = std::stoull(all_memtables);
} catch (const std::exception&) {
logger_->log_warn("Could not retrieve valid 'rocksdb.cur-size-all-mem-tables' property value from rocksdb content repository!");
}

minifi::core::RepositoryMetricsSource::RocksDbStats OpenRocksDb::getStats() {
minifi::core::RepositoryMetricsSource::RocksDbStats stats;
fillU64FromProperty(stats.table_readers_size, "rocksdb.estimate-table-readers-mem");
fillU64FromProperty(stats.all_memory_tables_size, "rocksdb.cur-size-all-mem-tables");
fillU64FromProperty(stats.block_cache_usage, "rocksdb.block-cache-usage");
fillU64FromProperty(stats.block_cache_pinned_usage, "rocksdb.block-cache-pinned-usage");

return stats;
}
Expand Down
2 changes: 2 additions & 0 deletions extensions/rocksdb-repos/database/OpenRocksDb.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class OpenRocksDb {
void handleResult(const rocksdb::Status& result);
void handleResult(const std::vector<rocksdb::Status>& results);

void fillU64FromProperty(uint64_t& member, std::string_view property_name);

gsl::not_null<RocksDbInstance*> db_;
gsl::not_null<std::shared_ptr<rocksdb::DB>> impl_;
gsl::not_null<std::shared_ptr<ColumnHandle>> column_;
Expand Down
6 changes: 6 additions & 0 deletions extensions/rocksdb-repos/database/RocksDbUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ class Writable {
}
}

template <typename Method, typename... Args>
decltype(auto) call(Method method, Args&&... args) {
is_modified_ = true;
return std::invoke(method, target_, std::forward<Args>(args)...);
}

template<typename F>
const F& get(F T::* member) {
return target_.*member;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ std::vector<SerializedResponseNode> RepositoryMetricsSourceStore::serialize() co
if (auto rocksdb_stats = repo->getRocksDbStats()) {
parent.children.push_back({.name = "rocksDbTableReadersSize", .value = rocksdb_stats->table_readers_size});
parent.children.push_back({.name = "rocksDbAllMemoryTablesSize", .value = rocksdb_stats->all_memory_tables_size});
parent.children.push_back({.name = "rocksDbBlockCacheUsage", .value = rocksdb_stats->block_cache_usage});
parent.children.push_back({.name = "rocksDbBlockCachePinnedUsage", .value = rocksdb_stats->block_cache_pinned_usage});
}

serialized.push_back(parent);
Expand All @@ -68,6 +70,10 @@ std::vector<PublishedMetric> RepositoryMetricsSourceStore::calculateMetrics() co
{{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}});
metrics.push_back({"rocksdb_all_memory_tables_size_bytes", static_cast<double>(rocksdb_stats->all_memory_tables_size),
{{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}});
metrics.push_back({"rocksdb_block_cache_usage_bytes", static_cast<double>(rocksdb_stats->block_cache_usage),
{{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}});
metrics.push_back({"rocksdb_block_cache_pinned_usage_bytes", static_cast<double>(rocksdb_stats->block_cache_pinned_usage),
{{"metric_class", name_}, {"repository_name", repo->getRepositoryName()}}});
}
}
return metrics;
Expand Down
4 changes: 3 additions & 1 deletion libminifi/test/libtest/unit/ProvenanceTestHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ class TestRocksDbRepository : public TestThreadedRepository {
std::optional<RocksDbStats> getRocksDbStats() const override {
return RocksDbStats {
.table_readers_size = 100,
.all_memory_tables_size = 200
.all_memory_tables_size = 200,
.block_cache_usage = 85,
.block_cache_pinned_usage = 50
};
}
};
Expand Down
39 changes: 39 additions & 0 deletions libminifi/test/libtest/unit/TestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,4 +256,43 @@ std::error_code sendMessagesViaSSL(const std::vector<std::string_view>& contents
return {};
}

std::vector<LogMessageView> extractLogMessageViews(const std::string& log_str) {
std::vector<LogMessageView> messages;
const std::regex header_pattern(R"((\[[\d\-\s\:\.]+\]) (\s*\[[^\]]+\]) \[(.*)\])");
struct HeaderMarker {
size_t start;
std::string_view timestamp;
std::string_view logger_class;
std::string_view log_level;
size_t end;
};

std::vector<HeaderMarker> markers = ranges::subrange<std::sregex_iterator>(std::sregex_iterator(log_str.begin(), log_str.end(), header_pattern),
std::sregex_iterator()) |
ranges::views::transform([=](const std::smatch& m) {
return HeaderMarker{.start = static_cast<size_t>(m.position(0)),
.timestamp = std::string_view{log_str.data() + m.position(1), static_cast<size_t>(m.length(1))},
.logger_class = std::string_view{log_str.data() + m.position(2), static_cast<size_t>(m.length(2))},
.log_level = std::string_view{log_str.data() + m.position(3), static_cast<size_t>(m.length(3))},
.end = static_cast<size_t>(m.position(0) + m.length(0))
};
}) | ranges::to<std::vector>();

markers.push_back(HeaderMarker{.start = log_str.size(),
.timestamp = {},
.logger_class = {},
.log_level = {},
.end = log_str.size()
});

for (auto window: markers | ranges::views::sliding(2)) {
messages.push_back(LogMessageView{.timestamp = window[0].timestamp,
.logger_class = window[0].logger_class,
.log_level = window[0].log_level,
.payload = {log_str.data() + window[0].end, window[1].start - window[0].end}});
}

return messages;
}

} // namespace org::apache::nifi::minifi::test::utils
18 changes: 16 additions & 2 deletions libminifi/test/libtest/unit/TestUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@
#include "asio.hpp"
#include "asio/ssl.hpp"
#include "utils/net/Ssl.h"
#include "range/v3/algorithm/any_of.hpp"
#include "core/Processor.h"
#include "core/logging/LoggerFactory.h"
#include <range/v3/all.hpp>
#include "./ProcessorUtils.h"

using namespace std::literals::chrono_literals;
Expand Down Expand Up @@ -127,6 +126,12 @@ bool verifyLogLineVariantPresenceInPollTime(const std::chrono::duration<Rep, Per
return verifyEventHappenedInPollTime(wait_duration, check);
}

template<class Rep, class Period>
bool verifyLogMatchesRegexInPollTime(const std::chrono::duration<Rep, Period>& wait_duration, const std::string& regex) {
auto check = [&regex] { return LogTestController::getInstance().matchesRegex(regex); };
return verifyEventHappenedInPollTime(wait_duration, check);
}

namespace internal {
struct JsonContext {
const JsonContext *parent{nullptr};
Expand Down Expand Up @@ -233,6 +238,15 @@ inline bool runningAsUnixRoot() {
#endif
}

struct LogMessageView {
std::string_view timestamp;
std::string_view logger_class;
std::string_view log_level;
std::string_view payload;
};

std::vector<LogMessageView> extractLogMessageViews(const std::string& log_str);

} // namespace org::apache::nifi::minifi::test::utils

namespace Catch {
Expand Down
Loading
Loading