Skip to content

Commit a72e5a1

Browse files
authored
feat: Choose writer by cluster communication (#2830)
Fixes #1974
1 parent 0ebbaaa commit a72e5a1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+3574
-532
lines changed

src/app/ClioApplication.cpp

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
#include "etl/ETLService.hpp"
3030
#include "etl/LoadBalancer.hpp"
3131
#include "etl/NetworkValidatedLedgers.hpp"
32+
#include "etl/SystemState.hpp"
33+
#include "etl/WriterState.hpp"
3234
#include "feed/SubscriptionManager.hpp"
3335
#include "migration/MigrationInspectorFactory.hpp"
3436
#include "rpc/Counters.hpp"
@@ -121,7 +123,11 @@ ClioApplication::run(bool const useNgWebServer)
121123
// Interface to the database
122124
auto backend = data::makeBackend(config_, cache);
123125

124-
cluster::ClusterCommunicationService clusterCommunicationService{backend};
126+
auto systemState = etl::SystemState::makeSystemState(config_);
127+
128+
cluster::ClusterCommunicationService clusterCommunicationService{
129+
backend, std::make_unique<etl::WriterState>(systemState)
130+
};
125131
clusterCommunicationService.run();
126132

127133
auto const amendmentCenter = std::make_shared<data::AmendmentCenter const>(backend);
@@ -151,7 +157,9 @@ ClioApplication::run(bool const useNgWebServer)
151157
);
152158

153159
// ETL is responsible for writing and publishing to streams. In read-only mode, ETL only publishes
154-
auto etl = etl::ETLService::makeETLService(config_, ctx, backend, subscriptions, balancer, ledgers);
160+
auto etl = etl::ETLService::makeETLService(
161+
config_, std::move(systemState), ctx, backend, subscriptions, balancer, ledgers
162+
);
155163

156164
auto workQueue = rpc::WorkQueue::makeWorkQueue(config_);
157165
auto counters = rpc::Counters::makeCounters(workQueue);
@@ -197,7 +205,16 @@ ClioApplication::run(bool const useNgWebServer)
197205
}
198206

199207
appStopper_.setOnStop(
200-
Stopper::makeOnStopCallback(httpServer.value(), *balancer, *etl, *subscriptions, *backend, cacheSaver, ioc)
208+
Stopper::makeOnStopCallback(
209+
httpServer.value(),
210+
*balancer,
211+
*etl,
212+
*subscriptions,
213+
*backend,
214+
cacheSaver,
215+
clusterCommunicationService,
216+
ioc
217+
)
201218
);
202219

203220
// Blocks until stopped.
@@ -213,7 +230,9 @@ ClioApplication::run(bool const useNgWebServer)
213230

214231
auto const httpServer = web::makeHttpServer(config_, ioc, dosGuard, handler, cache);
215232
appStopper_.setOnStop(
216-
Stopper::makeOnStopCallback(*httpServer, *balancer, *etl, *subscriptions, *backend, cacheSaver, ioc)
233+
Stopper::makeOnStopCallback(
234+
*httpServer, *balancer, *etl, *subscriptions, *backend, cacheSaver, clusterCommunicationService, ioc
235+
)
217236
);
218237

219238
// Blocks until stopped.

src/app/Stopper.hpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#pragma once
2121

