Skip to content

Commit 955d407

Browse files
committed
multiprocess: Add IPC connectAddress and listenAddress methods
Allow listening on and connecting to unix sockets.
1 parent 4da2043 commit 955d407

File tree

6 files changed

+181
-1
lines changed

6 files changed

+181
-1
lines changed

src/interfaces/ipc.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ class Init;
4141
//! to make other proxy objects calling other remote interfaces. It can also
4242
//! destroy the initial interfaces::Init object to close the connection and
4343
//! shut down the spawned process.
44+
//!
45+
//! When connecting to an existing process, the steps are similar to spawning a
46+
//! new process, except a socket is created instead of a socketpair, and
47+
//! destroying an Init interface doesn't end the process, since there can be
48+
//! multiple connections.
4449
class Ipc
4550
{
4651
public:
@@ -54,6 +59,17 @@ class Ipc
5459
//! true. If this is not a spawned child process, return false.
5560
virtual bool startSpawnedProcess(int argc, char* argv[], int& exit_status) = 0;
5661

62+
//! Connect to a socket address and make a client interface proxy object
63+
//! using provided callback. connectAddress returns an interface pointer if
64+
//! the connection was established, returns null if address is empty ("") or
65+
//! disabled ("0") or if a connection was refused but not required ("auto"),
66+
//! and throws an exception if there was an unexpected error.
67+
virtual std::unique_ptr<Init> connectAddress(std::string& address) = 0;
68+
69+
//! Connect to a socket address and make a client interface proxy object
70+
//! using provided callback. Throws an exception if there was an error.
71+
virtual void listenAddress(std::string& address) = 0;
72+
5773
//! Add cleanup callback to remote interface that will run when the
5874
//! interface is deleted.
5975
template<typename Interface>

src/ipc/capnp/protocol.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
#include <mutex>
2424
#include <optional>
2525
#include <string>
26+
#include <sys/socket.h>
27+
#include <system_error>
2628
#include <thread>
2729

2830
namespace ipc {
@@ -51,6 +53,14 @@ class CapnpProtocol : public Protocol
5153
startLoop(exe_name);
5254
return mp::ConnectStream<messages::Init>(*m_loop, fd);
5355
}
56+
void listen(int listen_fd, const char* exe_name, interfaces::Init& init) override
57+
{
58+
startLoop(exe_name);
59+
if (::listen(listen_fd, /*backlog=*/5) != 0) {
60+
throw std::system_error(errno, std::system_category());
61+
}
62+
mp::ListenConnections<messages::Init>(*m_loop, listen_fd, init);
63+
}
5464
void serve(int fd, const char* exe_name, interfaces::Init& init) override
5565
{
5666
assert(!m_loop);

src/ipc/interfaces.cpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Distributed under the MIT software license, see the accompanying
33
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
44

5+
#include <common/args.h>
56
#include <common/system.h>
67
#include <interfaces/init.h>
78
#include <interfaces/ipc.h>
@@ -56,6 +57,35 @@ class IpcImpl : public interfaces::Ipc
5657
exit_status = EXIT_SUCCESS;
5758
return true;
5859
}
60+
std::unique_ptr<interfaces::Init> connectAddress(std::string& address) override
61+
{
62+
if (address.empty() || address == "0") return nullptr;
63+
int fd;
64+
if (address == "auto") {
65+
// Treat "auto" the same as "unix" except don't treat it an as error
66+
// if the connection is not accepted. Just return null so the caller
67+
// can work offline without a connection, or spawn a new
68+
// bitcoin-node process and connect to it.
69+
address = "unix";
70+
try {
71+
fd = m_process->connect(gArgs.GetDataDirNet(), "bitcoin-node", address);
72+
} catch (const std::system_error& e) {
73+
// If connection type is auto and socket path isn't accepting connections, or doesn't exist, catch the error and return null;
74+
if (e.code() == std::errc::connection_refused || e.code() == std::errc::no_such_file_or_directory) {
75+
return nullptr;
76+
}
77+
throw;
78+
}
79+
} else {
80+
fd = m_process->connect(gArgs.GetDataDirNet(), "bitcoin-node", address);
81+
}
82+
return m_protocol->connect(fd, m_exe_name);
83+
}
84+
void listenAddress(std::string& address) override
85+
{
86+
int fd = m_process->bind(gArgs.GetDataDirNet(), m_exe_name, address);
87+
m_protocol->listen(fd, m_exe_name, m_init);
88+
}
5989
void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) override
6090
{
6191
m_protocol->addCleanup(type, iface, std::move(cleanup));

src/ipc/process.cpp

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,28 @@
44

55
#include <ipc/process.h>
66
#include <ipc/protocol.h>
7+
#include <logging.h>
78
#include <mp/util.h>
89
#include <tinyformat.h>
910
#include <util/fs.h>
1011
#include <util/strencodings.h>
12+
#include <util/syserror.h>
1113

1214
#include <cstdint>
1315
#include <cstdlib>
16+
#include <errno.h>
1417
#include <exception>
1518
#include <iostream>
1619
#include <stdexcept>
1720
#include <string.h>
18-
#include <system_error>
21+
#include <sys/socket.h>
22+
#include <sys/un.h>
1923
#include <unistd.h>
2024
#include <utility>
2125
#include <vector>
2226

27+
using util::RemovePrefixView;
28+
2329
namespace ipc {
2430
namespace {
2531
class ProcessImpl : public Process
@@ -54,7 +60,95 @@ class ProcessImpl : public Process
5460
}
5561
return true;
5662
}
63+
int connect(const fs::path& data_dir,
64+
const std::string& dest_exe_name,
65+
std::string& address) override;
66+
int bind(const fs::path& data_dir, const std::string& exe_name, std::string& address) override;
5767
};
68+
69+
static bool ParseAddress(std::string& address,
70+
const fs::path& data_dir,
71+
const std::string& dest_exe_name,
72+
struct sockaddr_un& addr,
73+
std::string& error)
74+
{
75+
if (address.compare(0, 4, "unix") == 0 && (address.size() == 4 || address[4] == ':')) {
76+
fs::path path;
77+
if (address.size() <= 5) {
78+
path = data_dir / fs::PathFromString(strprintf("%s.sock", RemovePrefixView(dest_exe_name, "bitcoin-")));
79+
} else {
80+
path = data_dir / fs::PathFromString(address.substr(5));
81+
}
82+
std::string path_str = fs::PathToString(path);
83+
address = strprintf("unix:%s", path_str);
84+
if (path_str.size() >= sizeof(addr.sun_path)) {
85+
error = strprintf("Unix address path %s exceeded maximum socket path length", fs::quoted(fs::PathToString(path)));
86+
return false;
87+
}
88+
memset(&addr, 0, sizeof(addr));
89+
addr.sun_family = AF_UNIX;
90+
strncpy(addr.sun_path, path_str.c_str(), sizeof(addr.sun_path)-1);
91+
return true;
92+
}
93+
94+
error = strprintf("Unrecognized address '%s'", address);
95+
return false;
96+
}
97+
98+
int ProcessImpl::connect(const fs::path& data_dir,
99+
const std::string& dest_exe_name,
100+
std::string& address)
101+
{
102+
struct sockaddr_un addr;
103+
std::string error;
104+
if (!ParseAddress(address, data_dir, dest_exe_name, addr, error)) {
105+
throw std::invalid_argument(error);
106+
}
107+
108+
int fd;
109+
if ((fd = ::socket(addr.sun_family, SOCK_STREAM, 0)) == -1) {
110+
throw std::system_error(errno, std::system_category());
111+
}
112+
if (::connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == 0) {
113+
return fd;
114+
}
115+
int connect_error = errno;
116+
if (::close(fd) != 0) {
117+
LogPrintf("Error closing file descriptor %i '%s': %s\n", fd, address, SysErrorString(errno));
118+
}
119+
throw std::system_error(connect_error, std::system_category());
120+
}
121+
122+
int ProcessImpl::bind(const fs::path& data_dir, const std::string& exe_name, std::string& address)
123+
{
124+
struct sockaddr_un addr;
125+
std::string error;
126+
if (!ParseAddress(address, data_dir, exe_name, addr, error)) {
127+
throw std::invalid_argument(error);
128+
}
129+
130+
if (addr.sun_family == AF_UNIX) {
131+
fs::path path = addr.sun_path;
132+
if (path.has_parent_path()) fs::create_directories(path.parent_path());
133+
if (fs::symlink_status(path).type() == fs::file_type::socket) {
134+
fs::remove(path);
135+
}
136+
}
137+
138+
int fd;
139+
if ((fd = ::socket(addr.sun_family, SOCK_STREAM, 0)) == -1) {
140+
throw std::system_error(errno, std::system_category());
141+
}
142+
143+
if (::bind(fd, (struct sockaddr*)&addr, sizeof(addr)) == 0) {
144+
return fd;
145+
}
146+
int bind_error = errno;
147+
if (::close(fd) != 0) {
148+
LogPrintf("Error closing file descriptor %i: %s\n", fd, SysErrorString(errno));
149+
}
150+
throw std::system_error(bind_error, std::system_category());
151+
}
58152
} // namespace
59153

