diff --git a/CMakeLists.txt b/CMakeLists.txt index eedbca4..6ca0b44 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -104,6 +104,10 @@ endforeach(EXAMPLE) add_executable(gbZMQSignaling EXCLUDE_FROM_ALL utils/zmq_signaling_server.cpp ) target_link_libraries(gbZMQSignaling ${LIBS}) +# ASIO signaling server +add_executable(gbASIOSignaling EXCLUDE_FROM_ALL utils/asio_signaling_server.cpp ) +target_link_libraries(gbASIOSignaling ${LIBS}) + # CTest enable_testing() add_test(graybat_check_build "${CMAKE_COMMAND}" --build ${CMAKE_BINARY_DIR} --target check) diff --git a/graybatConfig.cmake b/graybatConfig.cmake index 391de82..5832bba 100644 --- a/graybatConfig.cmake +++ b/graybatConfig.cmake @@ -65,7 +65,7 @@ set(graybat_LIBRARIES ${graybat_LIBRARIES} ${ZMQ_LIBRARIES}) ############################################################################### # Boost LIB ############################################################################### -find_package(Boost 1.56.0 MODULE COMPONENTS mpi serialization REQUIRED) +find_package(Boost 1.56.0 MODULE COMPONENTS mpi serialization system REQUIRED) set(graybat_INCLUDE_DIRS ${graybat_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS}) set(graybat_LIBRARIES ${graybat_LIBRARIES} ${Boost_LIBRARIES}) diff --git a/include/graybat/communicationPolicy/Asio.hpp b/include/graybat/communicationPolicy/Asio.hpp new file mode 100644 index 0000000..ceb1b1a --- /dev/null +++ b/include/graybat/communicationPolicy/Asio.hpp @@ -0,0 +1,310 @@ +/** + * Copyright 2016 Erik Zenker + * + * This file is part of Graybat. + * + * Graybat is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graybat is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Graybat. + * If not, see . + */ + +#pragma once + +// CLIB +#include /* assert */ +#include /* strup */ + +// STL +#include /* assert */ +#include /* array */ +#include /* std::cout */ +#include /* std::map */ +#include /* std::out_of_range */ +#include /* std::stringstream, std::istringstream */ +#include /* std::string */ + +// Boost.Asio +#include + +// GrayBat +#include +#include /* Base */ +#include /* Context */ +#include /* Event */ +#include /* Message */ +#include /* Config */ + +#define CHUNK_SIZE 8096 + +namespace graybat { + + namespace communicationPolicy { + + /************************************************************************//** + * @class Asio + * + * @brief Implementation of the Cage communicationPolicy interface + * based on Asio. + * + ***************************************************************************/ + struct Asio; + + namespace traits { + + template<> + struct ContextType { + using type = graybat::communicationPolicy::socket::Context; + }; + + template<> + struct ContextIDType { + using type = unsigned; + }; + + template<> + struct EventType { + using type = graybat::communicationPolicy::socket::Event; + }; + + template<> + struct ConfigType { + using type = graybat::communicationPolicy::asio::Config; + }; + + + } + + namespace socket { + + namespace traits { + + template<> + struct UriType { + using type = std::string; + }; + + template<> + struct SocketType { + using type = boost::asio::ip::tcp::socket; + }; + + template<> + struct MessageType { + using type = graybat::communicationPolicy::asio::Message; + }; + + } + + } + + struct Asio : public graybat::communicationPolicy::socket::Base { + + // Type defs + using Tag = graybat::communicationPolicy::Tag; + using ContextID = graybat::communicationPolicy::ContextID; + using MsgID = graybat::communicationPolicy::MsgID; + using VAddr = graybat::communicationPolicy::VAddr; + using Context = graybat::communicationPolicy::Context; + using Event = graybat::communicationPolicy::Event; + using Config = graybat::communicationPolicy::Config; + using MsgType = graybat::communicationPolicy::MsgType; + using Uri = graybat::communicationPolicy::socket::Uri; + using Socket = graybat::communicationPolicy::socket::Socket; + using Message = graybat::communicationPolicy::socket::Message; + using SocketBase = graybat::communicationPolicy::socket::Base; + + // Asio Sockets + boost::asio::io_service io_service; + + Socket signalingSocket; + Socket recvSocket; + Socket ctrlSocket; + std::vector sendSockets; + std::vector ctrlSendSockets; + + + // Uris + const Uri peerUri; + const Uri ctrlUri; + + // Construct + Asio(Config const config) : + recvSocket(io_service), + ctrlSocket(io_service), + signalingSocket(io_service), + peerUri(bindToNextFreePort(recvSocket, config.peerUri)), + ctrlUri(bindToNextFreePort(ctrlSocket, config.peerUri)), + SocketBase(config) { + + //std::cout << "PeerUri: " << peerUri << std::endl; + //std::cout << "CtrlUri: " << ctrlUri << std::endl; + SocketBase::init(); + + } + + // Destructor + ~Asio() { + //SocketBase::deinit(); + } + + Asio(Asio &&other) = delete; + + Asio(Asio &other) = delete; + + /***********************************************************************//** + * + * @name Socket base utilities + * + * @{ + * + ***************************************************************************/ + + void createSocketsToPeers() { + for (unsigned vAddr = 0; vAddr < initialContext.size(); vAddr++) { + sendSockets.emplace_back(Socket(io_service)); + } + } + + template + void connectToSocket(T_Socket &socket, Uri const uri) { + Uri baseUri = uri.substr(0, uri.rfind(":")).substr(uri.rfind("//") + 2); + Uri port = uri.substr(uri.rfind(":") + 1); + + boost::asio::ip::tcp::resolver resolver(io_service); + boost::asio::ip::tcp::resolver::iterator begin = resolver.resolve({baseUri, port}); + boost::asio::ip::tcp::resolver::iterator end; + + // TODO: No error occurs but I am not sure that it really connects + boost::system::error_code error; + boost::asio::ip::tcp::resolver::iterator iter = boost::asio::connect(socket, begin, end, error); + + if(iter != end) { + std::cout << "connect to: " << baseUri << ":" << port << std::endl; + } else { + std::cout << "failed to connect to: " << baseUri << ":" << port << std::endl; + } + + // other way to check for error +// if(!error) { +// std::cout << "connect to: " << baseUri << ":" << port << std::endl; +// } else { +// std::cout << "failed to connect to: " << baseUri << ":" << port << std::endl; +// } + + } + + Uri bindToNextFreePort(Socket &socket, Uri const peerUri) { + Uri peerBaseUri = peerUri.substr(0, peerUri.rfind(":")); + Uri finalPeerUri; + unsigned peerBasePort = std::stoi(peerUri.substr(peerUri.rfind(":") + 1)); + unsigned portToBind = 5555;//peerBasePort; + bool bind = false; + + if(!socket.is_open()) + { + socket.open(boost::asio::ip::tcp::v4()); + } + + while (!bind) { + boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), portToBind); + boost::system::error_code error; + socket.bind(endpoint, error); + if(!error) + { + bind = true; + finalPeerUri = peerBaseUri + ":" + std::to_string(portToBind); + + } + else + { + //std::cout << "Could not bind to port:" << portToBind << " of PeerUri \"" << peerUri << "\". Try to increment port and rebind." << std::endl; + portToBind++; + + } + + } + + //std::cout << "FinalPeerUri: " << finalPeerUri << std::endl; + + return finalPeerUri; + + } + + template + void recvFromSocket(T_Socket &socket, std::stringstream &ss) { + const size_t size = ss.str().size(); + std::array chunk; + size_t transferred = 0; + + while (transferred != size) { + size_t remaining = size - transferred; + size_t read_size = (remaining > CHUNK_SIZE) ? CHUNK_SIZE : remaining; + boost::asio::read(socket, boost::asio::buffer(chunk, read_size)); + ss.write(chunk.data(), read_size); + transferred += read_size; + } + + } + + template + void recvFromSocket(T_Socket &socket, Message &message) { + const size_t size = message.size(); + size_t transferred = 0; + + while (transferred != size) { + size_t remaining = size - transferred; + size_t read_size = (remaining > CHUNK_SIZE) ? CHUNK_SIZE : remaining; + boost::asio::read(socket, + boost::asio::buffer(static_cast(&message.getMessage()[transferred]), + read_size)); + transferred += read_size; + } + } + + template + void sendToSocket(T_Socket &socket, std::stringstream &ss) { + const size_t size = ss.str().size(); + size_t transferred = 0; + std::array chunk; + + while (transferred != size) { + size_t remaining = size - transferred; + size_t write_size = (remaining > CHUNK_SIZE) ? CHUNK_SIZE : remaining; + ss.read(chunk.data(), CHUNK_SIZE); + boost::asio::write(socket, boost::asio::buffer(chunk, write_size)); + transferred += write_size; + } + + } + + template + void sendToSocket(T_Socket &socket, std::vector &message) { + const size_t size = message.size(); + size_t transferred = 0; + + while (transferred != size) { + size_t remaining = size - transferred; + size_t write_size = (remaining > CHUNK_SIZE) ? CHUNK_SIZE : remaining; + boost::asio::write(socket, + boost::asio::buffer(static_cast(&message.data()[transferred]), + write_size)); + transferred += write_size; + } + + } + + }; // class Asio + + } // namespace communicationPolicy + +} // namespace graybat diff --git a/include/graybat/communicationPolicy/Base.hpp b/include/graybat/communicationPolicy/Base.hpp index 924cdbd..c19a06d 100644 --- a/include/graybat/communicationPolicy/Base.hpp +++ b/include/graybat/communicationPolicy/Base.hpp @@ -45,18 +45,6 @@ namespace graybat { using Context = typename graybat::communicationPolicy::Context; using Event = typename graybat::communicationPolicy::Event; - // TODO - // ==== - // - // Is there a way to prevent a lot of functions for - // slightly different functionality regarding the - // following options: - // - // * Blocking / Non Blocking - // * Var / Non Var - // * All / Single Receive - // - /*********************************************************************** * Interface ***********************************************************************/ diff --git a/include/graybat/communicationPolicy/ZMQ.hpp b/include/graybat/communicationPolicy/ZMQ.hpp index 148746d..780b1e3 100644 --- a/include/graybat/communicationPolicy/ZMQ.hpp +++ b/include/graybat/communicationPolicy/ZMQ.hpp @@ -46,10 +46,10 @@ namespace graybat { - + namespace communicationPolicy { - /************************************************************************//** + /************************************************************************//** * @class ZMQ * * @brief Implementation of the Cage communicationPolicy interface @@ -80,7 +80,7 @@ namespace graybat { using type = graybat::communicationPolicy::zmq::Config; }; - + } namespace socket { @@ -101,14 +101,14 @@ namespace graybat { struct MessageType { using type = graybat::communicationPolicy::zmq::Message; }; - + } - + } - struct ZMQ : public graybat::communicationPolicy::socket::Base { - - // Type defs + struct ZMQ : public graybat::communicationPolicy::socket::Base { + + // Type defs using Tag = graybat::communicationPolicy::Tag; using ContextID = graybat::communicationPolicy::ContextID; using MsgID = graybat::communicationPolicy::MsgID; @@ -118,54 +118,56 @@ namespace graybat { using Config = graybat::communicationPolicy::Config; using MsgType = graybat::communicationPolicy::MsgType; using Uri = graybat::communicationPolicy::socket::Uri; - using Socket = graybat::communicationPolicy::socket::Socket; + using Socket = graybat::communicationPolicy::socket::Socket; using Message = graybat::communicationPolicy::socket::Message; using SocketBase = graybat::communicationPolicy::socket::Base; - - // ZMQ Sockets + + // ZMQ Sockets ::zmq::context_t zmqContext; Socket recvSocket; - Socket ctrlSocket; + Socket ctrlSocket; Socket signalingSocket; std::vector sendSockets; - std::vector ctrlSendSockets; + std::vector ctrlSendSockets; // Uri - const Uri peerUri; + const Uri peerUri; const Uri ctrlUri; // Construct - ZMQ(Config const config) : + ZMQ(Config const config) : SocketBase(config), - zmqContext(1), + zmqContext(1), recvSocket(zmqContext, ZMQ_PULL), - ctrlSocket(zmqContext, ZMQ_PULL), - signalingSocket(zmqContext, ZMQ_REQ), + ctrlSocket(zmqContext, ZMQ_PULL), + signalingSocket(zmqContext, ZMQ_REQ), peerUri(bindToNextFreePort(recvSocket, config.peerUri)), - ctrlUri(bindToNextFreePort(ctrlSocket, config.peerUri)) - { + ctrlUri(bindToNextFreePort(ctrlSocket, config.peerUri)) { //std::cout << "PeerUri: " << peerUri << std::endl; SocketBase::init(); - + } - // Copy constructor - ZMQ(ZMQ &) = delete; - // Copy assignment constructor - ZMQ& operator=(ZMQ &) = delete; - // Move constructor - ZMQ(ZMQ &&other) = delete; - // Move assignment constructor - ZMQ& operator=(ZMQ &&) = delete; - - // Destructor - ~ZMQ(){ - SocketBase::deinit(); - } + // Copy constructor + ZMQ(ZMQ &) = delete; + + // Copy assignment constructor + ZMQ &operator=(ZMQ &) = delete; + + // Move constructor + ZMQ(ZMQ &&other) = delete; + + // Move assignment constructor + ZMQ &operator=(ZMQ &&) = delete; + + // Destructor + ~ZMQ() { + SocketBase::deinit(); + } - /***********************************************************************//** + /***********************************************************************//** * * @name Socket base utilities * @@ -173,70 +175,70 @@ namespace graybat { * ***************************************************************************/ - void createSocketsToPeers(){ - for(auto const &vAddr : initialContext){ - (void)vAddr; - sendSockets.emplace_back(Socket(zmqContext, ZMQ_PUSH)); - ctrlSendSockets.emplace_back(Socket(zmqContext, ZMQ_PUSH)); + void createSocketsToPeers() { + for (auto const &vAddr : initialContext) { + (void) vAddr; + sendSockets.emplace_back(Socket(zmqContext, ZMQ_PUSH)); + ctrlSendSockets.emplace_back(Socket(zmqContext, ZMQ_PUSH)); + } } - } - - template - void connectToSocket(T_Socket& socket, std::string const signalingUri) { + + template + void connectToSocket(T_Socket &socket, std::string const signalingUri) { socket.connect(signalingUri.c_str()); - + } - template - void recvFromSocket(T_Socket& socket, std::stringstream& ss) { - ::zmq::message_t message; - socket.recv(&message); - ss << static_cast(message.data()); - } + template + void recvFromSocket(T_Socket &socket, std::stringstream &ss) { + ::zmq::message_t message; + socket.recv(&message); + ss << static_cast(message.data()); + } - template - void recvFromSocket(T_Socket& socket, Message & message) { - socket.recv(&message.getMessage()); - } + template + void recvFromSocket(T_Socket &socket, Message &message) { + socket.recv(&message.getMessage()); + } - template - void sendToSocket(T_Socket& socket, std::stringstream const & ss) { + template + void sendToSocket(T_Socket &socket, std::stringstream const &ss) { std::string string = ss.str(); - ::zmq::message_t message(sizeof(char) * string.size()); - memcpy (static_cast(message.data()), string.data(), sizeof(char) * string.size()); - socket.send(message); - } + ::zmq::message_t message(sizeof(char) * string.size()); + memcpy(static_cast(message.data()), string.data(), sizeof(char) * string.size()); + socket.send(message); + } - template - void sendToSocket(T_Socket& socket, ::zmq::message_t & data) { + template + void sendToSocket(T_Socket &socket, ::zmq::message_t &data) { socket.send(data); } - Uri bindToNextFreePort(Socket &socket, const std::string peerUri){ - std::string peerBaseUri = peerUri.substr(0, peerUri.rfind(":")); - unsigned peerBasePort = std::stoi(peerUri.substr(peerUri.rfind(":") + 1)); - bool connected = false; + Uri bindToNextFreePort(Socket &socket, const std::string peerUri) { + std::string peerBaseUri = peerUri.substr(0, peerUri.rfind(":")); + unsigned peerBasePort = std::stoi(peerUri.substr(peerUri.rfind(":") + 1)); + bool connected = false; - std::string uri; - while(!connected){ + std::string uri; + while (!connected) { try { uri = peerBaseUri + ":" + std::to_string(peerBasePort); socket.bind(uri.c_str()); connected = true; } - catch(::zmq::error_t e){ - //std::cout << e.what() << ". PeerUri \"" << uri << "\". Try to increment port and rebind." << std::endl; + catch (::zmq::error_t e) { + //std::cout << e.what() << ". PeerUri \"" << uri << "\". Try to increment port and rebind." << std::endl; peerBasePort++; } - + } - return uri; - + return uri; + } - - }; // class ZMQ + + }; // class ZMQ } // namespace communicationPolicy - + } // namespace graybat diff --git a/include/graybat/communicationPolicy/asio/Config.hpp b/include/graybat/communicationPolicy/asio/Config.hpp new file mode 100644 index 0000000..5c93398 --- /dev/null +++ b/include/graybat/communicationPolicy/asio/Config.hpp @@ -0,0 +1,43 @@ +/** + * Copyright 2016 Erik Zenker + * + * This file is part of Graybat. + * + * Graybat is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graybat is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Graybat. + * If not, see . + */ + + +#pragma once + +namespace graybat { + + namespace communicationPolicy { + + namespace asio { + + struct Config { + + std::string masterUri; + std::string peerUri; + size_t contextSize; + std::string contextName = "context"; + size_t maxBufferSize = 100 * 1000 * 1000; + }; + + } // zmq + + } // namespace communicationPolicy + +} // namespace graybat diff --git a/include/graybat/communicationPolicy/asio/Message.hpp b/include/graybat/communicationPolicy/asio/Message.hpp new file mode 100644 index 0000000..71f6efd --- /dev/null +++ b/include/graybat/communicationPolicy/asio/Message.hpp @@ -0,0 +1,137 @@ +/** + * Copyright 2016 Erik Zenker + * + * This file is part of Graybat. + * + * Graybat is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graybat is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Graybat. + * If not, see . + */ + +#pragma once + +// CLIB +#include /* std::uint8_t */ + +// STL +#include /* std::vector */ + +// GrayBat +#include + +namespace graybat { + + namespace communicationPolicy { + + namespace asio { + + template + struct Message { + + // Types + using CommunicationPolicy = T_CommunicationPolicy; + using ContextID = typename graybat::communicationPolicy::ContextID; + using VAddr = typename graybat::communicationPolicy::VAddr; + using Tag = typename graybat::communicationPolicy::Tag; + using MsgType = typename graybat::communicationPolicy::MsgType; + using MsgID = typename graybat::communicationPolicy::MsgID; + + // Members + std::vector message; + + // Methods + Message() : message(0) { + + } + + // TODO + template + Message(MsgType const msgType, + MsgID const msgID, + ContextID const contextID, + VAddr const srcVAddr, + Tag const tag, + T_Data & data) : message(sizeof(MsgType) + + sizeof(MsgID) + + sizeof(ContextID) + + sizeof(VAddr) + + sizeof(Tag) + + data.size() * sizeof(typename T_Data::value_type)){ + + size_t msgOffset(0); + memcpy (static_cast(message.data()) + msgOffset, &msgType, sizeof(MsgType)); msgOffset += sizeof(MsgType); + memcpy (static_cast(message.data()) + msgOffset, &msgID, sizeof(MsgID)); msgOffset += sizeof(MsgID); + memcpy (static_cast(message.data()) + msgOffset, &contextID, sizeof(ContextID)); msgOffset += sizeof(ContextID); + memcpy (static_cast(message.data()) + msgOffset, &srcVAddr, sizeof(VAddr)); msgOffset += sizeof(VAddr); + memcpy (static_cast(message.data()) + msgOffset, &tag, sizeof(Tag)); msgOffset += sizeof(Tag); + memcpy (static_cast(message.data()) + msgOffset, data.data(), sizeof(typename T_Data::value_type) * data.size()); + + } + + MsgType getMsgType(){ + MsgType msgType; + memcpy (&msgType, static_cast(message.data()), sizeof(MsgType)); + return msgType; + + + } + + MsgID getMsgID(){ + MsgID msgID; + memcpy (&msgID, static_cast(message.data()) + sizeof(MsgType), sizeof(MsgID)); + return msgID; + + } + + ContextID getContextID(){ + ContextID contextID; + memcpy (&contextID, static_cast(message.data()) + sizeof(MsgType) + sizeof(MsgID), sizeof(ContextID)); + return contextID; + + } + + VAddr getVAddr(){ + VAddr vAddr; + memcpy (&vAddr, static_cast(message.data()) + sizeof(MsgType) + sizeof(MsgID) + sizeof(ContextID), sizeof(VAddr)); + return vAddr; + + } + + Tag getTag(){ + Tag tag; + memcpy (&tag, static_cast(message.data()) + sizeof(MsgType) + sizeof(MsgID) + sizeof(ContextID) + sizeof(VAddr), sizeof(Tag)); + return tag; + + } + + std::int8_t* getData(){ + return static_cast(message.data()) + sizeof(MsgType) + sizeof(MsgID) + sizeof(ContextID) + sizeof(VAddr) + sizeof(Tag); + + } + + size_t size(){ + return message.size(); + } + + std::vector& getMessage(){ + return message; + } + + }; + + + } // asio + + } // namespace communicationPolicy + +} // namespace graybat diff --git a/include/graybat/communicationPolicy/socket/Base.hpp b/include/graybat/communicationPolicy/socket/Base.hpp index 14fcb62..7355e00 100644 --- a/include/graybat/communicationPolicy/socket/Base.hpp +++ b/include/graybat/communicationPolicy/socket/Base.hpp @@ -248,58 +248,60 @@ namespace graybat { -> void { // Connect to signaling process + std::cout << "connectToSocket masterUri:" << masterUri << std::endl; static_cast(this)->connectToSocket(static_cast(this)->signalingSocket, masterUri); // Retrieve Context id for initial context from signaling process + std::cout << "getContextID contextName:" << contextName << std::endl; ContextID contextID = getContextID(static_cast(this)->signalingSocket, contextName); - contextNames[contextID] = contextName; - - // Retrieve own vAddr from signaling process for initial context - VAddr vAddr = getVAddr(static_cast(this)->signalingSocket, contextID, static_cast(this)->peerUri, static_cast(this)->ctrlUri); - initialContext = Context(contextID, vAddr, contextSize); - contexts[initialContext.getID()] = initialContext; - - // Retrieve for uris of other peers from signaling process for the initial context - for(auto const &vAddr : initialContext){ - Uri remoteUri; - Uri ctrlUri; - std::tie(remoteUri, ctrlUri) = getUri(static_cast(this)->signalingSocket, initialContext.getID(), vAddr); - phoneBook[initialContext.getID()][vAddr] = remoteUri; - ctrlPhoneBook[initialContext.getID()][vAddr] = ctrlUri; - inversePhoneBook[initialContext.getID()][remoteUri] = vAddr; - inverseCtrlPhoneBook[initialContext.getID()][ctrlUri] = vAddr; - } - - // Create socket connection to other peers - // Create socketmapping from initial context to sockets of VAddrs - static_cast(this)->createSocketsToPeers(); - - for(auto const &vAddr : initialContext){ - sendSocketMappings[initialContext.getID()][vAddr] = vAddr; - static_cast(this)->connectToSocket(static_cast(this)->sendSockets.at(sendSocketMappings.at(initialContext.getID()).at(vAddr)), phoneBook.at(initialContext.getID()).at(vAddr).c_str()); - static_cast(this)->connectToSocket(static_cast(this)->ctrlSendSockets.at(sendSocketMappings.at(initialContext.getID()).at(vAddr)), ctrlPhoneBook.at(initialContext.getID()).at(vAddr).c_str()); - - //std::cout << "sendSocket_i: " << vAddr << " --> " << phoneBook.at(initialContext.getID()).at(vAddr) << std::endl; - - } - - // Create thread which recv all messages to this peer - recvHandler = std::thread(&Base::handleRecv, this); - ctrlHandler = std::thread(&Base::handleCtrl, this); +// contextNames[contextID] = contextName; +// +// // Retrieve own vAddr from signaling process for initial context +// VAddr vAddr = getVAddr(static_cast(this)->signalingSocket, contextID, static_cast(this)->peerUri, static_cast(this)->ctrlUri); +// initialContext = Context(contextID, vAddr, contextSize); +// contexts[initialContext.getID()] = initialContext; +// +// // Retrieve for uris of other peers from signaling process for the initial context +// for(auto const &vAddr : initialContext){ +// Uri remoteUri; +// Uri ctrlUri; +// std::tie(remoteUri, ctrlUri) = getUri(static_cast(this)->signalingSocket, initialContext.getID(), vAddr); +// phoneBook[initialContext.getID()][vAddr] = remoteUri; +// ctrlPhoneBook[initialContext.getID()][vAddr] = ctrlUri; +// inversePhoneBook[initialContext.getID()][remoteUri] = vAddr; +// inverseCtrlPhoneBook[initialContext.getID()][ctrlUri] = vAddr; +// } +// +// // Create socket connection to other peers +// // Create socketmapping from initial context to sockets of VAddrs +// static_cast(this)->createSocketsToPeers(); +// +// for(auto const &vAddr : initialContext){ +// sendSocketMappings[initialContext.getID()][vAddr] = vAddr; +// static_cast(this)->connectToSocket(static_cast(this)->sendSockets.at(sendSocketMappings.at(initialContext.getID()).at(vAddr)), phoneBook.at(initialContext.getID()).at(vAddr).c_str()); +// static_cast(this)->connectToSocket(static_cast(this)->ctrlSendSockets.at(sendSocketMappings.at(initialContext.getID()).at(vAddr)), ctrlPhoneBook.at(initialContext.getID()).at(vAddr).c_str()); +// +// //std::cout << "sendSocket_i: " << vAddr << " --> " << phoneBook.at(initialContext.getID()).at(vAddr) << std::endl; +// +// } +// +// // Create thread which recv all messages to this peer +// recvHandler = std::thread(&Base::handleRecv, this); +// ctrlHandler = std::thread(&Base::handleCtrl, this); } template auto Base::deinit() -> void { - std::stringstream ss; - ss << static_cast(MsgType::DESTRUCT) << " " << contextName; - static_cast(this)->sendToSocket(static_cast(this)->signalingSocket, ss); - - std::array null; - static_cast(this)->asyncSendImpl(MsgType::DESTRUCT, 0, initialContext, initialContext.getVAddr(), 0, null); - recvHandler.join(); - ctrlHandler.join(); +// std::stringstream ss; +// ss << static_cast(MsgType::DESTRUCT) << " " << contextName; +// static_cast(this)->sendToSocket(static_cast(this)->signalingSocket, ss); +// +// std::array null; +// static_cast(this)->asyncSendImpl(MsgType::DESTRUCT, 0, initialContext, initialContext.getVAddr(), 0, null); +// recvHandler.join(); +// ctrlHandler.join(); } diff --git a/include/graybat/communicationPolicy/socket/Context.hpp b/include/graybat/communicationPolicy/socket/Context.hpp new file mode 100644 index 0000000..d9853e1 --- /dev/null +++ b/include/graybat/communicationPolicy/socket/Context.hpp @@ -0,0 +1,125 @@ +/** + * Copyright 2016 Erik Zenker + * + * This file is part of Graybat. + * + * Graybat is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graybat is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Graybat. + * If not, see . + */ + +#pragma once + +// STL +#include + +// GRAYBAT +#include +#include + +namespace graybat { + + namespace communicationPolicy { + + namespace socket { + + /** + * @brief A context represents a set of peers which are + * able to communicate with each other. + * + */ + template + class Context { + + using ContextID = typename graybat::communicationPolicy::ContextID; + using VAddr = typename graybat::communicationPolicy::VAddr; + using Tag = typename graybat::communicationPolicy::Tag; + using MsgType = typename graybat::communicationPolicy::MsgType; + using MsgID = typename graybat::communicationPolicy::MsgID; + + public: + Context() : + contextID(0), + vAddr(0), + nPeers(1), + isValid(false), + peers(0){ + + } + + Context(ContextID contextID, VAddr vAddr, unsigned nPeers) : + contextID(contextID), + vAddr(vAddr), + nPeers(nPeers), + isValid(true), + peers(0){ + + + peers.resize(nPeers); + std::iota(peers.begin(), peers.end(), 0); + } + + Context(ContextID contextID, VAddr vAddr, std::vector peers) : + contextID(contextID), + vAddr(vAddr), + nPeers(peers.size()), + isValid(true), + peers(peers) { + } + + size_t size() const{ + return nPeers; + } + + VAddr getVAddr() const { + return vAddr; + } + + ContextID getID() const { + return contextID; + } + + bool valid() const{ + return isValid; + } + + std::vector::iterator begin(){ + return peers.begin(); + } + + std::vector::const_iterator begin() const { + return peers.cbegin(); + } + + std::vector::iterator end(){ + return peers.end(); + } + + std::vector::const_iterator end() const { + return peers.cend(); + } + + private: + ContextID contextID; + VAddr vAddr; + unsigned nPeers; + bool isValid; + std::vector peers; + }; + + + } // socket + + } // namespace communicationPolicy + +} // namespace graybat diff --git a/include/graybat/communicationPolicy/socket/Event.hpp b/include/graybat/communicationPolicy/socket/Event.hpp new file mode 100644 index 0000000..1b0a0dd --- /dev/null +++ b/include/graybat/communicationPolicy/socket/Event.hpp @@ -0,0 +1,130 @@ +/** + * Copyright 2016 Erik Zenker + * + * This file is part of Graybat. + * + * Graybat is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graybat is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Graybat. + * If not, see . + */ + +#pragma once + +// STL +#include /* std::unique_ptr */ + +// graybat +#include + +namespace graybat { + + namespace communicationPolicy { + namespace socket { + + /** + * @brief An event is returned by non-blocking + * communication operations and can be + * asked whether an operation has finished + * or it can be waited for this operation to + * be finished. + * + */ + template + class Event { + public: + + using ContextID = typename graybat::communicationPolicy::ContextID; + using VAddr = typename graybat::communicationPolicy::VAddr; + using Tag = typename graybat::communicationPolicy::Tag; + using MsgType = typename graybat::communicationPolicy::MsgType; + using MsgID = typename graybat::communicationPolicy::MsgID; + using Context = typename graybat::communicationPolicy::Context; + + Event(MsgID msgID, Context context, VAddr vAddr, Tag tag, T_CP& comm) : + msgID(msgID), + context(context), + vAddr(vAddr), + tag(tag), + buf(nullptr), + size(0), + done(false), + comm(&comm) { + + } + + template + Event(MsgID msgID, Context context, VAddr vAddr, Tag tag, T_Buf & buf, bool done, T_CP& comm) : + msgID(msgID), + context(context), + vAddr(vAddr), + tag(tag), + buf(reinterpret_cast(buf.data())), + size(sizeof(typename T_Buf::value_type) * buf.size()), + done(done), + comm(&comm) { + + } + + Event& operator=(const Event&) = default; + + void wait(){ + while(!ready()); + + } + + bool ready(){ + //std::cout << "ready? size: " << size << std::endl; + if(done == true){ + return true; + } + else { + if(buf == nullptr){ + // asyncSend Event + done = comm->ready(msgID, context, vAddr, tag); + } + else { + // asyncRecv Event + done = comm->asyncRecvImpl(MsgType::PEER, context, vAddr, tag, buf, size); + } + } + return done; + } + + VAddr source(){ + return vAddr; + } + + Tag getTag(){ + return tag; + + } + + MsgID msgID; + Context context; + VAddr vAddr; + Tag tag; + std::int8_t * buf; + size_t size; + bool done; + T_CP * comm; + + + + + }; + + } // socket + + } // namespace communicationPolicy + +} // namespace graybat diff --git a/test/CageUT.cpp b/test/CageUT.cpp index 3f0fce0..43f7474 100644 --- a/test/CageUT.cpp +++ b/test/CageUT.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -104,11 +105,14 @@ BOOST_AUTO_TEST_SUITE( graybat_cage_point_to_point_test ) using ZMQ = graybat::communicationPolicy::ZMQ; using BMPI = graybat::communicationPolicy::BMPI; + using ASIO = graybat::communicationPolicy::Asio; using GP = graybat::graphPolicy::BGL<>; using ZMQCage = graybat::Cage; using BMPICage = graybat::Cage; + using ASIOCage = graybat::Cage; using ZMQConfig = ZMQ::Config; using BMPIConfig = BMPI::Config; + using ASIOConfig = ASIO::Config; ZMQConfig zmqConfig = {"tcp://127.0.0.1:5000", "tcp://127.0.0.1:5001", @@ -117,8 +121,14 @@ BOOST_AUTO_TEST_SUITE( graybat_cage_point_to_point_test ) BMPIConfig bmpiConfig; + ASIOConfig asioConfig = {"tcp://127.0.0.1:6000", + "tcp://127.0.0.1:6001", + static_cast(std::stoi(std::getenv("OMPI_COMM_WORLD_SIZE"))), + "context_cage_test"}; + ZMQCage zmqCage(zmqConfig); BMPICage bmpiCage(bmpiConfig); + ASIOCage asioCage(asioConfig); auto cages = hana::make_tuple(std::ref(zmqCage), std::ref(bmpiCage) ); diff --git a/test/CommunicationPolicyUT.cpp b/test/CommunicationPolicyUT.cpp index e1e9cdb..00841bb 100644 --- a/test/CommunicationPolicyUT.cpp +++ b/test/CommunicationPolicyUT.cpp @@ -33,6 +33,7 @@ #include #include #include +#include /******************************************************************************* * Communication Policies to Test @@ -43,21 +44,30 @@ size_t const nRuns = 1000; using ZMQ = graybat::communicationPolicy::ZMQ; using BMPI = graybat::communicationPolicy::BMPI; +using Asio = graybat::communicationPolicy::Asio; using ZMQConfig = ZMQ::Config; using BMPIConfig = BMPI::Config; +using AsioConfig = Asio::Config; ZMQConfig zmqConfig = {"tcp://127.0.0.1:5000", "tcp://127.0.0.1:5001", static_cast(std::stoi(std::getenv("OMPI_COMM_WORLD_SIZE"))), "context_cp_test"}; +AsioConfig asioConfig = {"tcp://127.0.0.1:6000", + "tcp://127.0.0.1:5001", + static_cast(std::stoi(std::getenv("OMPI_COMM_WORLD_SIZE"))), + "context_cp_test"}; + BMPIConfig bmpiConfig; ZMQ zmqCP(zmqConfig); BMPI bmpiCP(bmpiConfig); +Asio asioCP(asioConfig); -auto communicationPolicies = hana::make_tuple(std::ref(zmqCP), - std::ref(bmpiCP) ); +auto communicationPolicies = hana::make_tuple(//std::ref(zmqCP), + //std::ref(bmpiCP), + std::ref(asioCP)); /******************************************************************************* diff --git a/utils/asio_signaling_server.cpp b/utils/asio_signaling_server.cpp new file mode 100644 index 0000000..f06082d --- /dev/null +++ b/utils/asio_signaling_server.cpp @@ -0,0 +1,332 @@ + +// CLIB +#include /* strup */ + +// STL +#include /* std::cout, std::endl */ +#include /* std::map */ +#include /* std::string */ +#include /* std::stringstream */ +#include /* std::enable_shared_from_this */ +// Boost.Asio +#include + +// Boost +#include + +#define CHUNK_SIZE 4096 + +// Type defs +typedef unsigned Tag; +typedef unsigned ContextID; +typedef unsigned VAddr; +typedef unsigned MsgType; +typedef std::string Uri; + +// Message tags +static const MsgType VADDR_REQUEST = 0; +static const MsgType VADDR_LOOKUP = 1; +static const MsgType DESTRUCT = 2; +static const MsgType RETRY = 3; +static const MsgType ACK = 4; +static const MsgType CONTEXT_INIT = 5; +static const MsgType CONTEXT_REQUEST = 6; + +template +void recvFromSocket(T_Socket& socket, std::stringstream& ss) { + std::array chunk; + boost::system::error_code error; + + std::size_t length = chunk.size(); + + while (length != 0) { + length = boost::asio::read(socket, boost::asio::buffer(chunk.data(), chunk.size()), error); + if(error){ + std::cout << error.message() << std::endl; + break; + } + ss << std::string(chunk.data(), length); + } + +} + +template +void sendToSocket(T_Socket& socket, std::stringstream & ss) { + const size_t size = ss.str().size(); + size_t transferred = 0; + std::array chunk; + + while (transferred != size){ + size_t remaining = size - transferred; + size_t write_size = (remaining > CHUNK_SIZE) ? CHUNK_SIZE : remaining; + ss.read(chunk.data(), CHUNK_SIZE); + boost::asio::write(socket, boost::asio::buffer(chunk, write_size)); + transferred += write_size; + } + +} + +//// TODO: needs to be shared +//// http://stackoverflow.com/questions/11014918/boostasio-infinite-loop +//class AcceptHandler : public std::enable_shared_from_this { +// +//public: +// +// AcceptHandler(boost::asio::ip::tcp::socket & socket, boost::asio::ip::tcp::acceptor & acceptor) : +// socket(socket), +// acceptor(acceptor) +// { +// +// } +// +// void accept() { +// +// acceptor.async_accept( +// socket, +// [this](boost::system::error_code error) { +// if (error) { +// std::cerr << "Error code:" << error << ". " << error.message() << std::endl; +// } else { +// std::cerr << "Accepted connection." << std::endl; +// //std::make_shared(socket, acceptor)->read(); +// read(); +// } +// +// //accept(); +// } +// ); +// +// } +// +// +// void read() { +// auto self(shared_from_this()); +// socket.async_receive(boost::asio::buffer(data_, max_length), +// [this, self](boost::system::error_code error, std::size_t length) { +// if (!error) { +// std::string s(data_, length); +// std::cout << "Read:" << s << std::endl; +// } else { +// std::cout << "Failed to read. Error code: " << error.message() << std::endl; +// } +// socket.close(); +// }); +// } +// +// +// +//private: +// +// boost::asio::ip::tcp::socket & socket; +// boost::asio::ip::tcp::acceptor & acceptor; +// enum { max_length = 1024 }; +// char data_[max_length]; +//}; + +int main(const int argc, char **argv){ + /*************************************************************************** + * Parse Commandline + **************************************************************************/ + namespace po = boost::program_options; + po::options_description options( "Asio Signaling Server Options" ); + + options.add_options() + ("port,p", + po::value()->default_value(6000), + "Port to listen for signaling requests") + ("ip", + po::value(), + "IP to listen for signaling requests. Either ip or interface can be specified. (Example: 127.0.0.1)") + ("interface,i", + po::value(), + "Interface to listen for signaling requests. Either ip or interface can be specified. Default are all available interfaces. (Example: eth0)") + ("protocoll", + po::value()->default_value("tcp"), + "Protocoll to listen for signaling requests. Options are tcp and udp. Default is udp.") + ("help,h", + "Print this help message and exit"); + + + po::variables_map vm; + po::store(po::parse_command_line( argc, argv, options ), vm); + + if(vm.count("help")){ + std::cout << "Usage: " << argv[0] << " [options] " << std::endl; + std::cout << options << std::endl; + exit(0); + } + + if(vm.count("ip") && vm.count("interface")) { + std::cerr << "Error: Only one of the following can be specified by parameter. Either ip or interface." << std::endl; + exit(1); + } + + std::string masterUri; + if(vm.count("ip")) { + //Listen to ip + masterUri = vm["protocoll"].as() + "://" + vm["ip"].as() + ":" + std::to_string(vm["port"].as()); + } else if(vm.count("interface")) { + //Listen to interface + masterUri = vm["protocoll"].as() + "://" + vm["interface"].as() + ":" + std::to_string(vm["port"].as()); + } else { + //Listen to all interfaces + masterUri = vm["protocoll"].as() + "://*:" + std::to_string(vm["port"].as()); + } + + /*************************************************************************** + * Start signaling + **************************************************************************/ + + std::cout << "Start asio signaling server" << std::endl; + + std::map > phoneBook; + std::map maxVAddr; + + std::cout << "Listening on: " << masterUri << std::endl; + + boost::asio::io_service io_service; + + + + + // Bind to port + +// socket.open(boost::asio::ip::tcp::v4()); + +// boost::system::error_code error; +// socket.bind(endpoint, error); +// if(error){ +// std::cerr << "Could not bind to port: " << vm["port"].as() << std::endl; +// } + + //AcceptHandler acceptHandler(socket, acceptor); + //acceptHandler.accept(); + + + //acceptor.accept(socket); + + //std::cout << "accepted connection" << std::endl; + + + while (true) { + boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), vm["port"].as()); + boost::asio::ip::tcp::endpoint peerEndpoint; + boost::asio::ip::tcp::socket socket(io_service); + boost::asio::ip::tcp::acceptor acceptor(io_service, endpoint); + acceptor.accept(socket, peerEndpoint); + + std::cout << "Accepted connection peerEndpoint.address:" << peerEndpoint.address().to_string() << std::endl; + + std::stringstream ss; + recvFromSocket(socket, ss); + std::cout << ss.str() << std::endl; + + ss << "Hello World"; + + sendToSocket(socket, ss); + } + + //TODO: implement async read + io_service.run(); + + + + // ContextID maxContextID = 0; + // ContextID maxInitialContextID = maxContextID; + + // while(true){ + // std::stringstream recvStream; + // std::stringstream sendStream; + + // recvFromSocket(socket, recvStream); + + // Uri srcUri; + // MsgType type; + // ContextID contextID; + // unsigned size; + // recvStream >> type; + + // switch(type){ + + // case CONTEXT_INIT: + // { + + // recvStream >> size; + // if(maxVAddr[maxInitialContextID] == size){ + // maxInitialContextID = ++maxContextID; + // maxVAddr[maxInitialContextID] = 0; + // } + + // sendStream << std::to_string(maxInitialContextID) << " "; + // sendToSocket(socket, sendStream); + // std::cout << "CONTEXT INIT [size:" << size << "]: " << maxInitialContextID << std::endl; + // break; + // } + + // case CONTEXT_REQUEST: + // { + // recvStream >> size; + // maxVAddr[++maxContextID] = 0; + + // sendStream << std::to_string(maxContextID) << " "; + // sendToSocket(socket, sendStream); + // std::cout << "CONTEXT REQUEST [size:" << size << "]: " << maxContextID << std::endl; + // break; + + // } + + // case VADDR_REQUEST: + // { + // // Reply with correct information + // recvStream >> contextID; + // recvStream >> srcUri; + + // phoneBook[contextID][maxVAddr[contextID]] = srcUri; + + // sendStream << std::to_string(maxVAddr[contextID]) << " "; + // sendToSocket(socket, sendStream); + // std::cout << "VADDR REQUEST [contextID:" << contextID << "][srcUri:" << srcUri << "]: " << maxVAddr[contextID] << std::endl; + // maxVAddr[contextID]++; + // break; + // } + + // case VADDR_LOOKUP: + // { + // VAddr remoteVAddr; + // recvStream >> contextID; + // recvStream >> remoteVAddr; + + + // if(phoneBook[contextID].count(remoteVAddr) == 0){ + // sendStream << RETRY << " "; + // sendToSocket(socket, sendStream); + // std::cout << "VADDR LOOKUP [contextID:" << contextID << "][remoteVAddr:" << remoteVAddr << "]: " << " RETRY"<< std::endl; + // } + // else { + // sendStream << ACK << " " << phoneBook[contextID][remoteVAddr] << " "; + // sendToSocket(socket, sendStream); + // std::cout << "VADDR LOOKUP [contextID:" << contextID << "][remoteVAddr:" << remoteVAddr << "]: " << phoneBook[contextID][remoteVAddr] << std::endl; + // } + + // break; + // } + + // case DESTRUCT: + // sendStream << " "; + // sendToSocket(socket, sendStream); + // std::cout << "DESTRUCT" << std::endl; + // break; + + // default: + // // Reply empty message + // sendStream << " "; + // sendToSocket(socket, sendStream); + // std::cout << "UNKNOWN MESSAGE TYPE" << std::endl; + // exit(0); + // break; + + // }; + + // } + +}