Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
386 changes: 386 additions & 0 deletions lib/net_mosq.c
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,390 @@ int net__socket_shutdown(struct mosquitto *mosq)
return rc;
}

/*
 * HAVE_PSELECT selects which multiplexing call the connect race below uses:
 * pselect() with a struct timespec timeout when defined, select() with a
 * struct timeval timeout otherwise. It is left undefined on Windows and QNX.
 */
#if !defined(WIN32) && !defined(__QNX__)
#define HAVE_PSELECT
#endif

/*
 * timespecsub(tsp, usp, vsp): store (*tsp - *usp) in *vsp.
 *
 * All three arguments are pointers to struct timespec. The result is
 * normalised so that tv_nsec ends up in [0, 1e9) when both inputs were
 * normalised: a negative nanosecond difference borrows one second.
 *
 * Several BSD-derived systems already provide a three-argument timespecsub()
 * macro in <sys/time.h>; only supply our own when none is visible so the
 * definitions cannot clash.
 */
#ifndef timespecsub
#define timespecsub(tsp, usp, vsp) \
	do { \
		(vsp)->tv_sec = (tsp)->tv_sec - (usp)->tv_sec; \
		(vsp)->tv_nsec = (tsp)->tv_nsec - (usp)->tv_nsec; \
		if ((vsp)->tv_nsec < 0) { \
			(vsp)->tv_sec--; \
			(vsp)->tv_nsec += 1000000000L; \
		} \
	} while (0)
#endif

/*
 * Happy Eyeballs (RFC 8305)
 * This implementation starts an IPv6 connection attempt first and starts an
 * IPv4 attempt after a 250ms delay, or immediately if no IPv6 candidate can
 * be started. The aim is to keep connection latency low when IPv6 is broken
 * or slow.
 */
#define MOSQ_HE_DELAY_MS 250 /* Happy Eyeballs fallback delay, in milliseconds */

/*
 * Holds the resolved local bind addresses for IPv4 and IPv6.
 * net__build_bind_info() fills this in: it resolves the bind address once
 * per family up front, and a family that fails to resolve keeps a NULL
 * entry. The ai_v4/ai_v6 lists come from getaddrinfo() and are released
 * with freeaddrinfo() by net__connect_happy_eyeballs().
 */
struct net__bind_info {
/* Caller-supplied bind address string; the pointer is stored, not copied. */
const char *bind_address;
/* Local port passed to the resolver; net__build_bind_info() sets it to 0. */
int bind_port;
/* AF_INET resolution of bind_address, or NULL. */
struct addrinfo *ai_v4;
/* AF_INET6 resolution of bind_address, or NULL. */
struct addrinfo *ai_v6;
};

/*
 * Represents a single connection attempt target.
 * pending:
 * 0 = not attempted, or attempted and failed (socket already closed)
 * 1 = connect() started (in progress, or completed immediately)
 * 2 = connected (winner)
 */
/* Upper bound on the number of candidates collected from one resolution. */
#define MOSQ_CON_CAND_SIZE 16
struct net__conn_candidate {
/* Entry inside the getaddrinfo() result list; not owned by the candidate. */
struct addrinfo *ai;
/* Socket for this attempt, INVALID_SOCKET while no attempt is open. */
mosq_sock_t sock;
/* AF_INET6 or AF_INET, copied from the addrinfo entry. */
int family;
/* Attempt state, see the values listed above. */
int pending;
};

/*
 * Resolves the bind address for a specific family (IPv4 or IPv6).
 * This is only used when the caller has explicitly requested a bind address.
 *
 * bind_address: host name or address literal to resolve; must not be NULL.
 * bind_port:    local port. Values outside 1..65535 are replaced by 0.
 * family:       address family placed in the resolver hints (AF_INET/AF_INET6).
 * out:          receives the getaddrinfo() result list on success and is set
 *               to NULL on failure. The caller releases it with freeaddrinfo().
 *
 * Returns MOSQ_ERR_SUCCESS, MOSQ_ERR_INVAL for a NULL argument, or
 * MOSQ_ERR_EAI when getaddrinfo() fails.
 */
