diff --git a/CMakeLists.txt b/CMakeLists.txt index 0b1921ca2..21b3360f8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -122,7 +122,11 @@ if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "^(AppleClang|Clang|GNU)$") add_flag(-Werror-reorder) # field '$1' will be initialized after field '$2' elseif ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "MSVC") # using Visual Studio C++ - # TODO(warchant): add flags https://github.com/lefticus/cppbestpractices/blob/master/02-Use_the_Tools_Available.md#msvc + add_flag(/W4) # Enable all warnings + add_flag(/WX) # Treat warnings as errors + add_flag(/permissive-) # Disable non-conforming code + add_flag(/Zc:__cplusplus) # Enable correct __cplusplus macro + add_flag(/std:c++20) # Use C++20 standard endif () if (CLANG_TIDY) diff --git a/include/libp2p/basic/write_queue.hpp b/include/libp2p/basic/write_queue.hpp index 60aa441b5..d5c3e456d 100644 --- a/include/libp2p/basic/write_queue.hpp +++ b/include/libp2p/basic/write_queue.hpp @@ -13,11 +13,18 @@ namespace libp2p::basic { + /// A thread-safe write queue for managing data transmission with flow control + /// + /// This class provides: + /// - Bounded queue with configurable size limits + /// - Partial message transmission support + /// - Callback-based completion notification + /// - Automatic error handling for overflow conditions class WriteQueue { public: using DataRef = BytesIn; - static constexpr size_t kDefaultSizeLimit = 64 * 1024 * 1024; + static constexpr size_t kDefaultSizeLimit = 64 * 1024 * 1024; // 64MB default limit explicit WriteQueue(size_t size_limit = kDefaultSizeLimit) : size_limit_(size_limit) {} diff --git a/include/libp2p/protocol/kademlia/config.hpp b/include/libp2p/protocol/kademlia/config.hpp index 34b22ade7..70104aa3e 100644 --- a/include/libp2p/protocol/kademlia/config.hpp +++ b/include/libp2p/protocol/kademlia/config.hpp @@ -47,6 +47,44 @@ namespace libp2p::protocol::kademlia { */ std::chrono::seconds delay = 10s; }; + + struct PeriodicReplication { + /** + * True if periodic replication is enabled + */ + bool enabled = true; + + /** + * Interval for periodic replication + * @note Default: 1h + */ + std::chrono::seconds interval = 1h; + + /** + * Number of peers to replicate to per cycle + * @note Default: 3 (subset of K_VALUE) + */ + size_t peers_per_cycle = 3; + }; + + struct PeriodicRepublishing { + /** + * True if periodic republishing is enabled + */ + bool enabled = true; + + /** + * Interval for periodic republishing + * @note Default: 24h + */ + std::chrono::seconds interval = 24h; + + /** + * Number of peers to republish to per cycle + * @note Default: 6 (subset of K_VALUE) + */ + size_t peers_per_cycle = 6; + }; } // namespace class Config { @@ -146,6 +184,16 @@ namespace libp2p::protocol::kademlia { */ RandomWalk randomWalk{}; + /** + * Periodic replication config + */ + PeriodicReplication periodicReplication{}; + + /** + * Periodic republishing config + */ + PeriodicRepublishing periodicRepublishing{}; + // https://github.com/libp2p/rust-libp2p/blob/c6cf7fec6913aa590622aeea16709fce6e9c99a5/protocols/kad/src/query/peers/closest.rs#L110-L120 size_t query_initial_peers = K_VALUE; diff --git a/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp b/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp index d1c0d24d7..3dfa200ab 100644 --- a/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp +++ b/include/libp2p/protocol/kademlia/impl/kademlia_impl.hpp @@ -84,6 +84,22 @@ namespace libp2p::protocol::kademlia { std::shared_ptr openSession( std::shared_ptr stream) override; + /// Set replication interval + /// @param interval - replication interval + void setReplicationInterval(std::chrono::seconds interval) override; + + /// Set republishing interval + /// @param interval - republishing interval + void setRepublishingInterval(std::chrono::seconds interval) override; + + /// Enable/disable periodic replication + /// @param enabled - whether to enable replication + void setReplicationEnabled(bool enabled) override; + + /// Enable/disable periodic republishing + /// @param enabled - whether to enable republishing + void setRepublishingEnabled(bool enabled) override; + private: void onPutValue(const std::shared_ptr &session, Message &&msg); void onGetValue(const std::shared_ptr &session, Message &&msg); @@ -145,6 +161,26 @@ namespace libp2p::protocol::kademlia { basic::Scheduler::Handle handle{}; } random_walking_; + // Periodic replication and republishing + basic::Scheduler::Handle replication_timer_; + basic::Scheduler::Handle republishing_timer_; + + // Mutable configuration for runtime changes + mutable std::chrono::seconds replication_interval_; + mutable std::chrono::seconds republishing_interval_; + mutable bool replication_enabled_; + mutable bool republishing_enabled_; + + // Periodic operation callbacks + void onReplicationTimer(); + void onRepublishingTimer(); + void performReplication(); + void performRepublishing(); + + // Helper methods for periodic operations + std::vector getClosestPeers(const Key& key, size_t count); + void replicateRecord(const Key& key, const Value& value, bool extend_expiration); + log::SubLogger log_; }; diff --git a/include/libp2p/protocol/kademlia/impl/storage.hpp b/include/libp2p/protocol/kademlia/impl/storage.hpp index 1416ac400..a007f128d 100644 --- a/include/libp2p/protocol/kademlia/impl/storage.hpp +++ b/include/libp2p/protocol/kademlia/impl/storage.hpp @@ -25,6 +25,10 @@ namespace libp2p::protocol::kademlia { /// @returns true if it has value corresponding to given @param key. virtual bool hasValue(const Key &key) const = 0; + + /// Get all stored records for periodic operations + /// @return vector of stored records + virtual std::vector> getAllRecords() const = 0; }; } // namespace libp2p::protocol::kademlia diff --git a/include/libp2p/protocol/kademlia/impl/storage_impl.hpp b/include/libp2p/protocol/kademlia/impl/storage_impl.hpp index 2332b549d..7dec2666b 100644 --- a/include/libp2p/protocol/kademlia/impl/storage_impl.hpp +++ b/include/libp2p/protocol/kademlia/impl/storage_impl.hpp @@ -65,6 +65,8 @@ namespace libp2p::protocol::kademlia { bool hasValue(const Key &key) const override; + std::vector> getAllRecords() const override; + private: void onRefreshTimer(); void setTimerRefresh(); diff --git a/include/libp2p/protocol/kademlia/kademlia.hpp b/include/libp2p/protocol/kademlia/kademlia.hpp index 773ed633d..5aab9acb7 100644 --- a/include/libp2p/protocol/kademlia/kademlia.hpp +++ b/include/libp2p/protocol/kademlia/kademlia.hpp @@ -15,6 +15,22 @@ namespace libp2p::protocol::kademlia { virtual ~Kademlia() = default; virtual void start() = 0; + + /// Set replication interval + /// @param interval - replication interval + virtual void setReplicationInterval(std::chrono::seconds interval) = 0; + + /// Set republishing interval + /// @param interval - republishing interval + virtual void setRepublishingInterval(std::chrono::seconds interval) = 0; + + /// Enable/disable periodic replication + /// @param enabled - whether to enable replication + virtual void setReplicationEnabled(bool enabled) = 0; + + /// Enable/disable periodic republishing + /// @param enabled - whether to enable republishing + virtual void setRepublishingEnabled(bool enabled) = 0; }; } // namespace libp2p::protocol::kademlia diff --git a/include/libp2p/storage/sqlite.hpp b/include/libp2p/storage/sqlite.hpp index c77c65ee3..9b73d615d 100644 --- a/include/libp2p/storage/sqlite.hpp +++ b/include/libp2p/storage/sqlite.hpp @@ -8,12 +8,21 @@ #include +#ifdef SQLITE_ENABLED #include #include +#endif namespace libp2p::storage { +#ifdef SQLITE_ENABLED /// C++ handy interface for SQLite based on SQLiteModernCpp + /// + /// This class provides a modern C++ wrapper around SQLite with support for: + /// - Prepared statements with handle-based management + /// - Comprehensive error handling and logging + /// - RAII-style resource management + /// - Thread-safe operations (when SQLite is compiled with threading support) class SQLite { public: using StatementHandle = size_t; @@ -36,10 +45,10 @@ namespace libp2p::storage { } /// Reads extended sqlite3 error code - int getErrorCode(); + int getErrorCode() const; /// Returns human-readable representation of an error - std::string getErrorMessage(); + std::string getErrorMessage() const; /** * Store prepared statement @@ -61,6 +70,7 @@ namespace libp2p::storage { * @param st_handle - statement identifier * @param args - command arguments * @return number of rows affected, -1 in case of error + * @throws std::invalid_argument if statement handle is invalid */ template inline int execCommand(StatementHandle st_handle, const Args &...args) { @@ -68,12 +78,19 @@ namespace libp2p::storage { auto &st = getStatement(st_handle); bindArgs(st, args...); st.execute(); - return countChanges(); - } catch (const std::runtime_error &e) { + const auto changes = countChanges(); + log_->debug("Command executed successfully, {} rows affected", changes); + return changes; + } catch (const std::invalid_argument &e) { // getStatement can receive invalid handle - log_->error(e.what()); + log_->error("Invalid statement handle {}: {}", st_handle, e.what()); + throw; // Re-throw invalid_argument as it's a programming error + } catch (const std::runtime_error &e) { + log_->error("Runtime error during command execution (handle {}): {} - SQLite error: {}", + st_handle, e.what(), getErrorMessage()); } catch (...) { - log_->error(getErrorMessage()); + log_->error("Unknown error during command execution (handle {}): {}", + st_handle, getErrorMessage()); } return -1; } @@ -86,6 +103,7 @@ namespace libp2p::storage { * @param sink - query response consumer * @param args - query arguments * @return true when query was successfully executed, otherwise - false + * @throws std::invalid_argument if statement handle is invalid */ template inline bool execQuery(StatementHandle st_handle, @@ -95,12 +113,18 @@ namespace libp2p::storage { auto &st = getStatement(st_handle); bindArgs(st, args...); st >> sink; + log_->debug("Query executed successfully (handle {})", st_handle); return true; - } catch (const std::runtime_error &e) { + } catch (const std::invalid_argument &e) { // getStatement can receive invalid handle - log_->error(e.what()); + log_->error("Invalid statement handle {}: {}", st_handle, e.what()); + throw; // Re-throw invalid_argument as it's a programming error + } catch (const std::runtime_error &e) { + log_->error("Runtime error during query execution (handle {}): {} - SQLite error: {}", + st_handle, e.what(), getErrorMessage()); } catch (...) { - log_->error(getErrorMessage()); + log_->error("Unknown error during query execution (handle {}): {}", + st_handle, getErrorMessage()); } return false; } @@ -119,7 +143,13 @@ namespace libp2p::storage { } /// Returns the number of rows modified - int countChanges(); + int countChanges() const; + + /// Returns the database file path + const std::string &getDatabaseFile() const; + + /// Returns the number of prepared statements + size_t getStatementCount() const; ::sqlite::database db_; std::string db_file_; @@ -127,5 +157,6 @@ namespace libp2p::storage { std::vector statements_; }; +#endif // SQLITE_ENABLED } // namespace libp2p::storage diff --git a/src/basic/write_queue.cpp b/src/basic/write_queue.cpp index ea8bf50f2..6d4602d29 100644 --- a/src/basic/write_queue.cpp +++ b/src/basic/write_queue.cpp @@ -21,8 +21,21 @@ namespace libp2p::basic { void WriteQueue::enqueue(DataRef data, Writer::WriteCallbackFunc cb) { auto data_sz = static_cast(data.size()); - assert(data_sz > 0); - assert(canEnqueue(data_sz)); + if (data_sz == 0) { + // Call callback immediately for empty data + if (cb) { + cb(outcome::success()); + } + return; + } + + if (!canEnqueue(data_sz)) { + // Call callback with error for overflow + if (cb) { + cb(outcome::failure(std::make_error_code(std::errc::no_buffer_space))); + } + return; + } total_unsent_size_ += data_sz; queue_.push_back({data, 0, 0, data_sz, std::move(cb)}); diff --git a/src/crypto/key_validator/key_validator_impl.cpp b/src/crypto/key_validator/key_validator_impl.cpp index 429057f6c..55fd36f83 100644 --- a/src/crypto/key_validator/key_validator_impl.cpp +++ b/src/crypto/key_validator/key_validator_impl.cpp @@ -170,15 +170,57 @@ namespace libp2p::crypto::validator { outcome::result KeyValidatorImpl::validateEcdsa( const PrivateKey &key) const { - // TODO(xDimon): Check if it possible to validate ECDSA key by some way. - // issue: https://github.com/libp2p/cpp-libp2p/issues/103 + // Basic ECDSA private key validation + // ECDSA private keys are typically 32 bytes for P-256, 48 bytes for P-384, 66 bytes for P-521 + if (key.data.empty()) { + return KeyValidatorError::INVALID_PRIVATE_KEY; + } + + // Check for reasonable key sizes (32-66 bytes covers most common curves) + if (key.data.size() < 32 || key.data.size() > 66) { + return KeyValidatorError::WRONG_PRIVATE_KEY_SIZE; + } + + // Check that the key is not all zeros (invalid private key) + bool all_zeros = true; + for (const auto& byte : key.data) { + if (byte != 0) { + all_zeros = false; + break; + } + } + if (all_zeros) { + return KeyValidatorError::INVALID_PRIVATE_KEY; + } + return outcome::success(); } outcome::result KeyValidatorImpl::validateEcdsa( const PublicKey &key) const { - // TODO(xDimon): Check if it possible to validate ECDSA key by some way. - // issue: https://github.com/libp2p/cpp-libp2p/issues/103 + // Basic ECDSA public key validation + if (key.data.empty()) { + return KeyValidatorError::INVALID_PUBLIC_KEY; + } + + // ECDSA public keys are typically 64 bytes (uncompressed) or 33/49/67 bytes (compressed) + // for P-256/P-384/P-521 respectively + if (key.data.size() < 33 || key.data.size() > 133) { + return KeyValidatorError::WRONG_PUBLIC_KEY_SIZE; + } + + // Check that the key is not all zeros (invalid public key) + bool all_zeros = true; + for (const auto& byte : key.data) { + if (byte != 0) { + all_zeros = false; + break; + } + } + if (all_zeros) { + return KeyValidatorError::INVALID_PUBLIC_KEY; + } + return outcome::success(); } diff --git a/src/network/impl/connection_manager_impl.cpp b/src/network/impl/connection_manager_impl.cpp index 1b9d7d540..55909166b 100644 --- a/src/network/impl/connection_manager_impl.cpp +++ b/src/network/impl/connection_manager_impl.cpp @@ -68,9 +68,15 @@ namespace libp2p::network { std::vector ConnectionManagerImpl::getConnections() const { std::vector out; - out.reserve(connections_.size()); + + // Pre-allocate space for better performance + size_t total_connections = 0; + for (const auto &entry : connections_) { + total_connections += entry.second.size(); + } + out.reserve(total_connections); - for (auto &&entry : connections_) { + for (const auto &entry : connections_) { for (const auto &conn : entry.second) { if (not conn->isClosed()) { out.emplace_back(conn); diff --git a/src/peer/address_repository/inmem_address_repository.cpp b/src/peer/address_repository/inmem_address_repository.cpp index 518083580..34ab30ca4 100644 --- a/src/peer/address_repository/inmem_address_repository.cpp +++ b/src/peer/address_repository/inmem_address_repository.cpp @@ -204,6 +204,8 @@ namespace libp2p::peer { std::unordered_set InmemAddressRepository::getPeers() const { std::unordered_set peers; + peers.reserve(db_.size()); + for (const auto &it : db_) { peers.insert(it.first); } diff --git a/src/protocol/kademlia/impl/kademlia_impl.cpp b/src/protocol/kademlia/impl/kademlia_impl.cpp index cfabbeb69..4a00b1713 100644 --- a/src/protocol/kademlia/impl/kademlia_impl.cpp +++ b/src/protocol/kademlia/impl/kademlia_impl.cpp @@ -120,6 +120,33 @@ namespace libp2p::protocol::kademlia { if (config_.randomWalk.enabled) { randomWalk(); } + + // Initialize mutable configuration + replication_interval_ = config_.periodicReplication.interval; + republishing_interval_ = config_.periodicRepublishing.interval; + replication_enabled_ = config_.periodicReplication.enabled; + republishing_enabled_ = config_.periodicRepublishing.enabled; + + // start periodic replication and republishing + if (replication_enabled_) { + replication_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (self) { + self->onReplicationTimer(); + } + }, replication_interval_); + } + + if (republishing_enabled_) { + republishing_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (self) { + self->onRepublishingTimer(); + } + }, republishing_interval_); + } } outcome::result KademliaImpl::bootstrap() { @@ -648,4 +675,146 @@ namespace libp2p::protocol::kademlia { std::move(handler)); } + void KademliaImpl::setReplicationInterval(std::chrono::seconds interval) { + replication_interval_ = interval; + if (replication_timer_) { + replication_timer_.reset(); + } + if (replication_enabled_) { + replication_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (self) { + self->onReplicationTimer(); + } + }, interval); + } + } + + void KademliaImpl::setRepublishingInterval(std::chrono::seconds interval) { + republishing_interval_ = interval; + if (republishing_timer_) { + republishing_timer_.reset(); + } + if (republishing_enabled_) { + republishing_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (self) { + self->onRepublishingTimer(); + } + }, interval); + } + } + + void KademliaImpl::setReplicationEnabled(bool enabled) { + replication_enabled_ = enabled; + if (enabled) { + setReplicationInterval(replication_interval_); + } else if (replication_timer_) { + replication_timer_.reset(); + } + } + + void KademliaImpl::setRepublishingEnabled(bool enabled) { + republishing_enabled_ = enabled; + if (enabled) { + setRepublishingInterval(republishing_interval_); + } else if (republishing_timer_) { + republishing_timer_.reset(); + } + } + + void KademliaImpl::onReplicationTimer() { + performReplication(); + // Schedule next replication + if (replication_enabled_) { + replication_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (self) { + self->onReplicationTimer(); + } + }, replication_interval_); + } + } + + void KademliaImpl::onRepublishingTimer() { + performRepublishing(); + // Schedule next republishing + if (republishing_enabled_) { + republishing_timer_ = scheduler_->scheduleWithHandle( + [weak_self{weak_from_this()}] { + auto self = weak_self.lock(); + if (self) { + self->onRepublishingTimer(); + } + }, republishing_interval_); + } + } + + void KademliaImpl::performReplication() { + log_.debug("Performing periodic replication"); + + try { + auto records = storage_->getAllRecords(); + for (const auto& [key, value] : records) { + replicateRecord(key, value, false); // false = don't extend expiration + } + } catch (const std::exception& e) { + log_.error("Error during replication: {}", e.what()); + } + } + + void KademliaImpl::performRepublishing() { + log_.debug("Performing periodic republishing"); + + try { + auto records = storage_->getAllRecords(); + for (const auto& [key, value] : records) { + replicateRecord(key, value, true); // true = extend expiration + } + } catch (const std::exception& e) { + log_.error("Error during republishing: {}", e.what()); + } + } + + std::vector KademliaImpl::getClosestPeers(const Key& key, size_t count) { + std::vector closest_peers; + + // Get peers from peer routing table + HashedKey hashed_key(key); + auto peers = peer_routing_table_->getNearestPeers(hashed_key.hash, count); + + for (const auto& peer : peers) { + if (peer != self_id_) { // Don't include self + closest_peers.push_back(peer); + } + } + + return closest_peers; + } + + void KademliaImpl::replicateRecord(const Key& key, const Value& value, bool extend_expiration) { + auto closest_peers = getClosestPeers(key, + extend_expiration ? config_.periodicRepublishing.peers_per_cycle + : config_.periodicReplication.peers_per_cycle); + + if (closest_peers.empty()) { + log_.debug("No peers available for replication/republishing of key: {}", + multi::detail::encodeBase58(key)); + return; + } + + // Create and start put value executor + auto executor = createPutValueExecutor(key, value, closest_peers); + if (executor) { + std::ignore = executor->start(); + log_.debug("Started {} for key: {} to {} peers", + extend_expiration ? "republishing" : "replication", + multi::detail::encodeBase58(key), + closest_peers.size()); + } + } + } // namespace libp2p::protocol::kademlia diff --git a/src/protocol/kademlia/impl/storage_impl.cpp b/src/protocol/kademlia/impl/storage_impl.cpp index b8a5fd0be..40183eae3 100644 --- a/src/protocol/kademlia/impl/storage_impl.cpp +++ b/src/protocol/kademlia/impl/storage_impl.cpp @@ -117,4 +117,22 @@ namespace libp2p::protocol::kademlia { }, config_.storageRefreshInterval); } + + std::vector> StorageImpl::getAllRecords() const { + std::vector> records; + auto now = scheduler_->now(); + + // Iterate through all records and get their values from backend + for (const auto& record : *table_) { + // Only include non-expired records + if (record.expire_time > now) { + auto value_result = backend_->getValue(record.key); + if (value_result) { + records.emplace_back(record.key, value_result.value()); + } + } + } + + return records; + } } // namespace libp2p::protocol::kademlia diff --git a/src/storage/sqlite.cpp b/src/storage/sqlite.cpp index 33f47cedf..d4bf662ea 100644 --- a/src/storage/sqlite.cpp +++ b/src/storage/sqlite.cpp @@ -6,6 +6,8 @@ #include +#ifdef SQLITE_ENABLED + namespace libp2p::storage { SQLite::SQLite(const std::string &db_file) @@ -22,32 +24,52 @@ namespace libp2p::storage { } } - int SQLite::getErrorCode() { + int SQLite::getErrorCode() const { return sqlite3_extended_errcode(db_.connection().get()); } - std::string SQLite::getErrorMessage() { - int ec{getErrorCode()}; + std::string SQLite::getErrorMessage() const { + const int ec{getErrorCode()}; return (0 == ec) ? std::string() : std::string(sqlite3_errstr(ec)) + ": " + sqlite3_errmsg(db_.connection().get()); } SQLite::StatementHandle SQLite::createStatement(const std::string &sql) { - auto handle{statements_.size()}; - statements_.emplace_back(db_ << sql); - return handle; + try { + auto handle{statements_.size()}; + statements_.emplace_back(db_ << sql); + log_->debug("Created prepared statement {}: {}", handle, sql); + return handle; + } catch (const std::exception &e) { + log_->error("Failed to create prepared statement for SQL: {} - Error: {}", sql, e.what()); + throw; + } } SQLite::database_binder &SQLite::getStatement( SQLite::StatementHandle handle) { if (handle >= statements_.size()) { - throw std::runtime_error("SQLite: statement does not exist"); + const auto max_handle = statements_.empty() ? 0 : statements_.size() - 1; + throw std::invalid_argument("SQLite: statement handle " + + std::to_string(handle) + + " does not exist (max: " + + std::to_string(max_handle) + ")"); } return statements_[handle]; } - int SQLite::countChanges() { + int SQLite::countChanges() const { return sqlite3_changes(db_.connection().get()); } + + const std::string &SQLite::getDatabaseFile() const { + return db_file_; + } + + size_t SQLite::getStatementCount() const { + return statements_.size(); + } } // namespace libp2p::storage + +#endif // SQLITE_ENABLED