Skip to content

Commit 99cb4b0

Browse files
committed
increase timeout and make connecting in parallel
1 parent 1bf47ac commit 99cb4b0

File tree

2 files changed

+75
-19
lines changed

2 files changed

+75
-19
lines changed

QubicServer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ namespace {
270270

271271
// Set accept timeout for periodic cleanup
272272
struct timeval tv;
273-
tv.tv_sec = 1;
273+
tv.tv_sec = 10;
274274
tv.tv_usec = 0;
275275
::setsockopt(listen_fd_, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
276276

connection/connection.cpp

Lines changed: 74 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
#include <stdexcept>
44
#include <algorithm> // For std::min
55
#include <thread>
6-
76
#include <sys/socket.h>
87
#include <netinet/in.h>
98
#include <arpa/inet.h>
109
#include <unistd.h>
10+
#include <atomic>
1111
#include <cstring>
1212
#include <cerrno> // for errno
1313
#include <fcntl.h>
@@ -35,7 +35,7 @@ static int do_connect(const char* nodeIp, int nodePort)
3535

3636
// Configure timeouts (best-effort)
3737
struct timeval tv;
38-
tv.tv_sec = 2;
38+
tv.tv_sec = 10;
3939
tv.tv_usec = 0;
4040
if (setsockopt(serverSocket, SOL_SOCKET, SO_RCVTIMEO, (const void*)&tv, sizeof tv) < 0) {
4141
Logger::get()->warn("setsockopt(SO_RCVTIMEO) failed: {} ({})", errno, strerror(errno));
@@ -360,7 +360,7 @@ QubicConnection::QubicConnection(int existingSocket)
360360
if (mSocket >= 0) {
361361
// Configure timeouts (best-effort)
362362
struct timeval tv;
363-
tv.tv_sec = 2;
363+
tv.tv_sec = 10;
364364
tv.tv_usec = 0;
365365
if (setsockopt(mSocket, SOL_SOCKET, SO_RCVTIMEO, (const void*)&tv, sizeof tv) < 0) {
366366
Logger::get()->warn("setsockopt(SO_RCVTIMEO) failed: {} ({})", errno, strerror(errno));
@@ -380,7 +380,18 @@ QubicConnection::QubicConnection(int existingSocket)
380380
void parseConnection(ConnectionPool& connPoolAll,
381381
std::vector<std::string>& endpoints)
382382
{
383-
// Try endpoints in order, connect to the first that works
383+
struct ParsedEndpoint
384+
{
385+
std::string endpoint;
386+
std::string nodeType;
387+
std::string ip;
388+
int port = 0;
389+
bool has_passcode = false;
390+
uint64_t passcode_arr[4] = {0, 0, 0, 0};
391+
};
392+
393+
std::vector<ParsedEndpoint> parsedEndpoints;
394+
parsedEndpoints.reserve(endpoints.size());
384395

385396
for (const auto& endpoint : endpoints) {
386397
// Expected format: nodeType:ip:port[:pass0-pass1-pass2-pass3]
@@ -430,47 +441,92 @@ void parseConnection(ConnectionPool& connPoolAll,
430441
continue;
431442
}
432443

444+
ParsedEndpoint parsed;
445+
parsed.endpoint = endpoint;
446+
parsed.nodeType = nodeType;
447+
parsed.ip = ip;
448+
parsed.port = port;
449+
433450
// Optional passcode parsing
434-
bool has_passcode = false;
435-
uint64_t passcode_arr[4] = {0,0,0,0};
436451
if (!passcode_str.empty()) {
437-
// Split by '-'
438-
uint64_t parsed[4];
452+
uint64_t parsedPasscode[4] = {0,0,0,0};
439453
size_t start = 0;
440454
int idx = 0;
441455
while (idx < 4 && start <= passcode_str.size()) {
442456
size_t dash = passcode_str.find('-', start);
443457
auto token = passcode_str.substr(start, (dash == std::string::npos) ? std::string::npos : (dash - start));
444458
if (token.empty()) break;
445459
try {
446-
parsed[idx] = static_cast<uint64_t>(std::stoull(token, nullptr, 10));
460+
parsedPasscode[idx] = static_cast<uint64_t>(std::stoull(token, nullptr, 10));
447461
} catch (...) {
448-
idx = -1; // mark error
462+
idx = -1;
449463
break;
450464
}
451465
idx++;
452466
if (dash == std::string::npos) break;
453467
start = dash + 1;
454468
}
455469
if (idx == 4) {
456-
memcpy(passcode_arr, parsed, sizeof(parsed));
457-
has_passcode = true;
470+
memcpy(parsed.passcode_arr, parsedPasscode, sizeof(parsedPasscode));
471+
parsed.has_passcode = true;
458472
} else {
459473
Logger::get()->warn("Skipping endpoint '{}': invalid passcode format, expected 4 uint64 separated by '-'", endpoint);
460474
continue;
461475
}
462476
}
463477

464-
QCPtr conn = make_qc(ip.c_str(), port);
465-
conn->setNodeType(nodeType);
466-
if (has_passcode) {
467-
conn->updatePasscode(passcode_arr);
478+
parsedEndpoints.push_back(std::move(parsed));
479+
}
480+
481+
if (parsedEndpoints.empty()) {
482+
return;
483+
}
484+
485+
std::vector<QCPtr> createdConnections(parsedEndpoints.size());
486+
std::atomic<size_t> nextIndex{0};
487+
488+
unsigned int workerCount = parsedEndpoints.size();
489+
490+
std::vector<std::thread> workers;
491+
workers.reserve(workerCount);
492+
493+
for (unsigned int worker = 0; worker < workerCount; ++worker) {
494+
workers.emplace_back([&]() {
495+
while (true) {
496+
size_t index = nextIndex.fetch_add(1);
497+
if (index >= parsedEndpoints.size()) {
498+
break;
499+
}
500+
501+
const auto& ep = parsedEndpoints[index];
502+
QCPtr conn = make_qc(ep.ip.c_str(), ep.port);
503+
conn->setNodeType(ep.nodeType);
504+
if (ep.has_passcode) {
505+
conn->updatePasscode(const_cast<uint64_t*>(ep.passcode_arr));
506+
}
507+
createdConnections[index] = conn;
508+
}
509+
});
510+
}
511+
512+
for (auto& worker : workers) {
513+
if (worker.joinable()) {
514+
worker.join();
515+
}
516+
}
517+
518+
for (size_t i = 0; i < parsedEndpoints.size(); ++i) {
519+
const auto& ep = parsedEndpoints[i];
520+
QCPtr& conn = createdConnections[i];
521+
if (!conn) {
522+
Logger::get()->warn("Failed to create connection object for node {}", ep.endpoint);
523+
continue;
468524
}
469525
if (!conn->isSocketValid()) {
470-
Logger::get()->warn("Failed to connect to node {}. Will try to reconnect later", endpoint);
526+
Logger::get()->warn("Failed to connect to node {}. Will try to reconnect later", ep.endpoint);
471527
}
472528
connPoolAll.add(conn);
473-
Logger::get()->info("Added {} node {}:{}{}", nodeType, ip, port, has_passcode ? " (trusted)" : "");
529+
Logger::get()->info("Added {} node {}:{}{}", ep.nodeType, ep.ip, ep.port, ep.has_passcode ? " (trusted)" : "");
474530
}
475531
}
476532

0 commit comments

Comments
 (0)