60154
std::unique_ptr<Process> MakeProcess() { return std::make_unique<ProcessImpl>(); }

src/ipc/process.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,16 @@ class Process
3434
//! process. If so, return true and a file descriptor for communicating
3535
//! with the parent process.
3636
virtual bool checkSpawned(int argc, char* argv[], int& fd) = 0;
37+
38+
//! Canonicalize and connect to address, returning socket descriptor.
39+
virtual int connect(const fs::path& data_dir,
40+
const std::string& dest_exe_name,
41+
std::string& address) = 0;
42+
43+
//! Create listening socket, bind and canonicalize address, and return socket descriptor.
44+
virtual int bind(const fs::path& data_dir,
45+
const std::string& exe_name,
46+
std::string& address) = 0;
3747
};
3848

3949
//! Constructor for Process interface. Implementation will vary depending on

src/ipc/protocol.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,31 @@ class Protocol
2525

2626
//! Return Init interface that forwards requests over given socket descriptor.
2727
//! Socket communication is handled on a background thread.
28+
//!
29+
//! @note It could be potentially useful in the future to add
30+
//! std::function<void()> on_disconnect callback argument here. But there
31+
//! isn't an immediate need, because the protocol implementation can clean
32+
//! up its own state (calling ProxyServer destructors, etc) on disconnect,
33+
//! and any client calls will just throw ipc::Exception errors after a
34+
//! disconnect.
2835
virtual std::unique_ptr<interfaces::Init> connect(int fd, const char* exe_name) = 0;
2936

37+
//! Listen for connections on provided socket descriptor, accept them, and
38+
//! handle requests on accepted connections. This method doesn't block, and
39+
//! performs I/O on a background thread.
40+
virtual void listen(int listen_fd, const char* exe_name, interfaces::Init& init) = 0;
41+
3042
//! Handle requests on provided socket descriptor, forwarding them to the
3143
//! provided Init interface. Socket communication is handled on the
3244
//! current thread, and this call blocks until the socket is closed.
45+
//!
46+
//! @note: If this method is called, it needs be called before connect() or
47+
//! listen() methods, because for ease of implementation it's inflexible and
48+
//! always runs the event loop in the foreground thread. It can share its
49+
//! event loop with the other methods but can't share an event loop that was
50+
//! created by them. This isn't really a problem because serve() is only
51+
//! called by spawned child processes that call it immediately to
52+
//! communicate back with parent processes.
3353
virtual void serve(int fd, const char* exe_name, interfaces::Init& init) = 0;
3454

3555
//! Add cleanup callback to interface that will run when the interface is

0 commit comments

Comments
 (0)