diff --git a/lib/net_mosq.c b/lib/net_mosq.c index e55be79b8..b44ff7f63 100644 --- a/lib/net_mosq.c +++ b/lib/net_mosq.c @@ -279,6 +279,390 @@ int net__socket_shutdown(struct mosquitto *mosq) return rc; } +#if !defined(WIN32) && !defined(__QNX__) +#define HAVE_PSELECT +#endif + +#define timespecsub(tsp, usp, vsp) \ + do { \ + (vsp)->tv_sec = (tsp)->tv_sec - (usp)->tv_sec; \ + (vsp)->tv_nsec = (tsp)->tv_nsec - (usp)->tv_nsec; \ + if ((vsp)->tv_nsec < 0) { \ + (vsp)->tv_sec--; \ + (vsp)->tv_nsec += 1000000000L; \ + } \ + } while (0) + +/* + * Happy Eyeballs (RFC 8305) + * This implementation races IPv6 candidates sequentially and starts IPv4 + * after a 250ms delay, or immediately if no IPv6 candidate can be started. + * This improves connection latency when IPv6 is broken or slow. + */ +#define MOSQ_HE_DELAY_MS 250 /* Happy Eyeballs fallback delay */ + +/* + * Holds resolved bind addresses for IPv4/IPv6. + * ai_v4 and ai_v6 are lazily populated only when a candidate of that family + * is attempted, to avoid unnecessary getaddrinfo() calls. + */ +struct net__bind_info { + const char *bind_address; + int bind_port; + struct addrinfo *ai_v4; + struct addrinfo *ai_v6; +}; + +/* + * Represents a single connection attempt target. + * pending: + * 0 = not attempted + * 1 = connect() in progress (EINPROGRESS) + * 2 = connected (winner) + */ +#define MOSQ_CON_CAND_SIZE 16 +struct net__conn_candidate { + struct addrinfo *ai; + mosq_sock_t sock; + int family; + int pending; +}; + +/* + * Resolves the bind address for a specific family (IPv4 or IPv6). + * This is only used when the caller has explicitly requested a bind address. 
+ */ +static int net__resolve_bind_ai(const char *bind_address, int bind_port, int family, struct addrinfo **out) +{ + char portstr[8]; + struct addrinfo hints; + + if(!bind_address || !out){ + return MOSQ_ERR_INVAL; + } + + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = family; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_NUMERICSERV; + + snprintf(portstr, sizeof(portstr), "%d", (bind_port > 0 && bind_port < 65535) ? bind_port : 0); + + if(getaddrinfo(bind_address, portstr, &hints, out) != 0){ + return MOSQ_ERR_EAI; + } + + return MOSQ_ERR_SUCCESS; +} + +/* build_bind_info: init bind info + resolve v4/v6 if available */ +static int net__build_bind_info(const char *bind_address, struct net__bind_info *bindinfo) +{ + struct addrinfo *tmp = NULL; + + if(!bindinfo || bind_address == NULL){ + return MOSQ_ERR_INVAL; + } + + bindinfo->bind_address = bind_address; + bindinfo->bind_port = 0; + bindinfo->ai_v4 = NULL; + bindinfo->ai_v6 = NULL; + + if (net__resolve_bind_ai(bindinfo->bind_address, bindinfo->bind_port, AF_INET, &tmp) == MOSQ_ERR_SUCCESS) { + ((struct net__bind_info*)bindinfo)->ai_v4 = tmp; + } + + if (net__resolve_bind_ai(bindinfo->bind_address, bindinfo->bind_port, AF_INET6, &tmp) == MOSQ_ERR_SUCCESS) { + ((struct net__bind_info*)bindinfo)->ai_v6 = tmp; + } + + if(!bindinfo->ai_v6 && !bindinfo->ai_v4) { + return MOSQ_ERR_EAI; + } + return MOSQ_ERR_SUCCESS; +} + +/* + * Create a non-blocking socket and optionally bind it to a user-specified + * local address. This function may return immediately successful or mark the + * socket as pending if connect() returns EINPROGRESS. 
+ */ +static int net__socket_connect_nb(const struct addrinfo *ai, const struct net__bind_info *bindinfo, mosq_sock_t *sock_out) +{ + mosq_sock_t s; + const struct addrinfo *bai = NULL; + + if(!ai || !sock_out){ + return MOSQ_ERR_INVAL; + } + + s = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if(s == INVALID_SOCKET){ + *sock_out = INVALID_SOCKET; + return MOSQ_ERR_ERRNO; + } + + /* Set socket to non-blocking mode. */ + if(net__socket_nonblock(&s)){ + COMPAT_CLOSE(s); + *sock_out = INVALID_SOCKET; + return MOSQ_ERR_ERRNO; + } + + /* Bind only if user provided bind_address; skip otherwise */ + if(bindinfo && bindinfo->bind_address && bindinfo->bind_address[0]){ + if(ai->ai_family == AF_INET){ + bai = bindinfo->ai_v4; + }else if(ai->ai_family == AF_INET6){ + bai = bindinfo->ai_v6; + } + + if(bai){ + if(bind(s, bai->ai_addr, (socklen_t)bai->ai_addrlen) != 0){ + COMPAT_CLOSE(s); + *sock_out = INVALID_SOCKET; + return MOSQ_ERR_ERRNO; + } + } + } + + if(connect(s, ai->ai_addr, (socklen_t)ai->ai_addrlen) == 0){ + *sock_out = s; + return MOSQ_ERR_SUCCESS; + } + + WINDOWS_SET_ERRNO(); + if(errno != EINPROGRESS && errno != COMPAT_EWOULDBLOCK){ + COMPAT_CLOSE(s); + *sock_out = INVALID_SOCKET; + return MOSQ_ERR_ERRNO; + } + + *sock_out = s; + return MOSQ_ERR_CONN_PENDING; +} + + +/* + * Build ordered candidate list: all IPv6 entries first, then IPv4. + * Each candidate contains the resolved sockaddr for connect(). + * Port is set by using the service string or by mutating ai->ai_addr. 
+ *
+ * At most MOSQ_CON_CAND_SIZE candidates are recorded; addresses of other
+ * families are ignored.
+ *
+ * On success *count holds the number of candidates and *addrlist the
+ * getaddrinfo() list they point into; the caller releases it with
+ * freeaddrinfo(). On failure *count is 0 and *addrlist is NULL.
+ * Returns MOSQ_ERR_SUCCESS, MOSQ_ERR_INVAL (NULL output argument) or
+ * MOSQ_ERR_EAI (host could not be resolved).
+ */
+static int net__build_candidates(const char *host, uint16_t port, struct net__conn_candidate *cands, int *count, struct addrinfo **addrlist)
+{
+	/* Preference order of the address families in the candidate list. */
+	static const int family_order[2] = { AF_INET6, AF_INET };
+	struct addrinfo hints;
+	struct addrinfo *ai;
+	int n = 0;
+	int f;
+
+	if(!addrlist || !cands || !count){
+		return MOSQ_ERR_INVAL;
+	}
+	*count = 0;
+	*addrlist = NULL;
+
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+
+	if(getaddrinfo(host, NULL, &hints, addrlist) != 0){
+		*addrlist = NULL;
+		return MOSQ_ERR_EAI;
+	}
+
+	for(f = 0; f < 2; f++){
+		for(ai = *addrlist; ai && n < MOSQ_CON_CAND_SIZE; ai = ai->ai_next){
+			if(ai->ai_family != family_order[f] || !ai->ai_addr){
+				continue;
+			}
+
+			/* No service was passed to getaddrinfo(), so set the port here. */
+			if(ai->ai_family == AF_INET6){
+				((struct sockaddr_in6 *)ai->ai_addr)->sin6_port = htons(port);
+			}else{
+				((struct sockaddr_in *)ai->ai_addr)->sin_port = htons(port);
+			}
+
+			cands[n].ai = ai;
+			cands[n].sock = INVALID_SOCKET;
+			cands[n].family = ai->ai_family;
+			cands[n].pending = 0;
+			n++;
+		}
+	}
+
+	*count = n;
+	return MOSQ_ERR_SUCCESS;
+}
+
+/*
+ * Start a connection attempt for the first candidate of the requested family
+ * that has not been started yet. Candidates whose attempt fails immediately
+ * are skipped and the next one of that family is tried.
+ *
+ * Returns MOSQ_ERR_SUCCESS when an attempt was started (the candidate is
+ * marked pending = 1, also when connect() completed at once),
+ * MOSQ_ERR_NO_CONN when no candidate of that family could be started and
+ * MOSQ_ERR_INVAL when cands is NULL.
+ */
+static int net__start_candidate(int family, struct net__conn_candidate *cands, int cand_count, struct net__bind_info *bindinfo)
+{
+	int rc;
+	int i;
+
+	if(!cands){
+		return MOSQ_ERR_INVAL;
+	}
+
+	for(i = 0; i < cand_count; i++){
+		if(cands[i].family != family){
+			continue;
+		}
+		/*
+		 * Never restart a candidate that is in flight or already connected:
+		 * overwriting its socket would leak the descriptor.
+		 */
+		if(cands[i].pending != 0 || cands[i].sock != INVALID_SOCKET){
+			continue;
+		}
+
+		rc = net__socket_connect_nb(cands[i].ai, bindinfo, &(cands[i].sock));
+		if((rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_CONN_PENDING)
+				&& cands[i].sock != INVALID_SOCKET){
+
+			cands[i].pending = 1;
+			return MOSQ_ERR_SUCCESS;
+		}
+	}
+
+	return MOSQ_ERR_NO_CONN;
+}
+
+
+/*
+ * Main Happy Eyeballs loop.
+ * Starts the first usable IPv6 candidate immediately. Once the HE delay
+ * expires, the first usable IPv4 candidate is started as well.
All pending sockets enter a select() race; first to + * complete successfully wins and all others are closed. + */ +static int net__connect_happy_eyeballs(const char *host, uint16_t port, mosq_sock_t *sock, const char *bind_address) +{ + int rc = MOSQ_ERR_SUCCESS; + + struct addrinfo *ainfo = NULL; + struct net__bind_info bindinfo; + struct net__conn_candidate cands[MOSQ_CON_CAND_SIZE]; + int cand_count = 0; + + fd_set wfds; + mosq_sock_t maxfd; + int active; + + int i; +#ifdef HAVE_PSELECT + struct timespec local_timeout; +#else + struct timeval local_timeout; +#endif + struct timespec start, end, diff; + long elapsed, wait_ms; + int v4_started = 0, v6_started = 0; + + int err = 0; + socklen_t len; + *sock = INVALID_SOCKET; + + rc = net__build_candidates(host, port, cands, &cand_count, &ainfo); + if(rc != MOSQ_ERR_SUCCESS || cand_count == 0) { + if (ainfo) { + freeaddrinfo(ainfo); + } + return MOSQ_ERR_NO_CONN; + } + + memset(&bindinfo, 0, sizeof(struct net__bind_info)); + net__build_bind_info(bind_address, &bindinfo); + + /* Start first IPv6 candidate immediately (preferred per RFC 8305). */ + rc = net__start_candidate(AF_INET6, cands, cand_count, &bindinfo); + if (rc == MOSQ_ERR_SUCCESS) { + v6_started = 1; + } + + while(*sock == INVALID_SOCKET){ + FD_ZERO(&wfds); + maxfd = 0; + active = 0; + + clock_gettime(CLOCK_MONOTONIC, &end); + timespecsub(&end, &start, &diff); + elapsed = diff.tv_sec * 1000 + diff.tv_nsec / 1000000L; + /* + * Start IPv4 once the fallback delay has elapsed, or immediately if no + * IPv6 candidate could be started. + */ + if(!v4_started && (elapsed >= MOSQ_HE_DELAY_MS || !v6_started)){ + net__start_candidate(AF_INET, cands, cand_count, &bindinfo); + v4_started = 1; + } + /* Add all currently pending sockets to fdset for connect completion. 
*/ + for(i = 0; i < cand_count; i++){ + if(cands[i].pending && + cands[i].sock != INVALID_SOCKET){ + FD_SET(cands[i].sock, &wfds); + if(cands[i].sock > maxfd) { + maxfd = cands[i].sock; + } + active++; + } + } + + if(!active) { + rc = MOSQ_ERR_NO_CONN; + break; + } + + wait_ms = (v4_started) ? MOSQ_HE_DELAY_MS : MOSQ_HE_DELAY_MS - elapsed; + + local_timeout.tv_sec = 0; +#ifdef HAVE_PSELECT + local_timeout.tv_nsec = wait_ms * 1000000L; + rc = pselect(maxfd + 1, NULL, &wfds, NULL, &local_timeout, NULL); +#else + local_timeout.tv_usec = wait_ms * 1000; + rc = select(maxfd + 1, NULL, &wfds, NULL, &local_timeout); +#endif + if(rc <= 0 && v4_started) { + rc = MOSQ_ERR_ERRNO; + break; + } + + for(i = 0; i < cand_count; i++){ + if(cands[i].pending && cands[i].sock != INVALID_SOCKET && + FD_ISSET(cands[i].sock, &wfds)){ + + /* + * Validate the non-blocking connect completion. + * A socket becoming writable does NOT guarantee success; + * SO_ERROR must be checked to detect ECONNREFUSED, ETIMEDOUT, etc. + */ + if(!getsockopt(cands[i].sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){ + if (err != 0) { + COMPAT_CLOSE(cands[i].sock); + cands[i].sock = INVALID_SOCKET; + cands[i].pending = 0; + continue; + } + } + *sock = cands[i].sock; + cands[i].pending = 2; + rc = MOSQ_ERR_SUCCESS; + break; + } + } + } + /* Close all sockets except the winner (pending == 2). */ + for(i = 0; i < cand_count; i++){ + if(cands[i].pending != 2 && cands[i].sock != INVALID_SOCKET){ + COMPAT_CLOSE(cands[i].sock); + } + } + /* Free bind-info and address resolution data. */ + if(bindinfo.ai_v4) { + freeaddrinfo(bindinfo.ai_v4); + } + if(bindinfo.ai_v6) { + freeaddrinfo(bindinfo.ai_v6); + } + freeaddrinfo(ainfo); + /* + * Return MOSQ_ERR_SUCCESS if any socket connected, + * otherwise propagate the failure code determined above. 
+ */ + return rc; +} + #ifdef FINAL_WITH_TLS_PSK @@ -564,6 +948,8 @@ int net__try_connect(const char *host, uint16_t port, mosq_sock_t *sock, const c #else return MOSQ_ERR_NOT_SUPPORTED; #endif + }else if (!blocking){ + return net__connect_happy_eyeballs(host, port, sock, bind_address); }else{ return net__try_connect_tcp(host, port, sock, bind_address, blocking); }