diff --git a/AUTHORS b/AUTHORS index dc1667e342..239a340112 100644 --- a/AUTHORS +++ b/AUTHORS @@ -102,6 +102,7 @@ Philip Kovacs Pieter Hintjens Piotr Trojanek Reza Ebrahimi +Remi Jouannet Richard Newton Rik van der Heijden Robert G. Jakabosky diff --git a/CMakeLists.txt b/CMakeLists.txt index 5e5f4929dc..9184cd85f4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1160,6 +1160,9 @@ set(cxx-sources vmci_address.hpp vmci_connecter.hpp vmci_listener.hpp + vsock_address.hpp + vsock_connecter.hpp + vsock_listener.hpp windows.hpp wire.hpp xpub.hpp @@ -1233,6 +1236,10 @@ message(STATUS "Building with VMCI") list(APPEND cxx-sources vmci_address.cpp vmci_connecter.cpp vmci_listener.cpp vmci.cpp) endif() +if(ZMQ_HAVE_VSOCK) + list(APPEND cxx-sources vsock_address.cpp vsock_connecter.cpp vsock_listener.cpp) +endif() + if(ZMQ_HAVE_TIPC) list(APPEND cxx-sources tipc_address.cpp tipc_connecter.cpp tipc_listener.cpp) endif() diff --git a/Makefile.am b/Makefile.am index 19aa168096..dc49d64a28 100755 --- a/Makefile.am +++ b/Makefile.am @@ -255,6 +255,12 @@ src_libzmq_la_SOURCES = \ src/vmci_connecter.hpp \ src/vmci_listener.cpp \ src/vmci_listener.hpp \ + src/vsock_address.cpp \ + src/vsock_address.hpp \ + src/vsock_connecter.cpp \ + src/vsock_connecter.hpp \ + src/vsock_listener.cpp \ + src/vsock_listener.hpp \ src/windows.hpp \ src/wire.hpp \ src/xpub.cpp \ @@ -1047,6 +1053,19 @@ tests_test_reqrep_vmci_CXXFLAGS = @LIBZMQ_VMCI_CXXFLAGS@ endif +if BUILD_VSOCK +test_apps += tests/test_pair_vsock tests/test_reqrep_vsock + +tests_test_pair_vsock_SOURCES = tests/test_pair_vsock.cpp +tests_test_pair_vsock_LDADD = ${TESTUTIL_LIBS} src/libzmq.la +tests_test_pair_vsock_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + +tests_test_reqrep_vsock_SOURCES = tests/test_reqrep_vsock.cpp +tests_test_reqrep_vsock_LDADD = ${TESTUTIL_LIBS} src/libzmq.la +tests_test_reqrep_vsock_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + +endif + if ENABLE_DRAFTS test_apps += tests/test_poller \ tests/test_client_server \ diff --git a/builds/cmake/platform.hpp.in b/builds/cmake/platform.hpp.in index e3b7ab8b95..a32cd268e5 100644 --- a/builds/cmake/platform.hpp.in +++ b/builds/cmake/platform.hpp.in @@ -71,6 +71,7 @@ #cmakedefine ZMQ_HAVE_OPENPGM #cmakedefine ZMQ_HAVE_NORM #cmakedefine ZMQ_HAVE_VMCI +#cmakedefine ZMQ_HAVE_VSOCK #cmakedefine ZMQ_MAKE_VALGRIND_HAPPY diff --git a/configure.ac b/configure.ac index 7832edab99..843720b2dd 100644 --- a/configure.ac +++ b/configure.ac @@ -128,6 +128,20 @@ AC_RUN_IFELSE( AC_MSG_RESULT([$libzmq_tipc_support]) + +# check vsock support +AC_MSG_CHECKING([if vsock is available ]) + +AC_CHECK_HEADERS([linux/vm_sockets.h], + [libzmq_vsock_support=yes], + [libzmq_vsock_support=no], + [ + #include + #include + ] +) + + AC_ARG_ENABLE([pedantic], [AS_HELP_STRING([--disable-pedantic], [disable pedantic compiler checks [default=enabled]])], [libzmq_pedantic=$enableval], [libzmq_pedantic=yes]) @@ -235,6 +249,10 @@ case "${host_os}" in if test "x$libzmq_tipc_support" = "xyes"; then AC_DEFINE(ZMQ_HAVE_TIPC, 1, [Have TIPC support]) fi + + if test "x$libzmq_vsock_support" = "xyes"; then + AC_DEFINE(ZMQ_HAVE_VSOCK, 1, [Have vsock support]) + fi case "${host_os}" in *android*) AC_DEFINE(ZMQ_HAVE_ANDROID, 1, [Have Android OS]) @@ -774,6 +792,7 @@ fi AM_CONDITIONAL(HAVE_VMCI, test "x$have_vmci_ext" != "xno") + # Set -Wall, -Werror and -pedantic AC_LANG_PUSH([C++]) @@ -789,6 +808,7 @@ if test "x$libzmq_pedantic" = "xyes"; then fi AC_LANG_POP([C++]) +AM_CONDITIONAL(BUILD_VSOCK, test "x$libzmq_vsock_support" = "xyes") AM_CONDITIONAL(BUILD_TIPC, test "x$libzmq_tipc_support" = "xyes") AM_CONDITIONAL(ON_MINGW, test "x$libzmq_on_mingw" = "xyes") AM_CONDITIONAL(ON_CYGWIN, test "x$libzmq_on_cygwin" = "xyes") diff --git a/doc/Makefile.am b/doc/Makefile.am index f4e4e9c708..24bd20f054 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -26,7 +26,7 @@ MAN3 = \ MAN7 = \ zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_inproc.7 zmq_ipc.7 \ zmq_null.7 zmq_plain.7 zmq_curve.7 zmq_tipc.7 zmq_vmci.7 zmq_udp.7 \ - zmq_gssapi.7 + zmq_gssapi.7 zmq_vsock.7 # ASCIIDOC_DOC_WITHOUT_INDEX contains all the Asciidoc files checked into the git repo, except for index.adoc ASCIIDOC_DOC_WITHOUT_INDEX = $(MAN3:%.3=%.adoc) $(MAN7:%.7=%.adoc) diff --git a/doc/zmq.adoc b/doc/zmq.adoc index f728654c1d..6a950d7ea5 100644 --- a/doc/zmq.adoc +++ b/doc/zmq.adoc @@ -165,12 +165,14 @@ Local inter-process communication transport:: Local in-process (inter-thread) communication transport:: * xref:zmq_inproc.adoc[zmq_inproc] -Virtual Machine Communications Interface (VMC) transport:: +Virtual Machine Communications Interface (VMCI) transport:: * xref:zmq_vmci.adoc[zmq_vmci] Unreliable unicast and multicast using UDP:: * xref:zmq_udp.adoc[zmq_udp] +Linux VSOCK (AF_VSOCK) transport:: + * xref:zmq_vsock.adoc[zmq_vsock] Proxies ~~~~~~~ diff --git a/doc/zmq_bind.adoc b/doc/zmq_bind.adoc index af0fd09440..89766c85fc 100644 --- a/doc/zmq_bind.adoc +++ b/doc/zmq_bind.adoc @@ -25,14 +25,15 @@ The 'endpoint' is a string consisting of a 'transport'`://` followed by an 'pgm', 'epgm':: reliable multicast transport using PGM, see xref:zmq_pgm.adoc[zmq_pgm] 'vmci':: virtual machine communications interface (VMCI), see xref:zmq_vmci.adoc[zmq_vmci] 'udp':: unreliable unicast and multicast using UDP, see xref:zmq_udp.adoc[zmq_udp] +'vsock':: Linux VSOCK, see xref:zmq_vsock.adoc[zmq_vsock] Every 0MQ socket type except 'ZMQ_PAIR' and 'ZMQ_CHANNEL' supports one-to-many and many-to-one semantics. The precise semantics depend on the socket type and are defined in xref:zmq_socket.adoc[zmq_socket] -The 'ipc', 'tcp', 'vmci' and 'udp' transports accept wildcard addresses: see -xref:zmq_ipc.adoc[zmq_ipc], xref:zmq_tcp.adoc[zmq_tcp], xref:zmq_vmci.adoc[zmq_vmci] and -xref:zmq_udp.adoc[zmq_udp] for details. +The 'ipc', 'tcp', 'vmci', 'udp' and 'vsock' transports accept wildcard addresses: see +xref:zmq_ipc.adoc[zmq_ipc], xref:zmq_tcp.adoc[zmq_tcp], xref:zmq_vmci.adoc[zmq_vmci], +xref:zmq_udp.adoc[zmq_udp] and xref:zmq_vsock.adoc[zmq_vsock] for details. NOTE: the address syntax may be different for _zmq_bind()_ and _zmq_connect()_ especially for the 'tcp', 'pgm' and 'epgm' transports. diff --git a/doc/zmq_connect.adoc b/doc/zmq_connect.adoc index 63de3c544b..cb3c27be39 100644 --- a/doc/zmq_connect.adoc +++ b/doc/zmq_connect.adoc @@ -25,6 +25,7 @@ The 'endpoint' is a string consisting of a 'transport'`://` followed by an 'pgm', 'epgm':: reliable multicast transport using PGM, see xref:zmq_pgm.adoc[zmq_pgm] 'vmci':: virtual machine communications interface (VMCI), see xref:zmq_vmci.adoc[zmq_vmci] 'udp':: unreliable unicast and multicast using UDP, see xref:zmq_udp.adoc[zmq_udp] +'vsock':: Linux VSOCK, see xref:zmq_vsock.adoc[zmq_vsock] Every 0MQ socket type except 'ZMQ_PAIR' and 'ZMQ_CHANNEL' supports one-to-many and many-to-one semantics. The precise semantics depend on the socket type and are defined in diff --git a/doc/zmq_inproc.adoc b/doc/zmq_inproc.adoc index 0e7877b453..eed9f25b36 100644 --- a/doc/zmq_inproc.adoc +++ b/doc/zmq_inproc.adoc @@ -73,6 +73,7 @@ assert (rc == 0); * xref:zmq_tcp.adoc[zmq_tcp] * xref:zmq_pgm.adoc[zmq_pgm] * xref:zmq_vmci.adoc[zmq_vmci] +* xref:zmq_vsock.adoc[zmq_vsock] * xref:zmq.adoc[zmq] diff --git a/doc/zmq_ipc.adoc b/doc/zmq_ipc.adoc index 378a2039a7..23f41e7374 100644 --- a/doc/zmq_ipc.adoc +++ b/doc/zmq_ipc.adoc @@ -90,6 +90,7 @@ assert (rc == 0); * xref:zmq_tcp.adoc[zmq_tcp] * xref:zmq_pgm.adoc[zmq_pgm] * xref:zmq_vmci.adoc[zmq_vmci] +* xref:zmq_vsock.adoc[zmq_vsock] * xref:zmq_getsockopt.adoc[zmq_getsockopt] * xref:zmq.adoc[zmq] diff --git a/doc/zmq_pgm.adoc b/doc/zmq_pgm.adoc index fb5cc81d24..c26ec83cec 100644 --- a/doc/zmq_pgm.adoc +++ b/doc/zmq_pgm.adoc @@ -170,6 +170,7 @@ assert (rc == 0); * xref:zmq_ipc.adoc[zmq_ipc] * xref:zmq_inproc.adoc[zmq_inproc] * xref:zmq_vmci.adoc[zmq_vmci] +* xref:zmq_vsock.adoc[zmq_vsock] * xref:zmq.adoc[zmq] diff --git a/doc/zmq_tcp.adoc b/doc/zmq_tcp.adoc index 6f1c53d262..0e01b09850 100644 --- a/doc/zmq_tcp.adoc +++ b/doc/zmq_tcp.adoc @@ -126,6 +126,7 @@ assert (rc == 0); * xref:zmq_ipc.adoc[zmq_ipc] * xref:zmq_inproc.adoc[zmq_inproc] * xref:zmq_vmci.adoc[zmq_vmci] +* xref:zmq_vsock.adoc[zmq_vsock] * xref:zmq.adoc[zmq] diff --git a/doc/zmq_tipc.adoc b/doc/zmq_tipc.adoc index 013d4941c6..437d53b18a 100644 --- a/doc/zmq_tipc.adoc +++ b/doc/zmq_tipc.adoc @@ -68,6 +68,7 @@ assert (rc == 0); * xref:zmq_ipc.adoc[zmq_ipc] * xref:zmq_inproc.adoc[zmq_inproc] * xref:zmq_vmci.adoc[zmq_vmci] +* xref:zmq_vsock.adoc[zmq_vsock] * xref:zmq.adoc[zmq] diff --git a/doc/zmq_udp.adoc b/doc/zmq_udp.adoc index 12693280a9..bfd7b71c9c 100644 --- a/doc/zmq_udp.adoc +++ b/doc/zmq_udp.adoc @@ -100,6 +100,7 @@ assert (rc == 0); * xref:zmq_ipc.adoc[zmq_ipc] * xref:zmq_inproc.adoc[zmq_inproc] * xref:zmq_vmci.adoc[zmq_vmci] +* xref:zmq_vsock.adoc[zmq_vsock] * xref:zmq.adoc[zmq] diff --git a/doc/zmq_unbind.adoc b/doc/zmq_unbind.adoc index 88533b364a..4510026d39 100644 --- a/doc/zmq_unbind.adoc +++ b/doc/zmq_unbind.adoc @@ -24,7 +24,8 @@ The 'endpoint' argument is as described in xref:zmq_bind.adoc[zmq_bind] Unbinding wild-card address from a socket ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ When wild-card `*` 'endpoint' (described in xref:zmq_tcp.adoc[zmq_tcp], -xref:zmq_ipc.adoc[zmq_ipc], xref:zmq_udp.adoc[zmq_udp] and xref:zmq_vmci.adoc[zmq_vmci]) was used in +xref:zmq_ipc.adoc[zmq_ipc], xref:zmq_udp.adoc[zmq_udp], +xref:zmq_vmci.adoc[zmq_vmci] and xref:zmq_vsock.adoc[zmq_vsock]) was used in _zmq_bind()_, the caller should use real 'endpoint' obtained from the ZMQ_LAST_ENDPOINT socket option to unbind this 'endpoint' from a socket. diff --git a/doc/zmq_vmci.adoc b/doc/zmq_vmci.adoc index bbc7c2cf67..672f8cb269 100644 --- a/doc/zmq_vmci.adoc +++ b/doc/zmq_vmci.adoc @@ -2,7 +2,7 @@ == NAME -zmq_vmci - 0MQ transport over virtual machine communicatios interface (VMCI) sockets +zmq_vmci - 0MQ transport over virtual machine communications interface (VMCI) sockets == SYNOPSIS diff --git a/doc/zmq_vsock.adoc b/doc/zmq_vsock.adoc new file mode 100644 index 0000000000..bf7283232b --- /dev/null +++ b/doc/zmq_vsock.adoc @@ -0,0 +1,90 @@ += zmq_vsock(7) + + +== NAME +zmq_vsock - 0MQ transport over Linux VSOCK (AF_VSOCK) + + +== SYNOPSIS +The AF_VSOCK address family facilitates communication between virtual machines and the host they are running on. +This address family is used by guest agents and hypervisor services that need a communications channel that is independent of virtual machine network configuration. + +NOTE: AF_VSOCK is only supported on linux, + + +== ADDRESSING +A 0MQ endpoint is a string consisting of a 'transport'`://` followed by an +'address'. The 'transport' specifies the underlying protocol to use. The +'address' specifies the transport-specific address to connect to. + +For the VSOCK transport, the transport is `vsock`, and the meaning of +the 'address' part is defined below. + + +Binding a socket +~~~~~~~~~~~~~~~~ +When binding a 'socket' to a local address using _zmq_bind()_ with the 'vsock' +transport, the 'endpoint' shall be interpreted as an 'interface' followed by a +colon and the port number to use. + +An 'interface' may be specified by either of the following: + +* The wild-card `*`, meaning all available interfaces. +* An integer returned by `VMADDR_CID_LOCAL` or `@`. + +The port may be specified by: + +* A numeric value, usually above 1024 on POSIX systems. +* The wild-card `*`, meaning a system-assigned ephemeral port. + +Unbinding wild-card address from a socket +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +When wild-card `*` 'endpoint' was used in _zmq_bind()_, the caller should use +real 'endpoint' obtained from the ZMQ_LAST_ENDPOINT socket option to unbind +this 'endpoint' from a socket using _zmq_unbind()_. + +Connecting a socket +~~~~~~~~~~~~~~~~~~~ +When connecting a socket to a peer address using _zmq_connect()_ with the 'vsock' +transport, the 'endpoint' shall be interpreted as a 'peer address' followed by +a colon and the port number to use. + +A 'peer address' must be a CID of the peer. + + +== EXAMPLES +.Assigning a local address to a socket +---- +// VSOCK port 5555 on all available interfaces +rc = zmq_bind(socket, "vsock://*:5555"); +assert (rc == 0); +// VSOCK port 5555 on the local loop-back interface on all platforms +cid = VMADDR_CID_LOCAL; +sprintf(endpoint, "vsock://%d:5555", cid); +rc = zmq_bind(socket, endpoint); +assert (rc == 0); +---- + +.Connecting a socket +---- +// Connecting using a CID +sprintf(endpoint, "vsock://%d:5555", cid); +rc = zmq_connect(socket, endpoint); +assert (rc == 0); +---- + + +== SEE ALSO +* xref:zmq_bind.adoc[zmq_bind] +* xref:zmq_connect.adoc[zmq_connect] +* xref:zmq_inproc.adoc[zmq_inproc] +* xref:zmq_tcp.adoc[zmq_tcp] +* xref:zmq_pgm.adoc[zmq_pgm] +* xref:zmq_vsock.adoc[zmq_vsock] +* xref:zmq_getsockopt.adoc[zmq_getsockopt] +* xref:zmq.adoc[zmq] + + +== AUTHORS +This page was written by the 0MQ community. To make a change please +read the 0MQ Contribution Policy at . diff --git a/src/address.cpp b/src/address.cpp index f69a75dbe7..01c767703e 100644 --- a/src/address.cpp +++ b/src/address.cpp @@ -11,6 +11,17 @@ #include "tipc_address.hpp" #include "ws_address.hpp" +#if defined ZMQ_HAVE_VSOCK +// fix header conflict with VMCI +#define sockaddr_vm linux_sockaddr_vm +#define VMADDR_PORT_ANY LINUX_VMADDR_PORT_ANY +#define VMADDR_CID_ANY LINUX_VMADDR_CID_ANY +#include "vsock_address.hpp" +#undef sockaddr_vm +#undef VMADDR_CID_ANY +#undef VMADDR_PORT_ANY +#endif + #if defined ZMQ_HAVE_VMCI #include "vmci_address.hpp" #endif @@ -60,6 +71,11 @@ zmq::address_t::~address_t () LIBZMQ_DELETE (resolved.vmci_addr); } #endif +#if defined ZMQ_HAVE_VSOCK + else if (protocol == protocol_name::vsock) { + LIBZMQ_DELETE (resolved.vsock_addr); + } +#endif } int zmq::address_t::to_string (std::string &addr_) const @@ -88,6 +104,10 @@ int zmq::address_t::to_string (std::string &addr_) const if (protocol == protocol_name::vmci && resolved.vmci_addr) return resolved.vmci_addr->to_string (addr_); #endif +#if defined ZMQ_HAVE_VSOCK + if (protocol == protocol_name::vsock && resolved.vsock_addr) + return resolved.vsock_addr->to_string (addr_); +#endif if (!protocol.empty () && !address.empty ()) { std::stringstream s; diff --git a/src/address.hpp b/src/address.hpp index 0082646200..bab3ae1347 100644 --- a/src/address.hpp +++ b/src/address.hpp @@ -31,6 +31,9 @@ class tipc_address_t; #if defined ZMQ_HAVE_VMCI class vmci_address_t; #endif +#if defined ZMQ_HAVE_VSOCK +class vsock_address_t; +#endif namespace protocol_name { @@ -59,6 +62,9 @@ static const char tipc[] = "tipc"; #if defined ZMQ_HAVE_VMCI static const char vmci[] = "vmci"; #endif +#if defined ZMQ_HAVE_VSOCK +static const char vsock[] = "vsock"; +#endif } struct address_t @@ -94,6 +100,9 @@ struct address_t #endif #if defined ZMQ_HAVE_VMCI vmci_address_t *vmci_addr; +#endif +#if defined ZMQ_HAVE_VSOCK + vsock_address_t *vsock_addr; #endif } resolved; diff --git a/src/session_base.cpp b/src/session_base.cpp index 5a81b076d4..bfc25d165f 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -13,6 +13,7 @@ #include "tipc_connecter.hpp" #include "socks_connecter.hpp" #include "vmci_connecter.hpp" +#include "vsock_connecter.hpp" #include "pgm_sender.hpp" #include "pgm_receiver.hpp" #include "address.hpp" @@ -641,6 +642,13 @@ void zmq::session_base_t::start_connecting (bool wait_) io_thread, this, options, _addr, wait_, true, _wss_hostname); } #endif +#if defined ZMQ_HAVE_VSOCK + else if (_addr->protocol == protocol_name::vsock) { + connecter = new (std::nothrow) + vsock_connecter_t (io_thread, this, options, _addr, wait_); + } +#endif + if (connecter != NULL) { alloc_assert (connecter); launch_child (connecter); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index c76ff98d97..3b98078c22 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -49,11 +49,25 @@ #ifdef ZMQ_HAVE_WSS #include "wss_address.hpp" #endif + +#if defined ZMQ_HAVE_VSOCK +// fix header conflict with VMCI +#define sockaddr_vm linux_sockaddr_vm +#define VMADDR_PORT_ANY LINUX_VMADDR_PORT_ANY +#define VMADDR_CID_ANY LINUX_VMADDR_CID_ANY +#include "vsock_address.hpp" +#include "vsock_listener.hpp" +#undef sockaddr_vm +#undef VMADDR_CID_ANY +#undef VMADDR_PORT_ANY +#endif + #if defined ZMQ_HAVE_VMCI #include "vmci_address.hpp" #include "vmci_listener.hpp" #endif + #ifdef ZMQ_HAVE_OPENPGM #include "pgm_socket.hpp" #endif @@ -343,6 +357,9 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) const #endif #if defined ZMQ_HAVE_VMCI && protocol_ != protocol_name::vmci +#endif +#if defined ZMQ_HAVE_VSOCK + && protocol_ != protocol_name::vsock #endif && protocol_ != protocol_name::udp) { errno = EPROTONOSUPPORT; @@ -743,6 +760,28 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_) } #endif +#if defined ZMQ_HAVE_VSOCK + if (protocol == protocol_name::vsock) { + vsock_listener_t *listener = + new (std::nothrow) vsock_listener_t (io_thread, this, options); + alloc_assert (listener); + int rc = listener->set_local_address (address.c_str ()); + if (rc != 0) { + LIBZMQ_DELETE (listener); + event_bind_failed (make_unconnected_bind_endpoint_pair (address), + zmq_errno ()); + return -1; + } + + listener->get_local_address (_last_endpoint); + + add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint), + static_cast (listener), NULL); + options.connected = true; + return 0; + } +#endif + zmq_assert (false); return -1; } @@ -1041,6 +1080,18 @@ int zmq::socket_base_t::connect_internal (const char *endpoint_uri_) } } #endif +#if defined ZMQ_HAVE_VSOCK + else if (protocol == protocol_name::vsock) { + paddr->resolved.vsock_addr = + new (std::nothrow) vsock_address_t (this->get_ctx ()); + alloc_assert (paddr->resolved.vsock_addr); + int rc = paddr->resolved.vsock_addr->resolve (address.c_str ()); + if (rc != 0) { + LIBZMQ_DELETE (paddr); + return -1; + } + } +#endif // Create session. session_base_t *session = diff --git a/src/vsock_address.cpp b/src/vsock_address.cpp new file mode 100644 index 0000000000..c57e55011f --- /dev/null +++ b/src/vsock_address.cpp @@ -0,0 +1,139 @@ +/* SPDX-License-Identifier: MPL-2.0 */ + +#include "precompiled.hpp" + +#include "vsock_address.hpp" + +#if defined ZMQ_HAVE_VSOCK + +#include +#include +#include +#include "err.hpp" + +zmq::vsock_address_t::vsock_address_t () +{ + memset (&address, 0, sizeof address); +} + +zmq::vsock_address_t::vsock_address_t (ctx_t *parent_) : parent (parent_) +{ + memset (&address, 0, sizeof address); +} + +zmq::vsock_address_t::vsock_address_t (const sockaddr *sa, + socklen_t sa_len, + ctx_t *parent_) : + parent (parent_) +{ + zmq_assert (sa && sa_len > 0); + + memset (&address, 0, sizeof address); + if (sa->sa_family == AF_VSOCK) + memcpy (&address, sa, sa_len); +} + +int zmq::vsock_address_t::resolve (const char *path_) +{ + // Find the ':' at end that separates address from the port number. + const char *delimiter = strrchr (path_, ':'); + if (!delimiter) { + errno = EINVAL; + return -1; + } + + // Separate the address/port. + std::string addr_str (path_, delimiter - path_); + std::string port_str (delimiter + 1); + + unsigned int cid = VMADDR_CID_ANY; + unsigned int port = VMADDR_PORT_ANY; + + if (!addr_str.length ()) { + errno = EINVAL; + return -1; + } else if (addr_str == "@") { + cid = VMADDR_CID_HOST; + + if (cid == VMADDR_CID_ANY) { + errno = ENODEV; + return -1; + } + } else if (addr_str != "*" && addr_str != "-1") { + const char *begin = addr_str.c_str (); + char *end = NULL; + unsigned long l = strtoul (begin, &end, 10); + + if ((l == 0 && end == begin) || (l == ULONG_MAX && errno == ERANGE) + || l > UINT_MAX) { + errno = EINVAL; + return -1; + } + + cid = static_cast (l); + } + + if (!port_str.length ()) { + errno = EINVAL; + return -1; + } else if (port_str != "*" && port_str != "-1") { + const char *begin = port_str.c_str (); + char *end = NULL; + unsigned long l = strtoul (begin, &end, 10); + + if ((l == 0 && end == begin) || (l == ULONG_MAX && errno == ERANGE) + || l > UINT_MAX) { + errno = EINVAL; + return -1; + } + + port = static_cast (l); + } + + address.svm_family = static_cast (AF_VSOCK); + address.svm_cid = cid; + address.svm_port = port; + + return 0; +} + +int zmq::vsock_address_t::to_string (std::string &addr_) const +{ + if (address.svm_family != AF_VSOCK) { + addr_.clear (); + return -1; + } + + std::stringstream s; + + s << "vsock://"; + + if (address.svm_cid == VMADDR_CID_ANY) { + s << "*"; + } else { + s << address.svm_cid; + } + + s << ":"; + + if (address.svm_port == VMADDR_PORT_ANY) { + s << "*"; + } else { + s << address.svm_port; + } + + addr_ = s.str (); + return 0; +} + +const sockaddr *zmq::vsock_address_t::addr () const +{ + return reinterpret_cast (&address); +} + +socklen_t zmq::vsock_address_t::addrlen () const +{ + return static_cast (sizeof address); +} + +#endif diff --git a/src/vsock_address.hpp b/src/vsock_address.hpp new file mode 100644 index 0000000000..aab3bfc80d --- /dev/null +++ b/src/vsock_address.hpp @@ -0,0 +1,43 @@ +/* SPDX-License-Identifier: MPL-2.0 */ + +#ifndef __ZMQ_VSOCK_ADDRESS_HPP_INCLUDED__ +#define __ZMQ_VSOCK_ADDRESS_HPP_INCLUDED__ + +#include + +#include "platform.hpp" +#include "ctx.hpp" +#if defined ZMQ_HAVE_VSOCK +#include +#include + + +namespace zmq +{ +class vsock_address_t +{ + public: + vsock_address_t (); + vsock_address_t (ctx_t *parent_); + vsock_address_t (const sockaddr *sa, socklen_t sa_len, ctx_t *parent_); + + // This function sets up the address for VSOCK transport. + int resolve (const char *path_); + + // The opposite to resolve() + int to_string (std::string &addr_) const; + + sa_family_t family () const; + const sockaddr *addr () const; + socklen_t addrlen () const; + + private: + struct sockaddr_vm address; + ctx_t *parent; + + ZMQ_NON_COPYABLE_NOR_MOVABLE (vsock_address_t) +}; +} + +#endif +#endif diff --git a/src/vsock_connecter.cpp b/src/vsock_connecter.cpp new file mode 100644 index 0000000000..cab8a38172 --- /dev/null +++ b/src/vsock_connecter.cpp @@ -0,0 +1,231 @@ +/* SPDX-License-Identifier: MPL-2.0 */ + +#include "precompiled.hpp" + +#include "vsock_connecter.hpp" +#include "vsock_address.hpp" + +#if defined ZMQ_HAVE_VSOCK + +#include + +#include "io_thread.hpp" +#include "platform.hpp" +#include "random.hpp" +#include "err.hpp" +#include "ip.hpp" +#include "address.hpp" +#include "session_base.hpp" +#include "sys/socket.h" +#include "linux/vm_sockets.h" + +zmq::vsock_connecter_t::vsock_connecter_t (class io_thread_t *io_thread_, + class session_base_t *session_, + const options_t &options_, + address_t *addr_, + bool delayed_start_) : + stream_connecter_base_t ( + io_thread_, session_, options_, addr_, delayed_start_), + _connect_timer_started (false) +{ + zmq_assert (_addr->protocol == protocol_name::vsock); +} + +zmq::vsock_connecter_t::~vsock_connecter_t () +{ + zmq_assert (!_connect_timer_started); +} + +void zmq::vsock_connecter_t::process_term (int linger_) +{ + if (_connect_timer_started) { + cancel_timer (connect_timer_id); + _connect_timer_started = false; + } + + stream_connecter_base_t::process_term (linger_); +} + +void zmq::vsock_connecter_t::in_event () +{ + // We are not polling for incoming data, so we are actually called + // because of error here. However, we can get error on out event as well + // on some platforms, so we'll simply handle both events in the same way. + out_event (); +} + +void zmq::vsock_connecter_t::out_event () +{ + if (_connect_timer_started) { + cancel_timer (connect_timer_id); + _connect_timer_started = false; + } + + // TODO this is still very similar to (t)ipc_connecter_t, maybe the + // differences can be factored out + + rm_handle (); + + const fd_t fd = connect (); + + if (fd == retired_fd + && ((options.reconnect_stop & ZMQ_RECONNECT_STOP_CONN_REFUSED) + && errno == ECONNREFUSED)) { + send_conn_failed (_session); + close (); + terminate (); + return; + } + + // Handle the error condition by attempt to reconnect. + if (fd == retired_fd) { + close (); + add_reconnect_timer (); + return; + } + + create_engine ( + fd, zmq::vsock_connecter_t::get_socket_name (fd, socket_end_local)); +} + +std::string +zmq::vsock_connecter_t::get_socket_name (zmq::fd_t fd_, + socket_end_t socket_end_) const +{ + struct sockaddr_storage ss; + const zmq_socklen_t sl = get_socket_address (fd_, socket_end_, &ss); + if (sl == 0) { + return std::string (); + } + + const vsock_address_t addr (reinterpret_cast (&ss), sl, + this->get_ctx ()); + std::string address_string; + addr.to_string (address_string); + return address_string; +} + +void zmq::vsock_connecter_t::timer_event (int id_) +{ + if (id_ == connect_timer_id) { + _connect_timer_started = false; + rm_handle (); + close (); + add_reconnect_timer (); + } else + stream_connecter_base_t::timer_event (id_); +} + +void zmq::vsock_connecter_t::start_connecting () +{ + // Open the connecting socket. + const int rc = open (); + + // Connect may succeed in synchronous manner. + if (rc == 0) { + _handle = add_fd (_s); + out_event (); + } + + // Connection establishment may be delayed. Poll for its completion. + else if (rc == -1 && errno == EINPROGRESS) { + _handle = add_fd (_s); + set_pollout (_handle); + _socket->event_connect_delayed ( + make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ()); + + // add userspace connect timeout + add_connect_timer (); + } + + // Handle any other error condition by eventual reconnect. + else { + if (_s != retired_fd) + close (); + add_reconnect_timer (); + } +} + +void zmq::vsock_connecter_t::add_connect_timer () +{ + if (options.connect_timeout > 0) { + add_timer (options.connect_timeout, connect_timer_id); + _connect_timer_started = true; + } +} + +int zmq::vsock_connecter_t::open () +{ + zmq_assert (_s == retired_fd); + + // Resolve the address + if (_addr->resolved.vsock_addr != NULL) { + LIBZMQ_DELETE (_addr->resolved.vsock_addr); + } + + _addr->resolved.vsock_addr = + new (std::nothrow) vsock_address_t (this->get_ctx ()); + alloc_assert (_addr->resolved.vsock_addr); + + // Convert the textual address into address structure. + _addr->resolved.vsock_addr->resolve (_addr->address.c_str ()); + + // Create the socket. + _s = open_socket (AF_VSOCK, SOCK_STREAM, 0); + + if (_s == retired_fd) { + // TODO we should emit some event in this case! + + LIBZMQ_DELETE (_addr->resolved.vsock_addr); + return -1; + } + + zmq_assert (_addr->resolved.vsock_addr != NULL); + + // Set the socket to non-blocking mode so that we get async connect(). + unblock_socket (_s); + + const vsock_address_t *const vsock_addr = _addr->resolved.vsock_addr; + + // Connect to the remote peer. + int rc = ::connect (_s, vsock_addr->addr (), vsock_addr->addrlen ()); + // Connect was successful immediately. + if (rc == 0) { + return 0; + } + + if (errno == EINTR) + errno = EINPROGRESS; + + return -1; +} + +zmq::fd_t zmq::vsock_connecter_t::connect () +{ + // Async connect has finished. Check whether an error occurred + int err = 0; + socklen_t len = sizeof err; + + const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR, + reinterpret_cast (&err), &len); + + // Assert if the error was caused by 0MQ bug. + // Networking problems are OK. No need to assert. + // Following code should handle both Berkeley-derived socket + // implementations and Solaris. + if (rc == -1) + err = errno; + if (err != 0) { + errno = err; + errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK + && errno != ENOBUFS); + return retired_fd; + } + + // Return the newly connected socket. + const fd_t result = _s; + _s = retired_fd; + return result; +} + +#endif diff --git a/src/vsock_connecter.hpp b/src/vsock_connecter.hpp new file mode 100644 index 0000000000..cb4723bb45 --- /dev/null +++ b/src/vsock_connecter.hpp @@ -0,0 +1,79 @@ +/* SPDX-License-Identifier: MPL-2.0 */ + +#ifndef __ZMQ_VSOCK_CONNECTER_HPP_INCLUDED__ +#define __ZMQ_VSOCK_CONNECTER_HPP_INCLUDED__ + +#include "platform.hpp" +#if defined ZMQ_HAVE_VSOCK + +#include "fd.hpp" +#include "own.hpp" +#include "stdint.hpp" +#include "io_object.hpp" +#include "stream_connecter_base.hpp" + +namespace zmq +{ +class io_thread_t; +class session_base_t; +struct address_t; + +class vsock_connecter_t ZMQ_FINAL : public stream_connecter_base_t +{ + public: + // If 'delayed_start' is true connecter first waits for a while, + // then starts connection process. + vsock_connecter_t (zmq::io_thread_t *io_thread_, + zmq::session_base_t *session_, + const options_t &options_, + address_t *addr_, + bool delayed_start_); + ~vsock_connecter_t (); + + protected: + std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const; + + private: + // ID of the timer used to check the connect timeout, must be different from stream_connecter_base_t::reconnect_timer_id. + enum + { + connect_timer_id = 2 + }; + + // Handlers for incoming commands. + void process_term (int linger_); + + // Handlers for I/O events. + void in_event (); + void out_event (); + void timer_event (int id_); + + // Internal function to start the actual connection establishment. + void start_connecting (); + + // Internal function to add a connect timer + void add_connect_timer (); + + // Internal function to return a reconnect backoff delay. + // Will modify the current_reconnect_ivl used for next call + // Returns the currently used interval + int get_new_reconnect_ivl (); + + // Open Vsock connecting socket. Returns -1 in case of error, + // 0 if connect was successful immediately. Returns -1 with + // EAGAIN errno if async connect was launched. + int open (); + + // Get the file descriptor of newly created connection. Returns + // retired_fd if the connection was unsuccessful. + fd_t connect (); + + // True iff a timer has been started. + bool _connect_timer_started; + + ZMQ_NON_COPYABLE_NOR_MOVABLE (vsock_connecter_t) +}; +} + +#endif +#endif diff --git a/src/vsock_listener.cpp b/src/vsock_listener.cpp new file mode 100644 index 0000000000..320a27040f --- /dev/null +++ b/src/vsock_listener.cpp @@ -0,0 +1,129 @@ +/* SPDX-License-Identifier: MPL-2.0 */ + +#include "precompiled.hpp" + +#include "vsock_listener.hpp" +#if defined ZMQ_HAVE_VSOCK + + +#include + +//#include "stream_engine.hpp" +#include "vsock_address.hpp" +#include "io_thread.hpp" +#include "session_base.hpp" +#include "config.hpp" +#include "err.hpp" +#include "ip.hpp" +#include "socket_base.hpp" +#include "sys/socket.h" +#include "linux/vm_sockets.h" +#include +#include + +zmq::vsock_listener_t::vsock_listener_t (io_thread_t *io_thread_, + socket_base_t *socket_, + const options_t &options_) : + stream_listener_base_t (io_thread_, socket_, options_) +{ +} + +void zmq::vsock_listener_t::in_event () +{ + fd_t fd = accept (); + + // If connection was reset by the peer in the meantime, just ignore it. + if (fd == retired_fd) { + _socket->event_accept_failed ( + make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ()); + return; + } + + // Create the engine object for this connection. + create_engine (fd); +} + +std::string +zmq::vsock_listener_t::get_socket_name (zmq::fd_t fd_, + socket_end_t socket_end_) const +{ + struct sockaddr_storage ss; + const zmq_socklen_t sl = get_socket_address (fd_, socket_end_, &ss); + if (sl == 0) { + return std::string (); + } + + const vsock_address_t addr (reinterpret_cast (&ss), sl, + this->get_ctx ()); + std::string address_string; + addr.to_string (address_string); + return address_string; +} + +int zmq::vsock_listener_t::set_local_address (const char *addr_) +{ + // Create addr on stack for auto-cleanup + std::string addr (addr_); + + // Initialise the address structure. + vsock_address_t address (this->get_ctx ()); + int rc = address.resolve (addr.c_str ()); + if (rc != 0) + return -1; + + // Create a listening socket. + _s = open_socket (AF_VSOCK, SOCK_STREAM, 0); + if (_s == -1) + return -1; + + address.to_string (_endpoint); + + // Bind the socket. + rc = bind (_s, address.addr (), address.addrlen ()); + if (rc != 0) + goto error; + + // Listen for incoming connections. + rc = listen (_s, options.backlog); + if (rc != 0) + goto error; + + _socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint), + _s); + return 0; + +error: + int err = errno; + close (); + errno = err; + return -1; +} + +zmq::fd_t zmq::vsock_listener_t::accept () +{ + // Accept one connection and deal with different failure modes. + // The situation where connection cannot be accepted due to insufficient + // resources is considered valid and treated by ignoring the connection. + zmq_assert (_s != retired_fd); + fd_t sock = ::accept (_s, NULL, NULL); + + if (sock == -1) { + errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR + || errno == ECONNABORTED || errno == EPROTO + || errno == ENOBUFS || errno == ENOMEM || errno == EMFILE + || errno == ENFILE); + return retired_fd; + } + + // Race condition can cause socket not to be closed (if fork happens + // between accept and this point). +#ifdef FD_CLOEXEC + int rc = fcntl (sock, F_SETFD, FD_CLOEXEC); + errno_assert (rc != -1); +#endif + + return sock; +} + + +#endif diff --git a/src/vsock_listener.hpp b/src/vsock_listener.hpp new file mode 100644 index 0000000000..47686546f4 --- /dev/null +++ b/src/vsock_listener.hpp @@ -0,0 +1,50 @@ +/* SPDX-License-Identifier: MPL-2.0 */ + +#ifndef __ZMQ_VSOCK_LISTENER_HPP_INCLUDED__ +#define __ZMQ_VSOCK_LISTENER_HPP_INCLUDED__ + +#include "platform.hpp" + +#if defined ZMQ_HAVE_VSOCK + +#include + +#include "fd.hpp" +#include "vsock_address.hpp" +#include "stream_listener_base.hpp" + +namespace zmq +{ +class vsock_listener_t ZMQ_FINAL : public stream_listener_base_t +{ + public: + vsock_listener_t (zmq::io_thread_t *io_thread_, + zmq::socket_base_t *socket_, + const options_t &options_); + + // Set address to listen on. + int set_local_address (const char *addr_); + + protected: + std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const; + + private: + // Handlers for I/O events. + void in_event (); + + // Accept the new connection. Returns the file descriptor of the + // newly created connection. The function may return retired_fd + // if the connection was dropped while waiting in the listen backlog. + fd_t accept (); + + int create_socket (const char *addr_); + + // Address to listen on. + vsock_address_t _address; + + ZMQ_NON_COPYABLE_NOR_MOVABLE (vsock_listener_t) +}; +} + +#endif +#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index e0090dcde8..6391ff9684 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1801,6 +1801,10 @@ int zmq_has (const char *capability_) #if defined(ZMQ_HAVE_WSS) if (strcmp (capability_, "WSS") == 0) return true; +#endif +#if defined(ZMQ_HAVE_VSOCK) + if (strcmp (capability_, zmq::protocol_name::vsock) == 0) + return true; #endif // Whatever the application asked for, we don't have return false; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index eb59de29f9..a6b1bb7444 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -152,6 +152,10 @@ if(WITH_VMCI) list(APPEND tests test_pair_vmci test_reqrep_vmci) endif() +if(ZMQ_HAVE_VSOCK) + list(APPEND tests test_pair_vsock test_reqrep_vsock) +endif() + if(ENABLE_DRAFTS) list( APPEND diff --git a/tests/test_capabilities.cpp b/tests/test_capabilities.cpp index 33e8ae3148..7265518c14 100644 --- a/tests/test_capabilities.cpp +++ b/tests/test_capabilities.cpp @@ -60,6 +60,12 @@ void test_capabilities () #else TEST_ASSERT_TRUE (!zmq_has ("draft")); #endif + +#if defined(ZMQ_HAVE_VSOCK) + TEST_ASSERT_TRUE (zmq_has ("vsock")); +#else + TEST_ASSERT_TRUE (!zmq_has ("vsock")); +#endif } int main () diff --git a/tests/test_pair_vsock.cpp b/tests/test_pair_vsock.cpp new file mode 100644 index 0000000000..698c76f009 --- /dev/null +++ b/tests/test_pair_vsock.cpp @@ -0,0 +1,59 @@ +/* SPDX-License-Identifier: MPL-2.0 */ + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +#include +#include +#include "sys/socket.h" +#include "linux/vm_sockets.h" +#include +#include + +SETUP_TEARDOWN_TESTCONTEXT + +void test_pair_vsock () +{ + unsigned int cid = VMADDR_CID_ANY; + int vsock = -1; + + if ((vsock = open ("/dev/vsock", O_RDONLY, 0)) < 0) { + TEST_IGNORE_MESSAGE ("failed to open /dev/vsock, skipping test"); + } else if (ioctl (vsock, IOCTL_VM_SOCKETS_GET_LOCAL_CID, &cid) < 0) { + TEST_IGNORE_MESSAGE ("failed to get local cid, skipping test"); + } + + if (vsock >= 0) { + close (vsock); + } + + if (cid == VMADDR_CID_ANY) + TEST_IGNORE_MESSAGE ("vsock environment unavailable, skipping test"); + + std::stringstream s; + s << "vsock://" << cid << ":" << 5561; + std::string endpoint = s.str (); + + void *sb = test_context_socket (ZMQ_PAIR); + int rc = zmq_bind (sb, endpoint.c_str ()); + if (rc < 0 && (errno == EAFNOSUPPORT || errno == EPROTONOSUPPORT)) + TEST_IGNORE_MESSAGE ("VSOCK not supported"); + TEST_ASSERT_SUCCESS_ERRNO (rc); + + void *sc = test_context_socket (ZMQ_PAIR); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, endpoint.c_str ())); + + bounce (sb, sc); + + test_context_socket_close_zero_linger (sc); + test_context_socket_close_zero_linger (sb); +} + +int main (void) +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_pair_vsock); + return UNITY_END (); +} diff --git a/tests/test_reqrep_vsock.cpp b/tests/test_reqrep_vsock.cpp new file mode 100644 index 0000000000..cbb8449e7a --- /dev/null +++ b/tests/test_reqrep_vsock.cpp @@ -0,0 +1,59 @@ +/* SPDX-License-Identifier: MPL-2.0 */ + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +#include +#include +#include "sys/socket.h" +#include "linux/vm_sockets.h" +#include +#include + +SETUP_TEARDOWN_TESTCONTEXT + +void test_reqrep_vsock () +{ + unsigned int cid = VMADDR_CID_ANY; + int vsock = -1; + + if ((vsock = open ("/dev/vsock", O_RDONLY, 0)) < 0) { + TEST_IGNORE_MESSAGE ("failed to open /dev/vsock, skipping test"); + } else if (ioctl (vsock, IOCTL_VM_SOCKETS_GET_LOCAL_CID, &cid) < 0) { + TEST_IGNORE_MESSAGE ("failed to get local cid, skipping test"); + } + + if (vsock >= 0) { + close (vsock); + } + + if (cid == VMADDR_CID_ANY) + TEST_IGNORE_MESSAGE ("vsock environment unavailable, skipping test"); + + std::stringstream s; + s << "vsock://" << cid << ":" << 5561; + std::string endpoint = s.str (); + + void *sb = test_context_socket (ZMQ_DEALER); + int rc = zmq_bind (sb, endpoint.c_str ()); + if (rc < 0 && (errno == EAFNOSUPPORT || errno == EPROTONOSUPPORT)) + TEST_IGNORE_MESSAGE ("VSOCK not supported"); + TEST_ASSERT_SUCCESS_ERRNO (rc); + + void *sc = test_context_socket (ZMQ_DEALER); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, endpoint.c_str ())); + + bounce (sb, sc); + + test_context_socket_close_zero_linger (sc); + test_context_socket_close_zero_linger (sb); +} + +int main (void) +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_reqrep_vsock); + return UNITY_END (); +}