static int net__resolve_bind_ai(const char *bind_address, int bind_port, int family, struct addrinfo **out)
{
	char portstr[8];
	struct addrinfo hints;
	int port;

	if(!bind_address || !out){
		return MOSQ_ERR_INVAL;
	}
	*out = NULL;

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = family;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_flags = AI_NUMERICSERV;

	/* 65535 is a valid port number, so the upper bound is inclusive. */
	port = (bind_port > 0 && bind_port <= 65535) ? bind_port : 0;
	snprintf(portstr, sizeof(portstr), "%d", port);

	if(getaddrinfo(bind_address, portstr, &hints, out) != 0){
		*out = NULL;
		return MOSQ_ERR_EAI;
	}

	return MOSQ_ERR_SUCCESS;
}

/*
 * Initialise *bindinfo for bind_address and resolve it for both families.
 *
 * The bind port is fixed at 0. Each family is resolved independently through
 * net__resolve_bind_ai(); a family that cannot be resolved keeps a NULL entry.
 *
 * Returns MOSQ_ERR_INVAL if either argument is NULL, MOSQ_ERR_EAI if neither
 * family could be resolved, MOSQ_ERR_SUCCESS otherwise.
 */
static int net__build_bind_info(const char *bind_address, struct net__bind_info *bindinfo)
{
	struct addrinfo *resolved = NULL;

	if(bind_address == NULL || bindinfo == NULL){
		return MOSQ_ERR_INVAL;
	}

	bindinfo->bind_address = bind_address;
	bindinfo->bind_port = 0;
	bindinfo->ai_v4 = NULL;
	bindinfo->ai_v6 = NULL;

	if(net__resolve_bind_ai(bindinfo->bind_address, bindinfo->bind_port, AF_INET, &resolved) == MOSQ_ERR_SUCCESS){
		bindinfo->ai_v4 = resolved;
	}

	if(net__resolve_bind_ai(bindinfo->bind_address, bindinfo->bind_port, AF_INET6, &resolved) == MOSQ_ERR_SUCCESS){
		bindinfo->ai_v6 = resolved;
	}

	/* At least one family must have resolved for the bind info to be usable. */
	return (bindinfo->ai_v6 || bindinfo->ai_v4) ? MOSQ_ERR_SUCCESS : MOSQ_ERR_EAI;
}

/*
 * Create a non-blocking socket for `ai`, optionally bind it to the
 * user-specified local address of the matching family, and start connect().
 *
 * ai:       remote address to connect to; must not be NULL.
 * bindinfo: optional bind data. A bind is performed only when bind_address
 *           is a non-empty string and an entry for ai's family was resolved.
 * sock_out: receives the socket on success/pending and INVALID_SOCKET on
 *           failure; must not be NULL.
 *
 * Returns:
 *   MOSQ_ERR_SUCCESS       connect() completed immediately
 *   MOSQ_ERR_CONN_PENDING  connect() is in progress (EINPROGRESS or
 *                          COMPAT_EWOULDBLOCK); the caller waits for it
 *   MOSQ_ERR_INVAL         ai or sock_out is NULL (sock_out left untouched)
 *   MOSQ_ERR_ERRNO         any other failure; errno holds the cause and the
 *                          socket has been closed
 */
static int net__socket_connect_nb(const struct addrinfo *ai, const struct net__bind_info *bindinfo, mosq_sock_t *sock_out)
{
	mosq_sock_t s;
	const struct addrinfo *bai = NULL;
	int saved_errno;

	if(!ai || !sock_out){
		return MOSQ_ERR_INVAL;
	}
	*sock_out = INVALID_SOCKET;

	s = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
	if(s == INVALID_SOCKET){
		/* Same errno fix-up the connect() path below relies on;
		 * presumably a no-op outside Windows - confirm. */
		WINDOWS_SET_ERRNO();
		goto fail;
	}

	/*
	 * Set socket to non-blocking mode.
	 * NOTE(review): net__socket_nonblock() takes the socket by pointer and
	 * may already have closed/invalidated it on failure - confirm. The
	 * shared failure path only closes a descriptor that is still valid, so
	 * it is safe either way.
	 */
	if(net__socket_nonblock(&s)){
		goto fail;
	}

	/* Bind only if user provided bind_address; skip otherwise */
	if(bindinfo && bindinfo->bind_address && bindinfo->bind_address[0]){
		if(ai->ai_family == AF_INET){
			bai = bindinfo->ai_v4;
		}else if(ai->ai_family == AF_INET6){
			bai = bindinfo->ai_v6;
		}

		if(bai && bind(s, bai->ai_addr, (socklen_t)bai->ai_addrlen) != 0){
			WINDOWS_SET_ERRNO();
			goto fail;
		}
	}

	if(connect(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) == 0){
		*sock_out = s;
		return MOSQ_ERR_SUCCESS;
	}

	WINDOWS_SET_ERRNO();
	if(errno == EINPROGRESS || errno == COMPAT_EWOULDBLOCK){
		*sock_out = s;
		return MOSQ_ERR_CONN_PENDING;
	}

fail:
	/* Closing the socket must not overwrite the errno the caller will read. */
	saved_errno = errno;
	if(s != INVALID_SOCKET){
		COMPAT_CLOSE(s);
	}
	errno = saved_errno;
	*sock_out = INVALID_SOCKET;
	return MOSQ_ERR_ERRNO;
}


