|
66 | 66 | #include <algorithm> |
67 | 67 | #include <atomic> |
68 | 68 | #include <chrono> |
| 69 | +#include <limits> |
69 | 70 | #define FMT_HEADER_ONLY |
70 | 71 | #include <fmt/format.h> |
71 | 72 | #include <nlohmann/json.hpp> |
@@ -202,6 +203,153 @@ namespace ccf |
202 | 203 | } |
203 | 204 | }; |
204 | 205 |
|
| 206 | + struct BackupSnapshotFetch : public ccf::tasks::BaseTask |
| 207 | + { |
| 208 | + const ccf::CCFConfig::Snapshots snapshot_config; |
| 209 | + ccf::kv::Version since_seqno; |
| 210 | + NodeState* owner; |
| 211 | + |
| 212 | + BackupSnapshotFetch( |
| 213 | + ccf::CCFConfig::Snapshots snapshot_config_, |
| 214 | + ccf::kv::Version since_seqno_, |
| 215 | + NodeState* owner_) : |
| 216 | + snapshot_config(std::move(snapshot_config_)), |
| 217 | + since_seqno(since_seqno_), |
| 218 | + owner(owner_) |
| 219 | + {} |
| 220 | + |
| 221 | + void do_task_implementation() override |
| 222 | + { |
| 223 | + struct ClearOnExit |
| 224 | + { |
| 225 | + NodeState* owner; |
| 226 | + ~ClearOnExit() |
| 227 | + { |
| 228 | + std::lock_guard<pal::Mutex> guard(owner->lock); |
| 229 | + owner->backup_snapshot_fetch_task = nullptr; |
| 230 | + } |
| 231 | + } clear_on_exit{owner}; |
| 232 | + |
| 233 | + // Resolve the primary's RPC address |
| 234 | + std::string primary_address; |
| 235 | + std::vector<uint8_t> service_cert; |
| 236 | + { |
| 237 | + auto primary_id = owner->consensus->primary(); |
| 238 | + if (!primary_id.has_value()) |
| 239 | + { |
| 240 | + LOG_INFO_FMT( |
| 241 | + "BackupSnapshotFetch: No known primary, skipping fetch"); |
| 242 | + return; |
| 243 | + } |
| 244 | + |
| 245 | + auto tx = owner->network.tables->create_read_only_tx(); |
| 246 | + auto* nodes = tx.ro<ccf::Nodes>(Tables::NODES); |
| 247 | + auto node_info = nodes->get(primary_id.value()); |
| 248 | + if (!node_info.has_value()) |
| 249 | + { |
| 250 | + LOG_INFO_FMT( |
| 251 | + "BackupSnapshotFetch: Could not find primary node {} in nodes " |
| 252 | + "table", |
| 253 | + primary_id.value()); |
| 254 | + return; |
| 255 | + } |
| 256 | + |
| 257 | + // Use the configured RPC interface to find the primary's address |
| 258 | + const auto& target_interface = |
| 259 | + snapshot_config.backup_fetch.target_rpc_interface; |
| 260 | + auto iface_it = node_info->rpc_interfaces.find(target_interface); |
| 261 | + if (iface_it == node_info->rpc_interfaces.end()) |
| 262 | + { |
| 263 | + LOG_INFO_FMT( |
| 264 | + "BackupSnapshotFetch: Primary node {} does not have RPC " |
| 265 | + "interface '{}' configured", |
| 266 | + primary_id.value(), |
| 267 | + target_interface); |
| 268 | + return; |
| 269 | + } |
| 270 | + primary_address = iface_it->second.published_address; |
| 271 | + |
| 272 | + if (owner->network.identity == nullptr) |
| 273 | + { |
| 274 | + LOG_INFO_FMT( |
| 275 | + "BackupSnapshotFetch: No service identity available, cannot " |
| 276 | + "construct TLS credentials for fetching snapshot"); |
| 277 | + return; |
| 278 | + } |
| 279 | + |
| 280 | + service_cert = owner->network.identity->cert.raw(); |
| 281 | + } |
| 282 | + |
| 283 | + LOG_INFO_FMT( |
| 284 | + "BackupSnapshotFetch: Attempting to fetch snapshot from primary at " |
| 285 | + "{}", |
| 286 | + primary_address); |
| 287 | + |
| 288 | + const auto& bf = snapshot_config.backup_fetch; |
| 289 | + |
| 290 | + auto latest_peer_snapshot = snapshots::fetch_from_peer( |
| 291 | + primary_address, |
| 292 | + service_cert, |
| 293 | + bf.max_attempts, |
| 294 | + bf.retry_interval.count_ms(), |
| 295 | + bf.max_size.count_bytes(), |
| 296 | + since_seqno); |
| 297 | + |
| 298 | + if (latest_peer_snapshot.has_value()) |
| 299 | + { |
| 300 | + LOG_INFO_FMT( |
| 301 | + "BackupSnapshotFetch: Received snapshot {} from primary (size: " |
| 302 | + "{})", |
| 303 | + latest_peer_snapshot->snapshot_name, |
| 304 | + latest_peer_snapshot->snapshot_data.size()); |
| 305 | + |
| 306 | + const auto snapshot_path = |
| 307 | + std::filesystem::path(latest_peer_snapshot->snapshot_name); |
| 308 | + |
| 309 | + if ( |
| 310 | + snapshot_path.empty() || snapshot_path.is_absolute() || |
| 311 | + snapshot_path.has_parent_path() || |
| 312 | + snapshot_path.filename() != snapshot_path) |
| 313 | + { |
| 314 | + LOG_FAIL_FMT( |
| 315 | + "BackupSnapshotFetch: Rejecting snapshot with invalid name " |
| 316 | + "'{}' from primary", |
| 317 | + latest_peer_snapshot->snapshot_name); |
| 318 | + return; |
| 319 | + } |
| 320 | + |
| 321 | + const auto dst_path = |
| 322 | + std::filesystem::path(snapshot_config.directory) / snapshot_path; |
| 323 | + |
| 324 | + if (files::exists(dst_path)) |
| 325 | + { |
| 326 | + LOG_INFO_FMT( |
| 327 | + "BackupSnapshotFetch: Snapshot {} already exists locally, " |
| 328 | + "skipping write", |
| 329 | + dst_path.string()); |
| 330 | + return; |
| 331 | + } |
| 332 | + |
| 333 | + files::dump(latest_peer_snapshot->snapshot_data, dst_path); |
| 334 | + LOG_INFO_FMT( |
| 335 | + "BackupSnapshotFetch: Wrote snapshot {} ({} bytes)", |
| 336 | + dst_path.string(), |
| 337 | + latest_peer_snapshot->snapshot_data.size()); |
| 338 | + } |
| 339 | + else |
| 340 | + { |
| 341 | + LOG_INFO_FMT( |
| 342 | + "BackupSnapshotFetch: No snapshot available from primary"); |
| 343 | + } |
| 344 | + } |
| 345 | + |
| 346 | + [[nodiscard]] const std::string& get_name() const override |
| 347 | + { |
| 348 | + static const std::string name = "BackupSnapshotFetch"; |
| 349 | + return name; |
| 350 | + } |
| 351 | + }; |
| 352 | + |
205 | 353 | private: |
206 | 354 | // |
207 | 355 | // this node's core state |
@@ -280,6 +428,7 @@ namespace ccf |
280 | 428 |
|
281 | 429 | ccf::tasks::Task join_periodic_task; |
282 | 430 | ccf::tasks::Task snapshot_fetch_task; |
| 431 | + ccf::tasks::Task backup_snapshot_fetch_task; |
283 | 432 |
|
284 | 433 | std::shared_ptr<ccf::kv::AbstractTxEncryptor> make_encryptor() |
285 | 434 | { |
@@ -2924,6 +3073,49 @@ namespace ccf |
2924 | 3073 | return {nullptr}; |
2925 | 3074 | })); |
2926 | 3075 |
|
| 3076 | + network.tables->set_global_hook( |
| 3077 | + network.snapshot_evidence.get_name(), |
| 3078 | + SnapshotEvidence::wrap_commit_hook( |
| 3079 | + [this]( |
| 3080 | + [[maybe_unused]] ccf::kv::Version version, |
| 3081 | + const SnapshotEvidence::Write& w) { |
| 3082 | + if (!w.has_value()) |
| 3083 | + { |
| 3084 | + return; |
| 3085 | + } |
| 3086 | + |
| 3087 | + auto snapshot_evidence = w.value(); |
| 3088 | + |
| 3089 | + // If backup snapshot fetching is enabled and this node is a |
| 3090 | + // backup, schedule a fetch task |
| 3091 | + if ( |
| 3092 | + config.snapshots.backup_fetch.enabled && consensus != nullptr && |
| 3093 | + !consensus->is_primary()) |
| 3094 | + { |
| 3095 | + std::lock_guard<pal::Mutex> guard(lock); |
| 3096 | + if ( |
| 3097 | + backup_snapshot_fetch_task != nullptr && |
| 3098 | + !backup_snapshot_fetch_task->is_cancelled()) |
| 3099 | + { |
| 3100 | + LOG_DEBUG_FMT( |
| 3101 | + "Backup snapshot fetch already in progress, skipping"); |
| 3102 | + } |
| 3103 | + else |
| 3104 | + { |
| 3105 | + LOG_INFO_FMT( |
| 3106 | + "Snapshot evidence detected on backup - scheduling " |
| 3107 | + "snapshot fetch from primary (since seqno: {})", |
| 3108 | + snapshot_evidence.version); |
| 3109 | + backup_snapshot_fetch_task = |
| 3110 | + std::make_shared<BackupSnapshotFetch>( |
| 3111 | + config.snapshots, |
| 3112 | + snapshot_evidence.version - 1 /* YIKES */, |
| 3113 | + this); |
| 3114 | + ccf::tasks::add_task(backup_snapshot_fetch_task); |
| 3115 | + } |
| 3116 | + } |
| 3117 | + })); |
| 3118 | + |
2927 | 3119 | setup_basic_hooks(); |
2928 | 3120 | } |
2929 | 3121 |
|
|
0 commit comments