Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 160 additions & 2 deletions src/app_util/AppInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,170 @@ See the License for the specific language governing permissions and
limitations under the License.
**************************************************************************/

#include <absl/strings/str_split.h>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <spdlog/spdlog.h>

#include "AppInfo.h"

namespace gringofts::app {

void AppInfo::init(const INIReader &reader) {
bool AppClusterParser::checkHasRoute(const std::string &routeStr, uint64_t clusterId, uint64_t epoch) {
using boost::property_tree::ptree;
using boost::property_tree::read_json;
using boost::property_tree::write_json;
using boost::property_tree::json_parser_error;
std::stringstream ss(routeStr);
ptree globalRoute;
try {
read_json(ss, globalRoute);
auto routeEpoch = std::stoi(globalRoute.get_child("epoch").data());
if (routeEpoch < epoch) {
SPDLOG_WARN("global epoch {} is less than local epoch {}", routeEpoch, epoch);
}
auto infos = globalRoute.get_child("routeInfos");
for (auto &[k, v] : infos) {
auto clusterNode = v.get_child("clusterId");
auto id = std::stoi(clusterNode.data());
if (clusterId == id) {
std::stringstream sout;
write_json(sout, v);
SPDLOG_INFO("find route for cluster {} : {}", clusterId, sout.str());
return true;
}
}
return false;
} catch (const json_parser_error &err) {
SPDLOG_ERROR("error when parse json {} for {}", routeStr, err.message());
return false;
} catch (const std::exception &err) {
SPDLOG_ERROR("error when parse json {} for {}", routeStr, err.what());
return false;
}
}

std::tuple<NodeId, ClusterId, ClusterParser::ClusterMap> AppClusterParser::parse(const INIReader &iniReader) {
std::string storeType = iniReader.Get("cluster", "persistence.type", "UNKNOWN");
assert(storeType == "raft");
bool externalEnabled = iniReader.GetBoolean("raft.external", "enable", false);
if (!externalEnabled) {
/// load from local config, the cluster id and node id must be specified
auto clusterConf = iniReader.Get("cluster", "cluster.conf", "");
auto allClusterInfo = parseToClusterInfo(clusterConf);
auto myClusterId = iniReader.GetInteger("cluster", "self.clusterId", 0);
auto myNodeId = iniReader.GetInteger("cluster", "self.nodeId", 0);
bool hasMe = false;
for (auto &[clusterId, info] : allClusterInfo) {
if (myClusterId == clusterId) {
for (auto &[nodeId, node] : info.getAllNodes()) {
if (nodeId == myNodeId) {
hasMe = true;
break;
}
}
}
}
assert(hasMe);

Signal::hub.handle<RouteSignal>([](const Signal &s) {
const auto &signal = dynamic_cast<const RouteSignal &>(s);
SPDLOG_WARN("for non-external controlled cluster direct start raft");
signal.passValue(true);
});

SPDLOG_INFO("read raft cluster conf from local, "
"cluster.conf={}, self.clusterId={}, self.nodeId={}",
clusterConf, myClusterId, myNodeId);
return {myClusterId, myNodeId, allClusterInfo};
} else {
// if enable external kv store for cluster info, it must have kv factory
assert(mKvFactory);
std::string externalConfigFile = iniReader.Get("raft.external", "config.file", "");
std::string clusterConfKey = iniReader.Get("raft.external", "cluster.conf.key", "");
std::string clusterRouteKey = iniReader.Get("raft.external", "cluster.route.key", "");
assert(!externalConfigFile.empty());
assert(!clusterConfKey.empty());

/// init external client
auto client = mKvFactory->produce(INIReader(externalConfigFile));
/// read raft cluster conf from external
std::string clusterConf;
auto r = client->getValue(clusterConfKey, &clusterConf);
assert(!clusterConfKey.empty());
auto allClusterInfo = parseToClusterInfo(clusterConf);
/// N.B.: when using external, the assumption is two nodes will never run on the same host,
/// otherwise below logic will break.
auto myHostname = Util::getHostname();
std::optional<ClusterId> myClusterId = std::nullopt;
std::optional<NodeId> myNodeId = std::nullopt;
for (auto &[clusterId, info] : allClusterInfo) {
for (auto &[nodeId, node] : info.getAllNodes()) {
if (myHostname == node->hostName()) {
myClusterId = clusterId;
myNodeId = nodeId;
break;
}
}
}
assert(myClusterId);
assert(myNodeId);

SPDLOG_INFO("raft cluster conf passed from external, "
"cluster.conf={}, hostname={}", clusterConf, myHostname);
auto clusterId = *myClusterId;

Signal::hub.handle<RouteSignal>([client, clusterRouteKey, clusterId](const Signal &s) {
const auto &signal = dynamic_cast<const RouteSignal &>(s);
std::string val;
SPDLOG_INFO("receive signal for query route, cluster {}, epoch {}", clusterId, signal.mEpoch);
assert(clusterId == signal.mClusterId);
assert(!clusterRouteKey.empty());
client->getValue(clusterRouteKey, &val);
signal.passValue(checkHasRoute(val, clusterId, signal.mEpoch));
});

return {*myClusterId, *myNodeId, allClusterInfo};
}
}

std::unordered_map<ClusterId, Cluster> AppClusterParser::parseToClusterInfo(const std::string &infoStr) const {
std::unordered_map<ClusterId, Cluster> result;
std::vector<std::string> clusters = absl::StrSplit(infoStr, ";");
for (auto &c : clusters) {
Cluster info;
std::pair<std::string, std::string> clusterIdWithNodes = absl::StrSplit(c, "#");
info.setClusterId(std::stoi(clusterIdWithNodes.first));
std::vector<std::string> nodes = absl::StrSplit(clusterIdWithNodes.second, ",");
for (auto &n : nodes) {
std::pair<std::string, std::string> hostWithPort = absl::StrSplit(n, ":");
std::pair<std::string, std::string> idWithHost = absl::StrSplit(hostWithPort.first, "@");
auto nodeId = std::stoi(idWithHost.first);
auto hostname = idWithHost.second;
std::shared_ptr<Node> node;
if (hostWithPort.second.empty()) {
SPDLOG_INFO("{} no specific port, using default one", hostWithPort.second);
node = std::make_shared<AppNode>(nodeId, hostname);
} else {
std::vector<std::string> ports = absl::StrSplit(hostWithPort.second, "|");
assert(ports.size() == 6);
auto portForRaft = std::stoi(ports[0]);
auto portForGateway = std::stoi(ports[1]);
auto portForDumper = std::stoi(ports[2]);
auto portForStream = std::stoi(ports[3]);
auto portForNetAdmin = std::stoi(ports[4]);
auto portForScale = std::stoi(ports[5]);
node = std::make_shared<AppNode>(nodeId, hostname, portForRaft, portForStream,
portForGateway, portForDumper, portForNetAdmin, portForScale);
}
info.addNode(node);
}
result[info.id()] = info;
}
return result;
}

void AppInfo::init(const INIReader &reader, std::unique_ptr<ClusterParser> parser) {
auto &appInfo = getInstance();

auto &initialized = appInfo.initialized;
Expand All @@ -28,7 +185,7 @@ void AppInfo::init(const INIReader &reader) {
return;
}

auto[myClusterId, myNodeId, allClusterInfo] = ClusterInfo::resolveAllClusters(reader, nullptr);
auto[myClusterId, myNodeId, allClusterInfo] = parser->parse(reader);
appInfo.mMyClusterId = myClusterId;
appInfo.mMyNodeId = myNodeId;
appInfo.mAllClusterInfo = allClusterInfo;
Expand All @@ -54,4 +211,5 @@ void AppInfo::init(const INIReader &reader) {
appInfo.mMyClusterId,
appInfo.mMyNodeId);
}

} /// namespace gringofts::app
72 changes: 59 additions & 13 deletions src/app_util/AppInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,59 @@ limitations under the License.

#include <INIReader.h>


#include "../infra/common_types.h"
#include "../infra/util/ClusterInfo.h"
#include "../infra/util/Cluster.h"
#include "../infra/util/Signal.h"
#include "../infra/util/KVClient.h"

namespace gringofts {
namespace app {

struct RouteSignal : public FutureSignal<bool> {
RouteSignal(uint64_t epoch, uint64_t clusterId) : mEpoch(epoch), mClusterId(clusterId) {}
uint64_t mEpoch;
uint64_t mClusterId;
};

class AppNode : public RaftNode {
static constexpr Port kDefaultGatewayPort = 50055;
static constexpr Port kDefaultFetchPort = 50056;
static constexpr Port kDefaultNetAdminPort = 50065;
static constexpr Port kDefaultScalePort = 61203;
public:
AppNode(NodeId id, const HostName &hostName) : RaftNode(id, hostName) {}
AppNode(NodeId id, const HostName &hostName,
Port raftPort, Port streamPort,
Port gateWayPort, Port dumperPort, Port netAdminPort, Port scalePort)
: RaftNode(id, hostName, raftPort, streamPort),
mPortForGateway(gateWayPort), mPortForFetch(dumperPort),
mPortForNetAdmin(netAdminPort), mPortForScale(scalePort) {
}
inline Port gateWayPort() const { return mPortForGateway; }
inline Port fetchPort() const { return mPortForFetch; }
inline Port netAdminPort() const { return mPortForNetAdmin; }
inline Port scalePort() const { return mPortForScale; }
private:
Port mPortForGateway = kDefaultGatewayPort;
Port mPortForFetch = kDefaultFetchPort;
Port mPortForNetAdmin = kDefaultNetAdminPort;
Port mPortForScale = kDefaultScalePort;
};

class AppClusterParser : public ClusterParser {
public:
AppClusterParser() : mKvFactory(nullptr) {}
explicit AppClusterParser(std::unique_ptr<kv::ClientFactory> factory) : mKvFactory(std::move(factory)) {}
std::tuple<NodeId, ClusterId, ClusterMap> parse(const INIReader &) override;

static bool checkHasRoute(const std::string &routeStr, uint64_t clusterId, uint64_t epoch);

private:
std::unordered_map<ClusterId, Cluster> parseToClusterInfo(const std::string &infoStr) const;

std::unique_ptr<kv::ClientFactory> mKvFactory;
};

class AppInfo final {
public:
~AppInfo() = default;
Expand All @@ -32,7 +78,8 @@ class AppInfo final {
getInstance().initialized = false;
}

static void init(const INIReader &reader);
static void init(const INIReader &reader,
std::unique_ptr<ClusterParser> parser = std::make_unique<AppClusterParser>());

/// disallow copy ctor and copy assignment
AppInfo(const AppInfo &) = delete;
Expand All @@ -44,16 +91,16 @@ class AppInfo final {
static bool stressTestEnabled() { return getInstance().mStressTestEnabled; }
static std::string appVersion() { return getInstance().mAppVersion; }

static ClusterInfo getMyClusterInfo() {
static Cluster getMyClusterInfo() {
return getInstance().mAllClusterInfo[getInstance().mMyClusterId];
}

static ClusterInfo::Node getMyNode() {
static std::shared_ptr<AppNode> getMyNode() {
assert(getInstance().initialized);
return getMyClusterInfo().getAllNodeInfo()[getMyNodeId()];
return std::dynamic_pointer_cast<AppNode>(getMyClusterInfo().getAllNodes()[getMyNodeId()]);
}

static std::optional<ClusterInfo> getClusterInfo(uint64_t clusterId) {
static std::optional<Cluster> getClusterInfo(uint64_t clusterId) {
if (getInstance().mAllClusterInfo.count(clusterId)) {
return getInstance().mAllClusterInfo[clusterId];
} else {
Expand All @@ -65,20 +112,19 @@ class AppInfo final {
static ClusterId getMyClusterId() { return getInstance().mMyClusterId; }

static Port netAdminPort() {
auto node = getMyClusterInfo().getAllNodeInfo()[getMyNodeId()];
return node.mPortForNetAdmin;
return getMyNode()->netAdminPort();
}

static Port scalePort() {
return getMyNode().mPortForScale;
return getMyNode()->scalePort();
}

static Port gatewayPort() {
return getMyNode().mPortForGateway;
return getMyNode()->gateWayPort();
}

static Port fetchPort() {
return getMyNode().mPortForFetcher;
return getMyNode()->fetchPort();
}

private:
Expand Down Expand Up @@ -123,7 +169,7 @@ class AppInfo final {
/**
* Cluster Info
*/
std::map<ClusterId, ClusterInfo> mAllClusterInfo;
std::unordered_map<ClusterId, Cluster> mAllClusterInfo;
ClusterId mMyClusterId;
NodeId mMyNodeId;
};
Expand Down
8 changes: 4 additions & 4 deletions src/app_util/NetAdminServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ using gringofts::raft::RaftRole;
class NetAdminServer final : public AppNetAdmin::Service {
public:
NetAdminServer(const INIReader &reader,
std::shared_ptr<NetAdminServiceProvider> netAdminProxy, uint64_t port = kDefaultNetAdminPort) :
std::shared_ptr<NetAdminServiceProvider> netAdminProxy, uint64_t port = AppInfo::netAdminPort()) :
mServiceProvider(netAdminProxy),
mSnapshotTakenCounter(getCounter("snapshot_taken_counter", {})),
mSnapshotFailedCounter(getCounter("snapshot_failed_counter", {})),
Expand Down Expand Up @@ -240,9 +240,9 @@ class NetAdminServer final : public AppNetAdmin::Service {
}

std::vector<std::string> targets;
for (const auto &nodeKV : fromClusterOpt->getAllNodeInfo()) {
auto &node = nodeKV.second;
auto targetHost = node.mHostName + ":" + std::to_string(node.mPortForStream);
for (const auto &nodeKV : fromClusterOpt->getAllNodes()) {
auto node = std::dynamic_pointer_cast<AppNode>(nodeKV.second);
auto targetHost = node->hostName() + ":" + std::to_string(node->streamPort());
SPDLOG_INFO("set up sync target {}", targetHost);
targets.push_back(std::move(targetHost));
}
Expand Down
3 changes: 1 addition & 2 deletions src/app_util/control/CtrlState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
#include <spdlog/spdlog.h>

#include "../AppInfo.h"
#include "../../infra/util/ClusterInfo.h"
#include "../../infra/util/Signal.h"
#include "../../infra/raft/RaftSignal.h"

Expand Down Expand Up @@ -82,7 +81,7 @@ void CtrlState::recoverForEAL(std::string_view str) {
// for cluster Id > 0, need to start raft
// for cluster = 0, direct start raft
if (hasState() && mClusterId > 0) {
auto routeSignal = std::make_shared<gringofts::RouteSignal>(mEpoch, mClusterId);
auto routeSignal = std::make_shared<gringofts::app::RouteSignal>(mEpoch, mClusterId);
// query route info to guarantee it can start raft
Signal::hub << routeSignal;
if (routeSignal->getFuture().get()) {
Expand Down
2 changes: 1 addition & 1 deletion src/infra/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ set(GRINGOFTS_MONITOR_SRC

set(GRINGOFTS_UTIL_SRC
util/BigDecimal.cpp
util/ClusterInfo.cpp
util/Cluster.cpp
util/CryptoUtil.cpp
util/FileUtil.cpp
util/TrackingMemoryResource.cpp
Expand Down
4 changes: 2 additions & 2 deletions src/infra/raft/RaftBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace raft {
inline std::shared_ptr<RaftInterface> buildRaftImpl(
const char *configPath,
const NodeId &nodeId,
const ClusterInfo &clusterInfo,
const Cluster &cluster,
std::shared_ptr<DNSResolver> dnsResolver = nullptr,
RaftRole role = RaftRole::Follower ) {
INIReader iniReader(configPath);
Expand All @@ -46,7 +46,7 @@ inline std::shared_ptr<RaftInterface> buildRaftImpl(
/// use default dns resolver
dnsResolver = std::make_shared<DNSResolver>();
}
return std::make_shared<v2::RaftCore>(configPath, nodeId, clusterInfo, dnsResolver, role);
return std::make_shared<v2::RaftCore>(configPath, nodeId, cluster, dnsResolver, role);
} else {
SPDLOG_ERROR("Unknown raft implement version {}.", version);
exit(1);
Expand Down
Loading