/*
 * Resolve `host` and fill `cands` with an ordered candidate list: every
 * IPv6 entry first, then every IPv4 entry, capped at MOSQ_CON_CAND_SIZE
 * entries in total.
 *
 * The resolver is called without a service string, so the port is written
 * directly into each selected entry's sockaddr (network byte order).
 *
 * host:     name or address literal handed to getaddrinfo().
 * port:     remote port in host byte order.
 * cands:    output array with room for MOSQ_CON_CAND_SIZE entries.
 * count:    receives the number of entries written to cands.
 * addrlist: receives the getaddrinfo() list the candidates point into; the
 *           caller frees it with freeaddrinfo() once the candidates are done.
 *
 * Returns MOSQ_ERR_INVAL for a NULL output argument, MOSQ_ERR_EAI when
 * resolution fails, MOSQ_ERR_SUCCESS otherwise (possibly with *count == 0).
 */
static int net__build_candidates(const char *host, uint16_t port, struct net__conn_candidate *cands, int *count, struct addrinfo **addrlist)
{
	const int family_order[2] = { AF_INET6, AF_INET };
	struct addrinfo hints;
	struct addrinfo *node;
	int used = 0;
	int pass;

	if(!addrlist || !cands || !count){
		return MOSQ_ERR_INVAL;
	}

	memset(&hints, 0, sizeof(struct addrinfo));
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;

	if(getaddrinfo(host, NULL, &hints, addrlist) != 0){
		return MOSQ_ERR_EAI;
	}

	/* One pass over the list per family keeps IPv6 ahead of IPv4. */
	for(pass = 0; pass < 2; pass++){
		for(node = *addrlist; node && used < MOSQ_CON_CAND_SIZE; node = node->ai_next){
			if(node->ai_family != family_order[pass]){
				continue;
			}

			if(node->ai_family == AF_INET6){
				((struct sockaddr_in6 *)node->ai_addr)->sin6_port = htons(port);
			}else{
				((struct sockaddr_in *)node->ai_addr)->sin_port = htons(port);
			}

			cands[used].ai = node;
			cands[used].sock = INVALID_SOCKET;
			cands[used].family = family_order[pass];
			cands[used].pending = 0;
			used++;
		}
	}

	*count = used;
	return MOSQ_ERR_SUCCESS;
}

static int net__start_candidate(int family, struct net__conn_candidate *cands, int cand_count, struct net__bind_info *bindinfo)
{
int rc;
int next_index = 0;

/*
* Start the next candidate of the requested family.
* This function skips immediately failing entries and marks successful
* or EINPROGRESS sockets as pending.
*/
while (next_index < cand_count) {
if (cands[next_index].family == family) {
rc = net__socket_connect_nb(cands[next_index].ai, bindinfo, &(cands[next_index].sock));
if ((rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_CONN_PENDING) &&
cands[next_index].sock != INVALID_SOCKET) {
cands[next_index].pending = 1;
return MOSQ_ERR_SUCCESS;
}
}
next_index++;
}

return MOSQ_ERR_NO_CONN;
}


