Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "^(AppleClang|Clang|GNU)$")
add_flag(-Werror-reorder) # field '$1' will be initialized after field '$2'
elseif ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "MSVC")
# using Visual Studio C++
# TODO(warchant): add flags https://github.com/lefticus/cppbestpractices/blob/master/02-Use_the_Tools_Available.md#msvc
add_flag(/W4) # Enable all warnings
add_flag(/WX) # Treat warnings as errors
add_flag(/permissive-) # Disable non-conforming code
add_flag(/Zc:__cplusplus) # Enable correct __cplusplus macro
add_flag(/std:c++20) # Use C++20 standard
endif ()

if (CLANG_TIDY)
Expand Down
9 changes: 8 additions & 1 deletion include/libp2p/basic/write_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@

namespace libp2p::basic {

/// A thread-safe write queue for managing data transmission with flow control
///
/// This class provides:
/// - Bounded queue with configurable size limits
/// - Partial message transmission support
/// - Callback-based completion notification
/// - Automatic error handling for overflow conditions
class WriteQueue {
public:
using DataRef = BytesIn;

static constexpr size_t kDefaultSizeLimit = 64 * 1024 * 1024;
static constexpr size_t kDefaultSizeLimit = 64 * 1024 * 1024; // 64MB default limit

explicit WriteQueue(size_t size_limit = kDefaultSizeLimit)
: size_limit_(size_limit) {}
Expand Down
48 changes: 48 additions & 0 deletions include/libp2p/protocol/kademlia/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,44 @@ namespace libp2p::protocol::kademlia {
*/
std::chrono::seconds delay = 10s;
};

struct PeriodicReplication {
/**
* True if periodic replication is enabled
*/
bool enabled = true;

/**
* Interval for periodic replication
* @note Default: 1h
*/
std::chrono::seconds interval = 1h;

/**
* Number of peers to replicate to per cycle
* @note Default: 3 (subset of K_VALUE)
*/
size_t peers_per_cycle = 3;
};

struct PeriodicRepublishing {
/**
* True if periodic republishing is enabled
*/
bool enabled = true;

/**
* Interval for periodic republishing
* @note Default: 24h
*/
std::chrono::seconds interval = 24h;

/**
* Number of peers to republish to per cycle
* @note Default: 6 (subset of K_VALUE)
*/
size_t peers_per_cycle = 6;
};
} // namespace

class Config {
Expand Down Expand Up @@ -146,6 +184,16 @@ namespace libp2p::protocol::kademlia {
*/
RandomWalk randomWalk{};

/**
* Periodic replication config
*/
PeriodicReplication periodicReplication{};

/**
* Periodic republishing config
*/
PeriodicRepublishing periodicRepublishing{};

// https://github.com/libp2p/rust-libp2p/blob/c6cf7fec6913aa590622aeea16709fce6e9c99a5/protocols/kad/src/query/peers/closest.rs#L110-L120
size_t query_initial_peers = K_VALUE;

Expand Down
36 changes: 36 additions & 0 deletions include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,22 @@ namespace libp2p::protocol::kademlia {
std::shared_ptr<Session> openSession(
std::shared_ptr<connection::Stream> stream) override;

/// Set replication interval
/// @param interval - replication interval
void setReplicationInterval(std::chrono::seconds interval) override;

/// Set republishing interval
/// @param interval - republishing interval
void setRepublishingInterval(std::chrono::seconds interval) override;

/// Enable/disable periodic replication
/// @param enabled - whether to enable replication
void setReplicationEnabled(bool enabled) override;

/// Enable/disable periodic republishing
/// @param enabled - whether to enable republishing
void setRepublishingEnabled(bool enabled) override;

private:
void onPutValue(const std::shared_ptr<Session> &session, Message &&msg);
void onGetValue(const std::shared_ptr<Session> &session, Message &&msg);
Expand Down Expand Up @@ -145,6 +161,26 @@ namespace libp2p::protocol::kademlia {
basic::Scheduler::Handle handle{};
} random_walking_;

// Periodic replication and republishing
basic::Scheduler::Handle replication_timer_;
basic::Scheduler::Handle republishing_timer_;

// Mutable configuration for runtime changes
mutable std::chrono::seconds replication_interval_;
mutable std::chrono::seconds republishing_interval_;
mutable bool replication_enabled_;
mutable bool republishing_enabled_;
Comment on lines +168 to +172
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current code reads values from config, without duplicating to fields.
E.g. use config_.periodicReplication.enabled instead of replication_enabled_

Suggested change
// Mutable configuration for runtime changes
mutable std::chrono::seconds replication_interval_;
mutable std::chrono::seconds republishing_interval_;
mutable bool replication_enabled_;
mutable bool republishing_enabled_;


// Periodic operation callbacks
void onReplicationTimer();
void onRepublishingTimer();
void performReplication();
void performRepublishing();

// Helper methods for periodic operations
std::vector<PeerId> getClosestPeers(const Key& key, size_t count);
void replicateRecord(const Key& key, const Value& value, bool extend_expiration);

log::SubLogger log_;
};

Expand Down
4 changes: 4 additions & 0 deletions include/libp2p/protocol/kademlia/impl/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ namespace libp2p::protocol::kademlia {

/// @returns true if it has value corresponding to given @param key.
virtual bool hasValue(const Key &key) const = 0;

/// Get all stored records for periodic operations
/// @return vector of stored records
virtual std::vector<std::pair<Key, Value>> getAllRecords() const = 0;
};

} // namespace libp2p::protocol::kademlia
2 changes: 2 additions & 0 deletions include/libp2p/protocol/kademlia/impl/storage_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ namespace libp2p::protocol::kademlia {

bool hasValue(const Key &key) const override;

std::vector<std::pair<Key, Value>> getAllRecords() const override;

private:
void onRefreshTimer();
void setTimerRefresh();
Expand Down
16 changes: 16 additions & 0 deletions include/libp2p/protocol/kademlia/kademlia.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@ namespace libp2p::protocol::kademlia {
virtual ~Kademlia() = default;

virtual void start() = 0;

/// Set replication interval
/// @param interval - replication interval
virtual void setReplicationInterval(std::chrono::seconds interval) = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifying values in config seems sufficient.
Please remove setters from interface.


/// Set republishing interval
/// @param interval - republishing interval
virtual void setRepublishingInterval(std::chrono::seconds interval) = 0;

/// Enable/disable periodic replication
/// @param enabled - whether to enable replication
virtual void setReplicationEnabled(bool enabled) = 0;

/// Enable/disable periodic republishing
/// @param enabled - whether to enable republishing
virtual void setRepublishingEnabled(bool enabled) = 0;
};

} // namespace libp2p::protocol::kademlia
51 changes: 41 additions & 10 deletions include/libp2p/storage/sqlite.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,21 @@

#include <vector>

#ifdef SQLITE_ENABLED
#include <sqlite_modern_cpp.h>
#include <libp2p/log/logger.hpp>
#endif

namespace libp2p::storage {

#ifdef SQLITE_ENABLED
/// C++ handy interface for SQLite based on SQLiteModernCpp
///
/// This class provides a modern C++ wrapper around SQLite with support for:
/// - Prepared statements with handle-based management
/// - Comprehensive error handling and logging
/// - RAII-style resource management
/// - Thread-safe operations (when SQLite is compiled with threading support)
class SQLite {
public:
using StatementHandle = size_t;
Expand All @@ -36,10 +45,10 @@ namespace libp2p::storage {
}

/// Reads extended sqlite3 error code
int getErrorCode();
int getErrorCode() const;

/// Returns human-readable representation of an error
std::string getErrorMessage();
std::string getErrorMessage() const;

/**
* Store prepared statement
Expand All @@ -61,19 +70,27 @@ namespace libp2p::storage {
* @param st_handle - statement identifier
* @param args - command arguments
* @return number of rows affected, -1 in case of error
* @throws std::invalid_argument if statement handle is invalid
*/
template <typename... Args>
inline int execCommand(StatementHandle st_handle, const Args &...args) {
try {
auto &st = getStatement(st_handle);
bindArgs(st, args...);
st.execute();
return countChanges();
} catch (const std::runtime_error &e) {
const auto changes = countChanges();
log_->debug("Command executed successfully, {} rows affected", changes);
return changes;
} catch (const std::invalid_argument &e) {
// getStatement can receive invalid handle
log_->error(e.what());
log_->error("Invalid statement handle {}: {}", st_handle, e.what());
throw; // Re-throw invalid_argument as it's a programming error
} catch (const std::runtime_error &e) {
log_->error("Runtime error during command execution (handle {}): {} - SQLite error: {}",
st_handle, e.what(), getErrorMessage());
} catch (...) {
log_->error(getErrorMessage());
log_->error("Unknown error during command execution (handle {}): {}",
st_handle, getErrorMessage());
}
return -1;
}
Expand All @@ -86,6 +103,7 @@ namespace libp2p::storage {
* @param sink - query response consumer
* @param args - query arguments
* @return true when query was successfully executed, otherwise - false
* @throws std::invalid_argument if statement handle is invalid
*/
template <typename Sink, typename... Args>
inline bool execQuery(StatementHandle st_handle,
Expand All @@ -95,12 +113,18 @@ namespace libp2p::storage {
auto &st = getStatement(st_handle);
bindArgs(st, args...);
st >> sink;
log_->debug("Query executed successfully (handle {})", st_handle);
return true;
} catch (const std::runtime_error &e) {
} catch (const std::invalid_argument &e) {
// getStatement can receive invalid handle
log_->error(e.what());
log_->error("Invalid statement handle {}: {}", st_handle, e.what());
throw; // Re-throw invalid_argument as it's a programming error
} catch (const std::runtime_error &e) {
log_->error("Runtime error during query execution (handle {}): {} - SQLite error: {}",
st_handle, e.what(), getErrorMessage());
} catch (...) {
log_->error(getErrorMessage());
log_->error("Unknown error during query execution (handle {}): {}",
st_handle, getErrorMessage());
}
return false;
}
Expand All @@ -119,13 +143,20 @@ namespace libp2p::storage {
}

/// Returns the number of rows modified
int countChanges();
int countChanges() const;

/// Returns the database file path
const std::string &getDatabaseFile() const;

/// Returns the number of prepared statements
size_t getStatementCount() const;

::sqlite::database db_;
std::string db_file_;
log::Logger log_;

std::vector<database_binder> statements_;
};
#endif // SQLITE_ENABLED

} // namespace libp2p::storage
17 changes: 15 additions & 2 deletions src/basic/write_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,21 @@ namespace libp2p::basic {
void WriteQueue::enqueue(DataRef data, Writer::WriteCallbackFunc cb) {
auto data_sz = static_cast<size_t>(data.size());

assert(data_sz > 0);
assert(canEnqueue(data_sz));
if (data_sz == 0) {
// Call callback immediately for empty data
if (cb) {
cb(outcome::success());
}
return;
}

if (!canEnqueue(data_sz)) {
// Call callback with error for overflow
if (cb) {
cb(outcome::failure(std::make_error_code(std::errc::no_buffer_space)));
}
return;
}

total_unsent_size_ += data_sz;
queue_.push_back({data, 0, 0, data_sz, std::move(cb)});
Expand Down
50 changes: 46 additions & 4 deletions src/crypto/key_validator/key_validator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,57 @@ namespace libp2p::crypto::validator {

outcome::result<void> KeyValidatorImpl::validateEcdsa(
const PrivateKey &key) const {
// TODO(xDimon): Check if it possible to validate ECDSA key by some way.
// issue: https://github.com/libp2p/cpp-libp2p/issues/103
// Basic ECDSA private key validation
// ECDSA private keys are typically 32 bytes for P-256, 48 bytes for P-384, 66 bytes for P-521
if (key.data.empty()) {
return KeyValidatorError::INVALID_PRIVATE_KEY;
}

// Check for reasonable key sizes (32-66 bytes covers most common curves)
if (key.data.size() < 32 || key.data.size() > 66) {
return KeyValidatorError::WRONG_PRIVATE_KEY_SIZE;
}

// Check that the key is not all zeros (invalid private key)
bool all_zeros = true;
for (const auto& byte : key.data) {
if (byte != 0) {
all_zeros = false;
break;
}
}
if (all_zeros) {
return KeyValidatorError::INVALID_PRIVATE_KEY;
}

return outcome::success();
}

outcome::result<void> KeyValidatorImpl::validateEcdsa(
const PublicKey &key) const {
// TODO(xDimon): Check if it possible to validate ECDSA key by some way.
// issue: https://github.com/libp2p/cpp-libp2p/issues/103
// Basic ECDSA public key validation
if (key.data.empty()) {
return KeyValidatorError::INVALID_PUBLIC_KEY;
}

// ECDSA public keys are typically 64 bytes (uncompressed) or 33/49/67 bytes (compressed)
// for P-256/P-384/P-521 respectively
if (key.data.size() < 33 || key.data.size() > 133) {
return KeyValidatorError::WRONG_PUBLIC_KEY_SIZE;
}

// Check that the key is not all zeros (invalid public key)
bool all_zeros = true;
for (const auto& byte : key.data) {
if (byte != 0) {
all_zeros = false;
break;
}
}
if (all_zeros) {
return KeyValidatorError::INVALID_PUBLIC_KEY;
}

return outcome::success();
}

Expand Down
Loading
Loading