diff --git a/include/graybat/communicationPolicy/asio/Connection.hpp b/include/graybat/communicationPolicy/asio/Connection.hpp new file mode 100644 index 0000000..c94267f --- /dev/null +++ b/include/graybat/communicationPolicy/asio/Connection.hpp @@ -0,0 +1,193 @@ +/** + * Copyright 2017 Erik Zenker + * + * This file is part of Graybat. + * + * Graybat is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graybat is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Graybat. + * If not, see . + */ + +#pragma once + +// STL +#include // std::int8_t +#include // EXIT_FAILURE, std::exit +#include // std::future +#include // std::cout, std::cerr +#include // std::shared_ptr +#include // std::stringstream + +// Boost.Asio +#include + +namespace graybat { +namespace communicationPolicy { +namespace asio { + +class Connection { + + using Callback = std::function; + using Future = std::future; + + public: + Connection(boost::asio::io_service&); + + public: // Blocking + auto receive(std::vector&) -> boost::system::error_code; + auto streamReceive(std::stringstream&) -> boost::system::error_code; + auto receivePrefixSize(std::vector&) -> boost::system::error_code ; + template + auto send(T_Buf&) -> boost::system::error_code ; + + + public: // Nonblocking + auto asyncReceive( + std::vector&, Callback = [](const boost::system::error_code&, std::size_t) {}) + -> Future; + + template + auto + asyncSend(const T_Buf&, const Callback& = [](const boost::system::error_code&, std::size_t) {}) + -> Future; + + auto asyncStreamSend( + const std::stringstream&, + const Callback& = [](const boost::system::error_code&, std::size_t) {}) -> Future; + + auto asyncSendPrefixSize( + const std::vector&, + const Callback& = [](const boost::system::error_code&, std::size_t) {}) -> Future; + + public: + boost::asio::ip::tcp::socket socket_; + boost::asio::ip::tcp::endpoint endpoint_; +}; + +inline Connection::Connection(boost::asio::io_service& ioService) + : socket_(ioService) +{ +} + +inline auto Connection::asyncReceive(std::vector& buffer, Callback onReceived) + -> Future +{ + std::shared_ptr> received + = std::make_shared>(); + boost::asio::async_read( + socket_, + boost::asio::buffer(buffer), + [onReceived, + received](const boost::system::error_code& error, std::size_t bytesTransferred) { + received->set_value(error); + onReceived(error, bytesTransferred); + }); + return received->get_future(); +} + +inline boost::system::error_code Connection::receive(std::vector& buffer) +{ + boost::system::error_code error; + boost::asio::read(socket_, boost::asio::buffer(buffer), error); + return error; +} + +inline auto Connection::receivePrefixSize(std::vector& payloadBuffer) + -> boost::system::error_code +{ + std::array headerBuffer; + std::stringstream headerStream; + boost::system::error_code error; + uint64_t maxLength = 0; + uint64_t remaining = 0; + + // Receive header + if (!socket_.read_some(boost::asio::buffer(headerBuffer), error)) { + return error; + } + std::copy(headerBuffer.begin(), headerBuffer.end(), std::ostream_iterator(headerStream)); + headerStream >> maxLength; + remaining = maxLength; + + // Receive payload + payloadBuffer.resize(remaining); + if (!boost::asio::read(socket_, boost::asio::buffer(payloadBuffer), error)) { + return error; + } + return {}; +} + +inline auto Connection::streamReceive(std::stringstream& payloadStream) -> boost::system::error_code +{ + std::vector buffer; + auto error = receivePrefixSize(buffer); + std::copy(buffer.begin(), buffer.end(), std::ostream_iterator(payloadStream)); + return error; +} + +template +inline boost::system::error_code Connection::send(T_Buf& buffer) +{ + boost::system::error_code error; + boost::asio::write(socket_, boost::asio::buffer(buffer.data(), buffer.size()), error); + return error; +} + +template +auto Connection::asyncSend(const T_Buf& buffer, const Callback& onSent) -> Future +{ + std::shared_ptr> sent + = std::make_shared>(); + boost::asio::async_write( + socket_, + boost::asio::buffer(buffer.data(), buffer.size()), + [onSent, sent](const boost::system::error_code& error, std::size_t transferedBytes) { + sent->set_value(error); + onSent(error, transferedBytes); + }); + return sent->get_future(); +} + +auto Connection::asyncSendPrefixSize( + const std::vector& payloadBuffer, const Callback& onSent) -> Future +{ + auto prefixedPayload = std::make_shared>(payloadBuffer.size() + 8, 0); + std::copy(payloadBuffer.begin(), payloadBuffer.end(), prefixedPayload->begin() + 8); + + std::string sizeString = std::to_string(payloadBuffer.size()); + std::copy(sizeString.begin(), sizeString.end(), prefixedPayload->begin()); + + return asyncSend( + *prefixedPayload.get(), + [prefixedPayload, + onSent](const boost::system::error_code& ec, std::size_t transferedBytes) { + onSent(ec, transferedBytes); + }); +} + +auto Connection::asyncStreamSend(const std::stringstream& payloadStream, const Callback& onSent) + -> Future +{ + std::string payloadString = payloadStream.str(); + auto payloadBuffer = std::make_shared>(payloadString.size(), 0); + std::copy(payloadString.begin(), payloadString.end(), payloadBuffer->begin()); + return asyncSendPrefixSize( + *payloadBuffer.get(), + [payloadBuffer, onSent](const boost::system::error_code& ec, std::size_t transferedBytes) { + onSent(ec, transferedBytes); + }); +} + +} /* asio */ +} /* communicationPolicy */ +} /* graybat */ \ No newline at end of file diff --git a/include/graybat/communicationPolicy/asio/Endpoint.hpp b/include/graybat/communicationPolicy/asio/Endpoint.hpp new file mode 100644 index 0000000..36c6803 --- /dev/null +++ b/include/graybat/communicationPolicy/asio/Endpoint.hpp @@ -0,0 +1,186 @@ +/** + * Copyright 2017 Erik Zenker + * + * This file is part of Graybat. + * + * Graybat is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graybat is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Graybat. + * If not, see . + */ + +#pragma once + +// STL +#include /* std::shared_ptr */ +#include /* std::function */ +#include /* std::stoi */ +#include /* std::future */ + +// Boost.Asio +#include + +// Boost +#include + +// GRAYBAT +#include + +namespace graybat { +namespace communicationPolicy { +namespace asio { + +class Endpoint { + + using Callback = std::function&)>; + using Future = std::future; + using Port = std::string; + using Url = std::string; + +public: + Endpoint(boost::asio::io_service &, Port); + Endpoint(boost::asio::io_service &); + + auto accept() -> std::shared_ptr; + auto asyncAccept(const Callback& = [](const boost::system::error_code&, const std::shared_ptr &){}) -> Future; + + auto connect(Url, Port) -> std::shared_ptr; + auto asyncConnect(Url, Port, const Callback& = [](const boost::system::error_code&, const std::shared_ptr &){}) -> Future; + + auto bind(Port) -> boost::system::error_code; + +public: + boost::asio::ip::tcp::endpoint localEndpoint_; + +private: + boost::asio::ip::tcp::acceptor acceptor_; + boost::asio::ip::tcp::resolver resolver_; + boost::asio::io_service &ioService_; + std::vector> connections_; +}; + +inline Endpoint::Endpoint(boost::asio::io_service &ioService, Port port) + : localEndpoint_(boost::asio::ip::tcp::v4(), + static_cast(std::stoi(port))), + acceptor_(ioService), resolver_(ioService), ioService_(ioService) { + bind(port); +} + +inline Endpoint::Endpoint(boost::asio::io_service &ioService) + : acceptor_(ioService), resolver_(ioService), ioService_(ioService) { +} + +inline auto Endpoint::asyncAccept(const Callback &onAccepted) -> Future { + std::shared_ptr> accepted = + std::make_shared>(); + std::shared_ptr connection = + std::make_shared(ioService_); + + connections_.push_back(connection); + acceptor_.async_accept( + connections_.back()->socket_, connections_.back()->endpoint_, + [this, &onAccepted, accepted, connection](const boost::system::error_code &error) { + accepted->set_value(error); + onAccepted(error, connection); + asyncAccept(onAccepted); + }); + return accepted->get_future(); +} + +inline auto Endpoint::accept() -> std::shared_ptr { + std::shared_ptr connection = std::make_shared(ioService_); + boost::system::error_code error; + acceptor_.accept(connection->socket_, connection->endpoint_, error); + + if (error) { + return nullptr; + } + + connections_.push_back(connection); + return connection; +} + +inline auto Endpoint::connect(std::string url, std::string port) + -> std::shared_ptr { + + std::shared_ptr connection = std::make_shared(ioService_); + boost::asio::ip::tcp::resolver::iterator begin = + resolver_.resolve({url, port}); + boost::asio::ip::tcp::resolver::iterator end; + + boost::system::error_code error; + boost::asio::ip::tcp::resolver::iterator iter = + boost::asio::connect(connection->socket_, begin, error); + + if (error) { + return nullptr; + } + + connection->endpoint_ = iter->endpoint(); + connections_.push_back(connection); + return connection; +} + +inline auto Endpoint::asyncConnect(Url url, Port port, + const Callback &onConnected) -> Future { + + std::shared_ptr> connected = + std::make_shared>(); + + std::shared_ptr connection = + std::make_shared(ioService_); + + { + boost::asio::ip::tcp::resolver::iterator begin = + resolver_.resolve({url, port}); + boost::asio::ip::tcp::resolver::iterator end; + + boost::system::error_code error; + boost::asio::async_connect( + connection->socket_, begin, + [connected, connection, + onConnected](const boost::system::error_code &error, + boost::asio::ip::tcp::resolver::iterator iter) { + + if (!error) { + connection->endpoint_ = iter->endpoint(); + } + connected->set_value(error); + onConnected(error, connection); + }); + } + + return connected->get_future(); +} + +inline auto Endpoint::bind(Port port) -> boost::system::error_code { + boost::asio::ip::tcp::endpoint endpoint{ + boost::asio::ip::tcp::v4(), static_cast(std::stoi(port))}; + boost::system::error_code error; + + acceptor_.open(endpoint.protocol()); + acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); + while (acceptor_.bind(endpoint, error) == + boost::system::errc::address_in_use) { + endpoint.port(endpoint.port() + 1); + } + + if (!error) { + acceptor_.listen(); + localEndpoint_ = endpoint; + } + + return error; +} +} +} +} /* namespace graybat::communicationPolicy::asio */ \ No newline at end of file diff --git a/test/unit/EndpointTests.cpp b/test/unit/EndpointTests.cpp new file mode 100644 index 0000000..878bc93 --- /dev/null +++ b/test/unit/EndpointTests.cpp @@ -0,0 +1,354 @@ +/** + * Copyright 2016 Erik Zenker + * + * This file is part of Graybat. + * + * Graybat is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * Graybat is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with Graybat. + * If not, see . + */ + +// BOOST +#include +#include + +// STL +#include // std::vector +#include // std::promise +#include // std::chrono::milliseconds + +// GRAYBAT +#include +#include + +const std::chrono::milliseconds successTimeout{1500}; + +/******************************************************************************* + * Test Suites + *******************************************************************************/ +BOOST_AUTO_TEST_SUITE( endpoint ) +using namespace graybat::communicationPolicy::asio; + + +/*************************************************************************** + * Test Cases + ****************************************************************************/ +BOOST_AUTO_TEST_CASE(shouldCreateEndpoint) { + boost::asio::io_service io_service; + Endpoint ep(io_service, "8888"); +} + +BOOST_AUTO_TEST_CASE(shouldRefuseConnection) { + boost::asio::io_service io_service; + Endpoint ep(io_service, "8888"); + + BOOST_REQUIRE(!ep.connect("127.0.0.1", "9999")); +} + +BOOST_AUTO_TEST_CASE(shouldBindAndConnect) { + boost::asio::io_service io_service; + Endpoint ep1(io_service); + Endpoint ep2(io_service); + ep1.bind("8888"); + ep2.bind("9999"); + + BOOST_REQUIRE(ep1.connect("127.0.0.1", "9999")); +} + +BOOST_AUTO_TEST_CASE(shouldRebindAndConnect) { + boost::asio::io_service io_service; + Endpoint ep1(io_service); + Endpoint ep2(io_service); + ep1.bind("8888"); + ep2.bind("8888"); + + BOOST_REQUIRE(ep1.connect("127.0.0.1", "8889")); +} + +BOOST_AUTO_TEST_CASE(shouldConnect) { + boost::asio::io_service io_service; + Endpoint ep1(io_service, "8888"); + Endpoint ep2(io_service, "9999"); + + BOOST_REQUIRE(ep1.connect("127.0.0.1", "9999")); +} + +BOOST_AUTO_TEST_CASE(shouldAsynchronousConnect) { + boost::asio::io_service io_service; + Endpoint ep1(io_service, "8888"); + Endpoint ep2(io_service, "9999"); + + auto connected = ep1.asyncConnect("127.0.0.1", "9999"); + ep2.accept(); + io_service.run_one(); + + BOOST_REQUIRE(connected.wait_for(successTimeout) == std::future_status::ready); + BOOST_REQUIRE(!connected.get()); +} + +BOOST_AUTO_TEST_CASE(shouldTimeoutOnAsynchronousConnect) { + boost::asio::io_service io_service; + Endpoint ep1(io_service, "8888"); + Endpoint ep2(io_service, "9999"); + + auto connected = ep1.asyncConnect("127.0.0.1", "9999"); + io_service.run_one(); + + BOOST_REQUIRE(connected.wait_for(successTimeout) == + std::future_status::timeout); +} + +BOOST_AUTO_TEST_CASE(shouldRefuseAsynchronousConnection) { + boost::asio::io_service io_service; + Endpoint ep1(io_service, "8888"); + + auto connected = ep1.asyncConnect("127.0.0.1", "9999"); + io_service.run_one(); + + BOOST_REQUIRE(connected.wait_for(successTimeout) == std::future_status::ready); + BOOST_REQUIRE(connected.get()); +} + +BOOST_AUTO_TEST_CASE(shouldAcceptConnection) { + boost::asio::io_service io_service; + + Endpoint ep1(io_service, "8888"); + Endpoint ep2(io_service, "9999"); + + auto ep1ToEp2 = ep1.connect("127.0.0.1", "9999"); + auto ep2ToEp1 = ep2.accept(); + BOOST_REQUIRE(ep1ToEp2); + BOOST_REQUIRE(ep2ToEp1); +} + +BOOST_AUTO_TEST_CASE(shouldAsynchronousAcceptConnection) { + boost::asio::io_service io_service; + + Endpoint ep1(io_service, "8888"); + Endpoint ep2(io_service, "9999"); + + std::shared_ptr ep2ToEp1; + auto ep1ToEp2 = ep1.connect("127.0.0.1", "9999"); + auto accepted = ep2.asyncAccept([&ep2ToEp1]( + const boost::system::error_code &, + const std::shared_ptr &connection) { ep2ToEp1 = connection; }); + + io_service.run_one(); + + BOOST_REQUIRE(accepted.wait_for(successTimeout) == std::future_status::ready); + BOOST_REQUIRE(ep1ToEp2); + BOOST_REQUIRE(ep2ToEp1); +} + +BOOST_AUTO_TEST_CASE(shouldAsynchronousConnectAndAsynchronousAccept) { + boost::asio::io_service io_service; + + Endpoint ep1(io_service, "8888"); + Endpoint ep2(io_service, "9999"); + + std::shared_ptr ep1ToEp2; + std::shared_ptr ep2ToEp1; + auto connected = ep1.asyncConnect( + "127.0.0.1", "9999", + [&ep1ToEp2](const boost::system::error_code&, + const std::shared_ptr& connection) { + ep1ToEp2 = connection; + }); + auto accepted = ep2.asyncAccept( + [&ep2ToEp1](const boost::system::error_code &, + const std::shared_ptr& connection) { + ep2ToEp1 = connection; + }); + + io_service.run_one(); + io_service.run_one(); + + BOOST_REQUIRE(connected.wait_for(successTimeout) == std::future_status::ready); + BOOST_REQUIRE(accepted.wait_for(successTimeout) == std::future_status::ready); + BOOST_REQUIRE(ep1ToEp2); +} + +BOOST_AUTO_TEST_CASE(shouldReceive) { + boost::asio::io_service io_service; + + Endpoint ep1(io_service, "8888"); + Endpoint ep2(io_service, "9999"); + + auto ep1ToEp2 = ep1.connect("127.0.0.1", "9999"); + auto ep2ToEp1 = ep2.accept(); + + std::vector data1(1, 1); + std::vector data2(1, 0); + auto sent = ep1ToEp2->asyncSend(data1); + io_service.run_one(); + + ep2ToEp1->receive(data2); + + BOOST_REQUIRE(sent.wait_for(successTimeout) == std::future_status::ready); + BOOST_REQUIRE(!sent.get()); + + for (auto elem : data2) { + BOOST_REQUIRE_EQUAL(elem, 1); + } +} + +BOOST_AUTO_TEST_CASE(shouldAsynchronousReceive) { + boost::asio::io_service io_service; + + Endpoint ep1(io_service, "8888"); + Endpoint ep2(io_service, "9999"); + + auto ep1ToEp2 = ep1.connect("127.0.0.1", "9999"); + auto ep2ToEp1 = ep2.accept(); + + std::vector data1(1, 1); + std::vector data2(1, 0); + auto sent = ep1ToEp2->asyncSend(data1); + auto received = ep2ToEp1->asyncReceive(data2); + io_service.run_one(); + io_service.run_one(); + + BOOST_REQUIRE(sent.wait_for(successTimeout) == std::future_status::ready); + BOOST_REQUIRE(received.wait_for(successTimeout) == std::future_status::ready); + BOOST_REQUIRE(!sent.get()); + BOOST_REQUIRE(!received.get()); + + for (auto elem : data2) { + BOOST_REQUIRE_EQUAL(elem, 1); + } +} + +BOOST_AUTO_TEST_CASE(shouldAsynchronousDoAll) { + boost::asio::io_service io_service; + + Endpoint ep1(io_service, "8888"); + Endpoint ep2(io_service, "9999"); + + std::shared_ptr ep1ToEp2; + std::shared_ptr ep2ToEp1; + auto connected = ep1.asyncConnect( + "127.0.0.1", "9999", + [&ep1ToEp2](const boost::system::error_code &, + const std::shared_ptr &connection) { + ep1ToEp2 = connection; + }); + auto accepted = ep2.asyncAccept( + [&ep2ToEp1](const boost::system::error_code &, + const std::shared_ptr &connection) { + ep2ToEp1 = connection; + }); + + io_service.run_one(); + io_service.run_one(); + + BOOST_REQUIRE(connected.wait_for(successTimeout) == std::future_status::ready); + BOOST_REQUIRE(accepted.wait_for(successTimeout) == std::future_status::ready); + BOOST_REQUIRE(ep1ToEp2); + BOOST_REQUIRE(ep2ToEp1); + + std::vector data1(100, 'P'); + std::vector data2(100, 0); + auto received = ep2ToEp1->asyncReceive(data2); + auto sent = ep1ToEp2->asyncSend(data1); + + io_service.run_one(); + io_service.run_one(); + + BOOST_REQUIRE(sent.wait_for(successTimeout) == std::future_status::ready); + BOOST_REQUIRE(received.wait_for(successTimeout) == std::future_status::ready); + BOOST_REQUIRE(!sent.get()); + BOOST_REQUIRE(!received.get()); + + for (auto elem : data2) { + BOOST_REQUIRE_EQUAL(elem, 'P'); + } +} + +BOOST_AUTO_TEST_CASE(shouldReceiveStreamData) +{ + std::stringstream send; + std::stringstream recv; + + boost::asio::io_service io_service; + + Endpoint ep1(io_service, "8888"); + Endpoint ep2(io_service, "9999"); + + auto ep1ToEp2 = ep1.connect("127.0.0.1", "9999"); + auto ep2ToEp1 = ep2.accept(); + + send << "42"; + auto sent = ep1ToEp2->asyncStreamSend(send); + io_service.run_one(); + + ep2ToEp1->streamReceive(recv); + + BOOST_REQUIRE(sent.wait_for(successTimeout) == std::future_status::ready); + BOOST_REQUIRE(!sent.get()); + + BOOST_REQUIRE_EQUAL(recv.str(), "42"); + +} + +BOOST_AUTO_TEST_CASE(shouldReceivePrefixSizeData) +{ + boost::asio::io_service io_service; + + Endpoint ep1(io_service, "8888"); + Endpoint ep2(io_service, "9999"); + + auto ep1ToEp2 = ep1.connect("127.0.0.1", "9999"); + auto ep2ToEp1 = ep2.accept(); + + std::vector data1(100, 'P'); + std::vector data2(100, 0); + auto sent = ep1ToEp2->asyncSendPrefixSize(data1); + io_service.run_one(); + ep2ToEp1->receivePrefixSize(data2); + + BOOST_REQUIRE(sent.wait_for(successTimeout) == std::future_status::ready); + BOOST_REQUIRE(!sent.get()); + + for (auto elem : data2) { + BOOST_REQUIRE_EQUAL(elem, 'P'); + } +} + +BOOST_AUTO_TEST_CASE(shouldReceiveFromDifferentThread) +{ + boost::asio::io_service io_service; + + Endpoint ep1(io_service, "8888"); + Endpoint ep2(io_service, "9999"); + + auto ep1ToEp2 = ep1.connect("127.0.0.1", "9999"); + auto ep2ToEp1 = ep2.accept(); + + std::vector data1(100, 'P'); + std::vector data2(100, 0); + auto sent = ep1ToEp2->asyncSendPrefixSize(data1); + io_service.run_one(); + std::thread t([&](){ + ep2ToEp1->receivePrefixSize(data2);}); + + BOOST_REQUIRE(sent.wait_for(successTimeout) == std::future_status::ready); + BOOST_REQUIRE(!sent.get()); + + t.join(); + + for (auto elem : data2) { + BOOST_REQUIRE_EQUAL(elem, 'P'); + } +} + +BOOST_AUTO_TEST_SUITE_END()