Skip to content

Commit eb38ba3

Browse files
committed
Refactor remote connection into separate function
1 parent 5254495 commit eb38ba3

File tree

1 file changed

+115
-106
lines changed

1 file changed

+115
-106
lines changed

core/src/dataio.cxx

Lines changed: 115 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -21,119 +21,128 @@
2121

2222
#include "counter64.hpp"
2323

24+
static int
25+
connect_remote(const std::string &path, float timeout)
26+
{
27+
// TCP Socket. Two syntaxes:
28+
// - tcp://host:port -> connect to "host" on "port" and read
29+
// until EOF
30+
// - tcp://*:port -> listen on "port" for the first connection
31+
// and read until EOF
32+
33+
std::string host = path.substr(path.find("://") + 3);
34+
if (host.find(":") == host.npos)
35+
log_fatal("Could not open URL %s: unspecified port",
36+
path.c_str());
37+
std::string port = host.substr(host.find(":") + 1);
38+
host = host.substr(0, host.find(":"));
39+
40+
log_debug("Opening connection to %s, port %s", host.c_str(),
41+
port.c_str());
42+
43+
int fd = -1;
44+
45+
if (strcmp(host.c_str(), "*") == 0) {
46+
// Listen for incoming connections
47+
struct sockaddr_in6 sin;
48+
int no = 0, yes = 1;
49+
int lfd;
50+
51+
bzero(&sin, sizeof(sin));
52+
sin.sin6_family = AF_INET6;
53+
sin.sin6_port = htons(strtol(port.c_str(), NULL, 10));
54+
#ifdef SIN6_LEN
55+
sin.sin6_len = sizeof(sin);
56+
#endif
57+
58+
lfd = socket(PF_INET6, SOCK_STREAM, 0);
59+
if (lfd <= 0)
60+
log_fatal("Could not listen on %s (%s)",
61+
path.c_str(), strerror(errno));
62+
setsockopt(lfd, IPPROTO_IPV6, IPV6_V6ONLY, &no,
63+
sizeof(no));
64+
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &yes,
65+
sizeof(yes));
66+
67+
if (bind(lfd, (struct sockaddr *)&sin, sizeof(sin)) < 0)
68+
log_fatal("Could not bind on port %s (%s)",
69+
port.c_str(), strerror(errno));
70+
if (listen(lfd, 1) < 0)
71+
log_fatal("Could not listen on port %s (%s)",
72+
port.c_str(), strerror(errno));
73+
74+
log_debug("Waiting for connection on port %s",
75+
port.c_str());
76+
fd = accept(lfd, NULL, NULL);
77+
log_debug("Accepted connection on port %s",
78+
port.c_str());
79+
close(lfd);
80+
81+
return fd;
82+
}
83+
84+
// Connect to a listening host elsewhere
85+
86+
struct addrinfo hints, *info, *r;
87+
int err;
88+
89+
bzero(&hints, sizeof(hints));
90+
hints.ai_family = AF_UNSPEC;
91+
hints.ai_socktype = SOCK_STREAM;
92+
93+
err = getaddrinfo(host.c_str(), port.c_str(), &hints,
94+
&info);
95+
if (err != 0)
96+
log_fatal("Could not find host %s (%s)",
97+
host.c_str(), gai_strerror(err));
98+
99+
// Loop through possible addresses until we find one
100+
// that works.
101+
fd = -1;
102+
for (r = info; r != NULL; r = r->ai_next) {
103+
fd = socket(r->ai_family, r->ai_socktype,
104+
r->ai_protocol);
105+
if (fd == -1)
106+
continue;
107+
108+
if (connect(fd, r->ai_addr, r->ai_addrlen) ==
109+
-1) {
110+
close(fd);
111+
fd = -1;
112+
continue;
113+
}
114+
115+
break;
116+
}
117+
118+
if (fd == -1)
119+
log_fatal("Could not connect to %s (%s)",
120+
path.c_str(), strerror(errno));
121+
122+
if (timeout >= 0) {
123+
struct timeval tv;
124+
tv.tv_sec = (int)timeout;
125+
tv.tv_usec = (int)(1e6 * (timeout - tv.tv_sec));
126+
if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO,
127+
(char *)&tv, sizeof(tv)) < 0)
128+
log_fatal("Failed to set timeout on socket; errno=%i",
129+
errno);
130+
}
131+
132+
if (info != NULL)
133+
freeaddrinfo(info);
134+
135+
return fd;
136+
}
137+
24138
std::shared_ptr<std::istream>
25139
g3_istream_from_path(const std::string &path, float timeout, size_t buffersize)
26140
{
27141
auto stream = std::make_shared<boost::iostreams::filtering_istream>();
28142

29143
// Figure out what kind of ultimate data source this is
30144
if (path.find("tcp://") == 0) {
31-
// TCP Socket. Two syntaxes:
32-
// - tcp://host:port -> connect to "host" on "port" and read
33-
// until EOF
34-
// - tcp://*:port -> listen on "port" for the first connection
35-
// and read until EOF
36-
37-
std::string host = path.substr(path.find("://") + 3);
38-
if (host.find(":") == host.npos)
39-
log_fatal("Could not open URL %s: unspecified port",
40-
path.c_str());
41-
std::string port = host.substr(host.find(":") + 1);
42-
host = host.substr(0, host.find(":"));
43-
44-
log_debug("Opening connection to %s, port %s", host.c_str(),
45-
port.c_str());
46-
47-
int fd = -1;
48-
49-
if (strcmp(host.c_str(), "*") == 0) {
50-
// Listen for incoming connections
51-
struct sockaddr_in6 sin;
52-
int no = 0, yes = 1;
53-
int lfd;
54-
55-
bzero(&sin, sizeof(sin));
56-
sin.sin6_family = AF_INET6;
57-
sin.sin6_port = htons(strtol(port.c_str(), NULL, 10));
58-
#ifdef SIN6_LEN
59-
sin.sin6_len = sizeof(sin);
60-
#endif
61-
62-
lfd = socket(PF_INET6, SOCK_STREAM, 0);
63-
if (lfd <= 0)
64-
log_fatal("Could not listen on %s (%s)",
65-
path.c_str(), strerror(errno));
66-
setsockopt(lfd, IPPROTO_IPV6, IPV6_V6ONLY, &no,
67-
sizeof(no));
68-
setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &yes,
69-
sizeof(yes));
70-
71-
if (bind(lfd, (struct sockaddr *)&sin, sizeof(sin)) < 0)
72-
log_fatal("Could not bind on port %s (%s)",
73-
port.c_str(), strerror(errno));
74-
if (listen(lfd, 1) < 0)
75-
log_fatal("Could not listen on port %s (%s)",
76-
port.c_str(), strerror(errno));
77-
78-
log_debug("Waiting for connection on port %s",
79-
port.c_str());
80-
fd = accept(lfd, NULL, NULL);
81-
log_debug("Accepted connection on port %s",
82-
port.c_str());
83-
close(lfd);
84-
} else {
85-
// Connect to a listening host elsewhere
86-
87-
struct addrinfo hints, *info, *r;
88-
int err;
89-
90-
bzero(&hints, sizeof(hints));
91-
hints.ai_family = AF_UNSPEC;
92-
hints.ai_socktype = SOCK_STREAM;
93-
94-
err = getaddrinfo(host.c_str(), port.c_str(), &hints,
95-
&info);
96-
if (err != 0)
97-
log_fatal("Could not find host %s (%s)",
98-
host.c_str(), gai_strerror(err));
99-
100-
// Loop through possible addresses until we find one
101-
// that works.
102-
fd = -1;
103-
for (r = info; r != NULL; r = r->ai_next) {
104-
fd = socket(r->ai_family, r->ai_socktype,
105-
r->ai_protocol);
106-
if (fd == -1)
107-
continue;
108-
109-
if (connect(fd, r->ai_addr, r->ai_addrlen) ==
110-
-1) {
111-
close(fd);
112-
fd = -1;
113-
continue;
114-
}
115-
116-
break;
117-
}
118-
119-
if (fd == -1)
120-
log_fatal("Could not connect to %s (%s)",
121-
path.c_str(), strerror(errno));
122-
123-
if (timeout >= 0) {
124-
struct timeval tv;
125-
tv.tv_sec = (int)timeout;
126-
tv.tv_usec = (int)(1e6 * (timeout - tv.tv_sec));
127-
if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO,
128-
(char *)&tv, sizeof(tv)) < 0)
129-
log_fatal("Failed to set timeout on socket; errno=%i",
130-
errno);
131-
}
132-
133-
if (info != NULL)
134-
freeaddrinfo(info);
135-
}
136-
145+
int fd = connect_remote(path, timeout);
137146
boost::iostreams::file_descriptor_source fs(fd,
138147
boost::iostreams::close_handle);
139148
stream->push(fs, buffersize);

0 commit comments

Comments
 (0)