diff --git a/Makefile b/Makefile index b5622a9..dd68dfb 100644 --- a/Makefile +++ b/Makefile @@ -18,11 +18,20 @@ endif WARN = -Wall -Wno-unused-function -finline-functions -Wno-sign-compare #-Wconversion INCPATH = -I./src -I$(THIRD_PATH)/include +<<<<<<< HEAD +CFLAGS = -std=c++0x $(WARN) $(OPT) $(INCPATH) +LDFLAGS += $(THIRD_LIB) -lpthread +OS := $(shell uname -s) +ifeq ($(OS),Linux) + LFLAGS += -lrt +endif +======= CFLAGS = -std=c++0x $(WARN) $(OPT) $(INCPATH) $(EXTRA_CFLAGS) ifeq ($(USE_S3), 1) CFLAGS += -DUSE_S3=1 endif LDFLAGS = $(EXTRA_LDFLAGS) $(THIRD_LIB) -lpthread # -lrt +>>>>>>> upstream/master PS_LIB = build/libps.a PS_MAIN = build/libpsmain.a diff --git a/src/system/dashboard.cc b/src/system/dashboard.cc index 3e66962..a1348e5 100644 --- a/src/system/dashboard.cc +++ b/src/system/dashboard.cc @@ -3,7 +3,7 @@ namespace PS { -bool NodeIDCmp::operator()(const NodeID& a, const NodeID& b) { +bool NodeIDCmp::operator()(const NodeID& a, const NodeID& b) const { string a_primary, a_secondary; splitNodeID(a, a_primary, a_secondary); string b_primary, b_secondary; @@ -17,7 +17,7 @@ bool NodeIDCmp::operator()(const NodeID& a, const NodeID& b) { } } -void NodeIDCmp::splitNodeID(const NodeID& in, string& primary, string& secondary) { +void NodeIDCmp::splitNodeID(const NodeID& in, string& primary, string& secondary) const { size_t tailing_alpha_idx = in.find_last_not_of("0123456789"); if (std::string::npos == tailing_alpha_idx) { primary = in; diff --git a/src/system/dashboard.h b/src/system/dashboard.h index 1cff3b8..5054257 100644 --- a/src/system/dashboard.h +++ b/src/system/dashboard.h @@ -5,8 +5,8 @@ namespace PS { struct NodeIDCmp { - void splitNodeID(const NodeID& in, string& primary, string& secondary); - bool operator()(const NodeID& a, const NodeID& b); + void splitNodeID(const NodeID& in, string& primary, string& secondary) const; + bool operator()(const NodeID& a, const NodeID& b) const; }; class Dashboard { diff --git a/src/system/postoffice.cc b/src/system/postoffice.cc index 9b47588..5dfc8dd 100644 --- a/src/system/postoffice.cc +++ b/src/system/postoffice.cc @@ -115,6 +115,59 @@ void Postoffice::Recv() { } } +<<<<<<< HEAD +void Postoffice::manageNode(Task& tk) { + CHECK(tk.has_mng_node()); + auto& mng = tk.mng_node(); + switch (mng.cmd()) { + case ManageNode::CONNECT: { + CHECK(IamScheduler()); + CHECK_EQ(mng.node_size(), 1); + // first add this node into app + Task add = tk; + add.set_customer(CHECK_NOTNULL(app_)->name()); + add.mutable_mng_node()->set_cmd(ManageNode::ADD); + manageNode(add); + // create the app in this node + Task task; + task.set_request(true); + task.set_customer(app_->name()); + task.set_type(Task::MANAGE); + task.set_time(1); + task.mutable_mng_app()->set_cmd(ManageApp::ADD); + task.mutable_mng_app()->set_conf(app_conf_); + app_->port(mng.node(0).id())->submit(task); + // check if all nodes are connected + if (yp().num_workers() >= FLAGS_num_workers && + yp().num_servers() >= FLAGS_num_servers) { + nodes_are_ready_.set_value(); + } + tk.set_customer(app_->name()); // otherwise the remote node doesn't know + // how to find the according customer + break; + } + case ManageNode::ADD: + case ManageNode::UPDATE: { + auto obj = yp().customer(tk.customer()); + CHECK(obj) << "customer [" << tk.customer() << "] doesn't exists"; + for (int i = 0; i < mng.node_size(); ++i) { + auto node = mng.node(i); + yp().addNode(node); + obj->exec().add(node); + for (auto c : yp().children(obj->name())) { + auto child = yp().customer(c); + if (child) child->exec().add(node); + } + } + break; + } + case ManageNode::REPLACE: { + break; + } + case ManageNode::REMOVE: { + break; + } +======= bool Postoffice::Process(Message* msg) { if (!msg->task.request()) manager_.AddResponse(msg); // process this message @@ -126,6 +179,7 @@ bool Postoffice::Process(Message* msg) { int id = msg->task.customer_id(); // let the executor to delete "msg" manager_.customer(id)->executor()->Accept(msg); +>>>>>>> upstream/master } return true; } diff --git a/src/system/yellow_pages.h b/src/system/yellow_pages.h new file mode 100644 index 0000000..81c652e --- /dev/null +++ b/src/system/yellow_pages.h @@ -0,0 +1,57 @@ +#pragma once + +#include "util/common.h" +#include "system/proto/node.pb.h" +#include "system/van.h" + +namespace PS { + +class Customer; +typedef shared_ptr CustomerPtr; +class NodeGroup; + +// maintain inforamations about nodes and customers +class YellowPages { + public: + YellowPages() { } + ~YellowPages(); + void init() { van_.init(); } + + // manage customers + void addCustomer(Customer* obj); + // ask the system to delete the customer + void depositCustomer(const string& name); + void removeCustomer(const string& name); + Customer* customer(const string& name); + + void addRelation(const string& child, const string& parent) { + relations_[parent].push_back(child); + } + const std::vector& children(const string& parent) { + return relations_[parent]; + } + + // manage nodes + void addNode(const Node& node); + void removeNode(const Node& node); + + int num_workers() { return num_workers_; } + int num_servers() { return num_servers_; } + std::vector nodes(); + + Van& van() { return van_; } + + private: + DISALLOW_COPY_AND_ASSIGN(YellowPages); + int num_workers_ = 0; + int num_servers_ = 0; + + std::map nodes_; + std::map> customers_; + + // parent vs children + std::unordered_map> relations_; + Van van_; +}; + +} // namespace PS