22+
#include "cluster/Concepts.hpp"
2223
#include "data/BackendInterface.hpp"
2324
#include "data/LedgerCacheSaver.hpp"
2425
#include "etl/ETLServiceInterface.hpp"
@@ -82,10 +83,14 @@ class Stopper {
8283
* @param subscriptions The subscription manager to stop.
8384
* @param backend The backend to stop.
8485
* @param cacheSaver The ledger cache saver
86+
* @param clusterCommunicationService The cluster communication service to stop.
8587
* @param ioc The io_context to stop.
8688
* @return The callback to be called on application stop.
8789
*/
88-
template <web::SomeServer ServerType, data::SomeLedgerCacheSaver LedgerCacheSaverType>
90+
template <
91+
web::SomeServer ServerType,
92+
data::SomeLedgerCacheSaver LedgerCacheSaverType,
93+
cluster::SomeClusterCommunicationService ClusterCommunicationServiceType>
8994
static std::function<void(boost::asio::yield_context)>
9095
makeOnStopCallback(
9196
ServerType& server,
@@ -94,6 +99,7 @@ class Stopper {
9499
feed::SubscriptionManagerInterface& subscriptions,
95100
data::BackendInterface& backend,
96101
LedgerCacheSaverType& cacheSaver,
102+
ClusterCommunicationServiceType& clusterCommunicationService,
97103
boost::asio::io_context& ioc
98104
)
99105
{
@@ -111,6 +117,8 @@ class Stopper {
111117
});
112118
coroutineGroup.asyncWait(yield);
113119

120+
clusterCommunicationService.stop();
121+
114122
etl.stop();
115123
LOG(util::LogService::info()) << "ETL stopped";
116124

src/cluster/Backend.cpp

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
//------------------------------------------------------------------------------
2+
/*
3+
This file is part of clio: https://github.com/XRPLF/clio
4+
Copyright (c) 2025, the clio developers.
5+
6+
Permission to use, copy, modify, and distribute this software for any
7+
purpose with or without fee is hereby granted, provided that the above
8+
copyright notice and this permission notice appear in all copies.
9+
10+
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11+
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12+
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13+
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14+
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15+
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16+
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17+
*/
18+
//==============================================================================
19+
20+
#include "cluster/Backend.hpp"
21+
22+
#include "cluster/ClioNode.hpp"
23+
#include "data/BackendInterface.hpp"
24+
#include "etl/WriterState.hpp"
25+
26+
#include <boost/asio/bind_cancellation_slot.hpp>
27+
#include <boost/asio/cancellation_type.hpp>
28+
#include <boost/asio/error.hpp>
29+
#include <boost/asio/execution_context.hpp>
30+
#include <boost/asio/executor.hpp>
31+
#include <boost/asio/spawn.hpp>
32+
#include <boost/asio/steady_timer.hpp>
33+
#include <boost/asio/thread_pool.hpp>
34+
#include <boost/asio/use_future.hpp>
35+
#include <boost/json/parse.hpp>
36+
#include <boost/json/serialize.hpp>
37+
#include <boost/json/value.hpp>
38+
#include <boost/json/value_from.hpp>
39+
#include <boost/json/value_to.hpp>
40+
#include <boost/uuid/random_generator.hpp>
41+
#include <boost/uuid/uuid.hpp>
42+
#include <fmt/format.h>
43+
44+
#include <chrono>
45+
#include <memory>
46+
#include <utility>
47+
#include <vector>
48+
49+
namespace cluster {
50+
51+
Backend::Backend(
52+
boost::asio::thread_pool& ctx,
53+
std::shared_ptr<data::BackendInterface> backend,
54+
std::unique_ptr<etl::WriterStateInterface const> writerState,
55+
std::chrono::steady_clock::duration readInterval,
56+
std::chrono::steady_clock::duration writeInterval
57+
)
58+
: backend_(std::move(backend))
59+
, writerState_(std::move(writerState))
60+
, readerTask_(readInterval, ctx)
61+
, writerTask_(writeInterval, ctx)
62+
, selfUuid_(std::make_shared<boost::uuids::uuid>(boost::uuids::random_generator{}()))
63+
{
64+
}
65+
66+
void
67+
Backend::run()
68+
{
69+
readerTask_.run([this](boost::asio::yield_context yield) {
70+
auto clusterData = doRead(yield);
71+
onNewState_(selfUuid_, std::make_shared<ClusterData>(std::move(clusterData)));
72+
});
73+
74+
writerTask_.run([this]() { doWrite(); });
75+
}
76+
77+
Backend::~Backend()
78+
{
79+
stop();
80+
}
81+
82+
void
83+
Backend::stop()
84+
{
85+
readerTask_.stop();
86+
writerTask_.stop();
87+
}
88+
89+
ClioNode::CUuid
90+
Backend::selfId() const
91+
{
92+
return selfUuid_;
93+
}
94+
95+
Backend::ClusterData
96+
Backend::doRead(boost::asio::yield_context yield)
97+
{
98+
BackendInterface::ClioNodesDataFetchResult expectedResult;
99+
try {
100+
expectedResult = backend_->fetchClioNodesData(yield);
101+
} catch (...) {
102+
expectedResult = std::unexpected{"Failed to fetch Clio nodes data"};
103+
}
104+
105+
if (!expectedResult.has_value()) {
106+
return std::unexpected{std::move(expectedResult).error()};
107+
}
108+
109+
std::vector<ClioNode> otherNodesData;
110+
for (auto const& [uuid, nodeDataStr] : expectedResult.value()) {
111+
if (uuid == *selfUuid_) {
112+
continue;
113+
}
114+
115+
boost::system::error_code errorCode;
116+
auto const json = boost::json::parse(nodeDataStr, errorCode);
117+
if (errorCode.failed()) {
118+
return std::unexpected{fmt::format("Error parsing json from DB: {}", nodeDataStr)};
119+
}
120+
121+
auto expectedNodeData = boost::json::try_value_to<ClioNode>(json);
122+
if (expectedNodeData.has_error()) {
123+
return std::unexpected{fmt::format("Error converting json to ClioNode: {}", nodeDataStr)};
124+
}
125+
*expectedNodeData->uuid = uuid;
126+
otherNodesData.push_back(std::move(expectedNodeData).value());
127+
}
128+
otherNodesData.push_back(ClioNode::from(selfUuid_, *writerState_));
129+
return otherNodesData;
130+
}
131+
132+
void
133+
Backend::doWrite()
134+
{
135+
auto const selfData = ClioNode::from(selfUuid_, *writerState_);
136+
boost::json::value jsonValue{};
137+
boost::json::value_from(selfData, jsonValue);
138+
backend_->writeNodeMessage(*selfData.uuid, boost::json::serialize(jsonValue.as_object()));
139+
}
140+
141+
} // namespace cluster

src/cluster/Backend.hpp

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
//------------------------------------------------------------------------------
2+
/*
3+
This file is part of clio: https://github.com/XRPLF/clio
4+
Copyright (c) 2025, the clio developers.
5+
6+
Permission to use, copy, modify, and distribute this software for any
7+
purpose with or without fee is hereby granted, provided that the above
8+
copyright notice and this permission notice appear in all copies.
9+
10+
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11+
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12+
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13+
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14+
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15+
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16+
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17+
*/
18+
//==============================================================================
19+
20+
#pragma once
21+
22+
#include "cluster/ClioNode.hpp"
23+
#include "cluster/impl/RepeatedTask.hpp"
24+
#include "data/BackendInterface.hpp"
25+
#include "etl/WriterState.hpp"
26+
#include "util/log/Logger.hpp"
27+
28+
#include <boost/asio/any_io_executor.hpp>
29+
#include <boost/asio/cancellation_signal.hpp>
30+
#include <boost/asio/execution_context.hpp>
31+
#include <boost/asio/executor.hpp>
32+
#include <boost/asio/spawn.hpp>
33+
#include <boost/asio/strand.hpp>
34+
#include <boost/asio/thread_pool.hpp>
35+
#include <boost/signals2/connection.hpp>
36+
#include <boost/signals2/signal.hpp>
37+
#include <boost/signals2/variadic_signal.hpp>
38+
#include <boost/uuid/uuid.hpp>
39+
40+
#include <chrono>
41+
#include <concepts>
42+
#include <memory>
43+
#include <string>
44+
#include <vector>
45+
46+
namespace cluster {
47+
48+
/**
49+
* @brief Backend communication handler for cluster state synchronization.
50+
*
51+
* This class manages reading and writing cluster state information to/from the backend database.
52+
* It periodically reads the state of other nodes in the cluster and writes the current node's state,
53+
* enabling cluster-wide coordination and awareness.
54+
*/
55+
class Backend {
56+
public:
57+
/** @brief Type representing cluster data result - either a vector of nodes or an error message */
58+
using ClusterData = std::expected<std::vector<ClioNode>, std::string>;
59+
60+
private:
61+
util::Logger log_{"ClusterCommunication"};
62+
63+
std::shared_ptr<data::BackendInterface> backend_;
64+
std::unique_ptr<etl::WriterStateInterface const> writerState_;
65+
66+
impl::RepeatedTask<boost::asio::thread_pool> readerTask_;
67+
impl::RepeatedTask<boost::asio::thread_pool> writerTask_;
68+
69+
ClioNode::Uuid selfUuid_;
70+
71+
boost::signals2::signal<void(ClioNode::CUuid, std::shared_ptr<ClusterData const>)> onNewState_;
72+
73+
public:
74+
/**
75+
* @brief Construct a Backend communication handler.
76+
*
77+
* @param ctx The execution context for asynchronous operations
78+
* @param backend Interface to the backend database
79+
* @param writerState State indicating whether this node is writing to the database
80+
* @param readInterval How often to read cluster state from the backend
81+
* @param writeInterval How often to write this node's state to the backend
82+
*/
83+
Backend(
84+
boost::asio::thread_pool& ctx,
85+
std::shared_ptr<data::BackendInterface> backend,
86+
std::unique_ptr<etl::WriterStateInterface const> writerState,
87+
std::chrono::steady_clock::duration readInterval,
88+
std::chrono::steady_clock::duration writeInterval
89+
);
90+
91+
~Backend();
92+
93+
Backend(Backend&&) = delete;
94+
Backend&
95+
operator=(Backend&&) = delete;
96+
Backend(Backend const&) = delete;
97+
Backend&
98+
operator=(Backend const&) = delete;
99+
100+
/**
101+
* @brief Start the backend read and write tasks.
102+
*
103+
* Begins periodic reading of cluster state from the backend and writing of this node's state.
104+
*/
105+
void
106+
run();
107+
108+
/**
109+
* @brief Stop the backend read and write tasks.
110+
*
111+
* Stops all periodic tasks and waits for them to complete.
112+
*/
113+
void
114+
stop();
115+
116+
/**
117+
* @brief Subscribe to new cluster state notifications.
118+
*
119+
* @tparam S Callable type accepting (ClioNode::cUUID, ClusterData)
120+
* @param s Subscriber callback to be invoked when new cluster state is available
121+
* @return A connection object that can be used to unsubscribe
122+
*/
123+
template <typename S>
124+
requires std::invocable<S, ClioNode::CUuid, std::shared_ptr<ClusterData const>>
125+
boost::signals2::connection
126+
subscribeToNewState(S&& s)
127+
{
128+
return onNewState_.connect(s);
129+
}
130+
131+
/**
132+
* @brief Get the UUID of this node in the cluster.
133+
*
134+
* @return The UUID of this node.
135+
*/
136+
ClioNode::CUuid
137+
selfId() const;
138+
139+
private:
140+
ClusterData
141+
doRead(boost::asio::yield_context yield);
142+
143+
void
144+
doWrite();
145+
};
146+
147+
} // namespace cluster

0 commit comments

Comments
 (0)