/*
 * Main Happy Eyeballs loop.
 *
 * Resolves `host`, starts one IPv6 attempt, and starts one IPv4 attempt as
 * soon as any of the following holds: the fallback delay
 * (MOSQ_HE_DELAY_MS) has elapsed, no IPv6 attempt could be started, or no
 * attempt is pending any more (for example the IPv6 attempt was refused
 * before the delay ran out). All pending sockets are then raced with
 * select()/pselect(); the first one whose connect() completes without error
 * wins and every other socket is closed.
 *
 * NOTE(review): net__start_candidate() is called once per family, so only
 * the first startable address of each family takes part in the race.
 *
 * host:         remote host name or address literal.
 * port:         remote port, host byte order.
 * sock:         receives the connected (non-blocking) socket, or
 *               INVALID_SOCKET on failure; must not be NULL.
 * bind_address: optional local address to bind to; NULL or "" means no bind.
 *
 * Returns:
 *   MOSQ_ERR_SUCCESS  a candidate connected; *sock holds its socket
 *   MOSQ_ERR_INVAL    sock is NULL
 *   MOSQ_ERR_NO_CONN  resolution produced no candidate, or every attempt failed
 *   MOSQ_ERR_EAI      a bind address was given but could not be resolved for
 *                     either family
 *   MOSQ_ERR_ERRNO    select()/pselect() failed, or timed out after the IPv4
 *                     attempt had been started
 */
static int net__connect_happy_eyeballs(const char *host, uint16_t port, mosq_sock_t *sock, const char *bind_address)
{
	int rc = MOSQ_ERR_SUCCESS;
	int sel;

	struct addrinfo *ainfo = NULL;
	struct net__bind_info bindinfo;
	struct net__conn_candidate cands[MOSQ_CON_CAND_SIZE];
	int cand_count = 0;

	fd_set wfds;
	mosq_sock_t maxfd;
	int active;

	int i;
#ifdef HAVE_PSELECT
	struct timespec local_timeout;
#else
	struct timeval local_timeout;
#endif
	struct timespec start, now, diff;
	long elapsed, wait_ms;
	int v4_started = 0, v6_started = 0;

	int err;
	socklen_t len;

	if(!sock){
		return MOSQ_ERR_INVAL;
	}
	*sock = INVALID_SOCKET;

	rc = net__build_candidates(host, port, cands, &cand_count, &ainfo);
	if(rc != MOSQ_ERR_SUCCESS || cand_count == 0){
		if(ainfo){
			freeaddrinfo(ainfo);
		}
		return MOSQ_ERR_NO_CONN;
	}

	memset(&bindinfo, 0, sizeof(struct net__bind_info));
	if(bind_address && bind_address[0]){
		/*
		 * The caller explicitly asked for a local address. If it cannot be
		 * resolved at all, fail instead of silently connecting unbound.
		 */
		rc = net__build_bind_info(bind_address, &bindinfo);
		if(rc != MOSQ_ERR_SUCCESS){
			freeaddrinfo(ainfo);
			return rc;
		}
	}

	/* Reference point for the fallback delay. */
	clock_gettime(CLOCK_MONOTONIC, &start);

	/* Start first IPv6 candidate immediately (preferred per RFC 8305). */
	if(net__start_candidate(AF_INET6, cands, cand_count, &bindinfo) == MOSQ_ERR_SUCCESS){
		v6_started = 1;
	}

	rc = MOSQ_ERR_NO_CONN;
	while(*sock == INVALID_SOCKET){
		clock_gettime(CLOCK_MONOTONIC, &now);
		timespecsub(&now, &start, &diff);
		elapsed = (long)diff.tv_sec * 1000 + diff.tv_nsec / 1000000L;

		/* Add all currently pending sockets to fdset for connect completion. */
		FD_ZERO(&wfds);
		maxfd = 0;
		active = 0;
		for(i = 0; i < cand_count; i++){
			if(!cands[i].pending || cands[i].sock == INVALID_SOCKET){
				continue;
			}
#ifndef WIN32
			/* FD_SET() on a descriptor >= FD_SETSIZE writes outside the set. */
			if(cands[i].sock >= FD_SETSIZE){
				COMPAT_CLOSE(cands[i].sock);
				cands[i].sock = INVALID_SOCKET;
				cands[i].pending = 0;
				continue;
			}
#endif
			FD_SET(cands[i].sock, &wfds);
			if(cands[i].sock > maxfd){
				maxfd = cands[i].sock;
			}
			active++;
		}

		/*
		 * Start IPv4 once the fallback delay has elapsed, or immediately if
		 * no IPv6 candidate could be started or none is pending any more.
		 * Then loop again so the new socket is included in the fd set.
		 */
		if(!v4_started && (elapsed >= MOSQ_HE_DELAY_MS || !v6_started || active == 0)){
			net__start_candidate(AF_INET, cands, cand_count, &bindinfo);
			v4_started = 1;
			continue;
		}

		if(!active){
			rc = MOSQ_ERR_NO_CONN;
			break;
		}

		/* Before IPv4 starts, wake up exactly when the fallback delay expires. */
		wait_ms = (v4_started) ? MOSQ_HE_DELAY_MS : MOSQ_HE_DELAY_MS - elapsed;

		local_timeout.tv_sec = 0;
#ifdef HAVE_PSELECT
		local_timeout.tv_nsec = wait_ms * 1000000L;
		sel = pselect(maxfd + 1, NULL, &wfds, NULL, &local_timeout, NULL);
#else
		local_timeout.tv_usec = wait_ms * 1000;
		sel = select(maxfd + 1, NULL, &wfds, NULL, &local_timeout);
#endif
		if(sel < 0){
			/*
			 * On error the fd set is not meaningful, so it must not be
			 * inspected. Retry when merely interrupted by a signal.
			 */
			WINDOWS_SET_ERRNO();
			if(errno == EINTR){
				continue;
			}
			rc = MOSQ_ERR_ERRNO;
			break;
		}
		if(sel == 0){
			if(v4_started){
				/*
				 * NOTE(review): a timeout after IPv4 was started ends the
				 * race with MOSQ_ERR_ERRNO although errno is not set by a
				 * timeout; kept as in the original - confirm intent.
				 */
				rc = MOSQ_ERR_ERRNO;
				break;
			}
			/* Fallback delay expired: next iteration starts IPv4. */
			continue;
		}

		for(i = 0; i < cand_count; i++){
			if(!cands[i].pending || cands[i].sock == INVALID_SOCKET
					|| !FD_ISSET(cands[i].sock, &wfds)){

				continue;
			}

			/*
			 * Validate the non-blocking connect completion.
			 * A socket becoming writable does NOT guarantee success;
			 * SO_ERROR must be checked to detect ECONNREFUSED, ETIMEDOUT, etc.
			 * A failing getsockopt() is treated as a failed attempt too.
			 */
			err = 0;
			len = (socklen_t)sizeof(err);
			if(getsockopt(cands[i].sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len) != 0
					|| err != 0){

				COMPAT_CLOSE(cands[i].sock);
				cands[i].sock = INVALID_SOCKET;
				cands[i].pending = 0;
				continue;
			}

			*sock = cands[i].sock;
			cands[i].pending = 2;
			rc = MOSQ_ERR_SUCCESS;
			break;
		}
	}

	/* Close all sockets except the winner (pending == 2). */
	for(i = 0; i < cand_count; i++){
		if(cands[i].pending != 2 && cands[i].sock != INVALID_SOCKET){
			COMPAT_CLOSE(cands[i].sock);
			cands[i].sock = INVALID_SOCKET;
		}
	}

	/* Free bind-info and address resolution data. */
	if(bindinfo.ai_v4){
		freeaddrinfo(bindinfo.ai_v4);
	}
	if(bindinfo.ai_v6){
		freeaddrinfo(bindinfo.ai_v6);
	}
	freeaddrinfo(ainfo);

	/*
	 * Return MOSQ_ERR_SUCCESS if any socket connected,
	 * otherwise propagate the failure code determined above.
	 */
	return rc;
}


#ifdef FINAL_WITH_TLS_PSK

Expand Down Expand Up @@ -564,6 +948,8 @@ int net__try_connect(const char *host, uint16_t port, mosq_sock_t *sock, const c
#else
return MOSQ_ERR_NOT_SUPPORTED;
#endif
}else if (!blocking){
return net__connect_happy_eyeballs(host, port, sock, bind_address);
}else{
return net__try_connect_tcp(host, port, sock, bind_address, blocking);
}
Expand Down
Loading