Skip to content

Commit 20055c8

Browse files
committed
Move decryption workers into their own thread
1 parent be6362c commit 20055c8

File tree

6 files changed

+169
-102
lines changed

6 files changed

+169
-102
lines changed

src/platforms/macos/daemon/wgsessionmacos.cpp

Lines changed: 54 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,13 @@ WgSessionMacos::~WgSessionMacos() {
7474
close(m_netSocket);
7575
}
7676
if (m_encryptWorker) {
77-
m_encryptWorker->shutdown();
77+
m_encryptWorker->stop();
7878
m_encryptWorker->wait();
7979
}
80-
81-
// Shut down the worker pools.
82-
m_decryptPool.clear();
83-
m_decryptPool.waitForDone();
80+
if (m_decryptWorker) {
81+
m_decryptWorker->stop();
82+
m_decryptWorker->wait();
83+
}
8484
}
8585

8686
void WgSessionMacos::processResult(int op, const QByteArray& buf) const {
@@ -137,22 +137,6 @@ void WgSessionMacos::renegotiate() {
137137
processResult(res.op, output.first(res.size));
138138
}
139139

140-
void WgSessionMacos::mhopInput(const QByteArray& data) {
141-
//logger.debug() << name() << "mhop:" << QString(data.toBase64());
142-
quint32 family;
143-
if (data.length() <= sizeof(family)) {
144-
return;
145-
}
146-
147-
QByteArray packet = data.mid(sizeof(family));
148-
quint8 version = (packet[0] >> 4);
149-
if (version == 4) {
150-
mhopInputV4(packet);
151-
} else if (version == 6) {
152-
mhopInputV6(packet);
153-
}
154-
}
155-
156140
WireguardUtils::PeerStatus WgSessionMacos::status() const {
157141
WireguardUtils::PeerStatus result(m_config.m_serverPublicKey);
158142

@@ -239,33 +223,51 @@ QByteArray WgSessionMacos::mhopHeader(const QByteArray& packet) const {
239223
return result;
240224
}
241225

242-
void WgSessionMacos::mhopInputV4(const QByteArray& packet) {
226+
QByteArray WgSessionMacos::mhopUnwrap(const QByteArray& packet) {
227+
if (packet.length() <= sizeof(quint32)) {
228+
return QByteArray();
229+
}
230+
231+
quint32 header = qFromBigEndian<quint32>(packet.constData());
232+
if (header == AF_INET) {
233+
return mhopInputV4(packet.sliced(sizeof(header)));
234+
} else if (header == AF_INET6) {
235+
return mhopInputV6(packet.sliced(sizeof(header)));
236+
} else {
237+
return QByteArray();
238+
}
239+
}
240+
241+
QByteArray WgSessionMacos::mhopInputV4(const QByteArray& packet) {
243242
// Parse the IPv4 header
244243
auto header = reinterpret_cast<const struct ipv4header*>(packet.constData());
245244
quint16 hlen = (header->ihl & 0xF) * 4;
245+
if ((header->ihl >> 4) != 4) {
246+
return QByteArray();
247+
}
246248
if ((hlen < sizeof(struct ipv4header)) || (hlen > packet.length())) {
247-
return;
249+
return QByteArray();
248250
}
249251

250252
// Validate the header.
251253
if ((qFromBigEndian(header->source) != m_serverIpv4.toIPv4Address()) ||
252254
(qFromBigEndian(header->dest) != m_innerIpv4.toIPv4Address()) ||
253255
(header->proto != IPPROTO_UDP) || (header->ttl == 0) ||
254256
inetChecksum(header, sizeof(struct ipv4header)) != 0x0000) {
255-
return;
257+
return QByteArray();
256258
}
257259

258260
// Handle IPv4 defragmentation
259-
QByteArray dgram = packet.mid(hlen);
261+
QByteArray dgram = packet.sliced(hlen);
260262
if (header->frag & qToBigEndian<quint16>(0x3fff)) {
261263
dgram = mhopDefragV4(header, dgram);
262264
if (dgram.isEmpty()) {
263-
return;
265+
return QByteArray();
264266
}
265267
}
266268

267269
// Process the UDP header
268-
mhopInputUDP(m_serverIpv4, m_innerIpv4, dgram);
270+
return mhopInputUDP(m_serverIpv4, m_innerIpv4, dgram);
269271
}
270272

271273
QByteArray WgSessionMacos::mhopDefragV4(const struct ipv4header* header,
@@ -301,33 +303,32 @@ QByteArray WgSessionMacos::mhopDefragV4(const struct ipv4header* header,
301303
return result;
302304
}
303305

304-
void WgSessionMacos::mhopInputV6(const QByteArray& packet) {
306+
QByteArray WgSessionMacos::mhopInputV6(const QByteArray& packet) {
305307
// TODO: Implement Me!
308+
return QByteArray();
306309
}
307310

308-
void WgSessionMacos::mhopInputUDP(const QHostAddress& src,
309-
const QHostAddress& dst,
310-
const QByteArray& dgram) {
311+
QByteArray WgSessionMacos::mhopInputUDP(const QHostAddress& src,
312+
const QHostAddress& dst,
313+
const QByteArray& dgram) {
311314
const quint16* hdr = reinterpret_cast<const quint16*>(dgram.constData());
312315
if ((dgram.length() < 8) || (hdr[0] != htons(m_serverPort)) ||
313316
(hdr[1] != htons(m_innerPort)) || (htons(hdr[2]) > dgram.length())) {
314317
logger.debug() << "mhop drop udp:" << dgram.toHex();
315-
return;
318+
return QByteArray();
316319
}
317320

318-
QByteArray data = dgram.mid(8);
321+
QByteArray data = dgram.sliced(8);
319322
if (hdr[3] != 0x0000) {
320323
// Validate the checksum
321324
quint16 cksum = udpChecksum(src, dst, htons(hdr[0]), htons(hdr[1]), data);
322325
if (hdr[3] != cksum) {
323326
logger.debug() << "mhop drop cksum:" << dgram.toHex();
324-
return;
327+
return QByteArray();
325328
}
326329
}
327330

328-
// At last - we can handle the payload.
329-
auto worker = new WgDecryptWorker(this, data);
330-
m_decryptPool.start(worker);
331+
return data;
331332
}
332333

333334
bool WgSessionMacos::start(const struct sockaddr* addr, int len) {
@@ -354,9 +355,15 @@ void WgSessionMacos::start(qintptr sd) {
354355
fcntl(sd, F_SETFL, flags | O_NONBLOCK);
355356

356357
m_netSocket = sd;
357-
auto notifier = new QSocketNotifier(m_netSocket, QSocketNotifier::Read, this);
358-
connect(notifier, &QSocketNotifier::activated, this,
359-
&WgSessionMacos::netReadyRead);
358+
359+
// Start the decryption worker.
360+
if (m_config.m_hopType == InterfaceConfig::MultiHopExit) {
361+
m_decryptWorker = new WgMultihopWorker(this, sd);
362+
} else {
363+
m_decryptWorker = new WgDecryptWorker(this, sd);
364+
}
365+
m_decryptWorker->setMtu(m_tunmtu);
366+
m_decryptWorker->start();
360367

361368
renegotiate();
362369
}
@@ -382,6 +389,13 @@ void WgSessionMacos::setMtu(int mtu) {
382389
}
383390
m_tunmtu = mtu;
384391

392+
if (m_encryptWorker) {
393+
m_encryptWorker->setMtu(mtu);
394+
}
395+
if (m_decryptWorker) {
396+
m_decryptWorker->setMtu(mtu);
397+
}
398+
385399
// Set the MTU if it's a utun device.
386400
struct ifreq ifr;
387401
socklen_t ifnamesize = sizeof(ifr.ifr_name);
@@ -394,29 +408,6 @@ void WgSessionMacos::setMtu(int mtu) {
394408
}
395409
}
396410

397-
void WgSessionMacos::netReadyRead() {
398-
QByteArray rxbuf;
399-
rxbuf.resize(m_tunmtu + WG_MTU_OVERHEAD);
400-
401-
while (true) {
402-
// Try to read a packet from the network.
403-
int rxlen = recv(m_netSocket, (void*)rxbuf.data(), rxbuf.length(), MSG_DONTWAIT);
404-
if (rxlen < 0) {
405-
if (errno == EAGAIN) return;
406-
logger.debug() << "Recv error:" << strerror(errno);
407-
return;
408-
}
409-
410-
QByteArray packet = rxbuf.first(rxlen);
411-
if (m_config.m_hopType == InterfaceConfig::MultiHopExit) {
412-
mhopInput(packet);
413-
} else {
414-
auto worker = new WgDecryptWorker(this, packet);
415-
m_decryptPool.start(worker);
416-
}
417-
}
418-
}
419-
420411
void WgSessionMacos::tunWrite(qintptr fd, const QByteArray& packet, const QByteArray& append) const {
421412
//logger.debug() << name() << "decrypt:" << QString(packet.toBase64());
422413
quint32 family = ((packet.at(0) >> 4) == 4) ? AF_INET : AF_INET6;

src/platforms/macos/daemon/wgsessionmacos.h

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ struct wireguard_tunnel;
1818
struct ipv4header;
1919
struct sockaddr;
2020

21-
class WgEncryptWorker;
22-
class WgDecryptWorker;
21+
class WgSessionWorker;
2322

2423
class WgSessionMacos final : public QObject {
2524
Q_OBJECT
@@ -42,22 +41,22 @@ class WgSessionMacos final : public QObject {
4241
static inline constexpr int WG_MTU_OVERHEAD = 80;
4342
static inline constexpr int WG_PACKET_OVERHEAD = 32;
4443

45-
protected slots:
46-
void netReadyRead();
44+
protected:
45+
QByteArray mhopUnwrap(const QByteArray& packet);
46+
void processResult(int op, const QByteArray& buf) const;
4747

4848
private:
4949
void timeout();
50-
void processResult(int op, const QByteArray& buf) const;
5150

5251
void tunWrite(qintptr fd, const QByteArray& packet,
5352
const QByteArray& append = QByteArray()) const;
5453

5554
QByteArray mhopHeader(const QByteArray& packet) const;
56-
void mhopInput(const QByteArray& packet);
57-
void mhopInputV4(const QByteArray& packet);
58-
void mhopInputV6(const QByteArray& packet);
59-
void mhopInputUDP(const QHostAddress& src, const QHostAddress& dst,
60-
const QByteArray& packet);
55+
56+
QByteArray mhopInputV4(const QByteArray& packet);
57+
QByteArray mhopInputV6(const QByteArray& packet);
58+
QByteArray mhopInputUDP(const QHostAddress& src, const QHostAddress& dst,
59+
const QByteArray& packet);
6160

6261
QByteArray mhopDefragV4(const struct ipv4header* header,
6362
const QByteArray& packet);
@@ -94,11 +93,12 @@ class WgSessionMacos final : public QObject {
9493

9594
protected:
9695
struct wireguard_tunnel* m_tunnel = nullptr;
97-
WgEncryptWorker* m_encryptWorker = nullptr;
98-
QThreadPool m_decryptPool;
96+
WgSessionWorker* m_encryptWorker = nullptr;
97+
WgSessionWorker* m_decryptWorker = nullptr;
9998

10099
friend class WgEncryptWorker;
101100
friend class WgDecryptWorker;
101+
friend class WgMultihopWorker;
102102
};
103103

104104
#endif // WGSESSIONMACOS_H

src/platforms/macos/daemon/wgsessionworker.cpp

Lines changed: 66 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,37 @@ namespace {
2323
Logger logger("WgSessionWorker");
2424
}; // namespace
2525

26-
WgEncryptWorker::WgEncryptWorker(WgSessionMacos* session, qintptr socket)
26+
WgSessionWorker::WgSessionWorker(WgSessionMacos* session, qintptr socket)
2727
: QThread(session), m_session(session) {
2828
MZ_COUNT_CTOR(WgSessionMacos);
29+
logger.debug() << metaObject()->className() << "created.";
30+
2931
m_socket = dup(socket);
3032
m_mtu = IPV6_MMTU;
3133

3234
int flags = fcntl(m_socket, F_GETFL, 0);
3335
fcntl(m_socket, F_SETFL, flags & ~O_NONBLOCK);
3436
}
3537

36-
WgEncryptWorker::~WgEncryptWorker() {
38+
WgSessionWorker::~WgSessionWorker() {
3739
MZ_COUNT_DTOR(WgSessionMacos);
40+
logger.debug() << metaObject()->className() << "destroyed.";
41+
3842
if (m_socket >= 0) {
3943
close(m_socket);
4044
}
4145
}
4246

47+
void WgSessionWorker::setMtu(int mtu) {
48+
logger.debug() << "set mtu:" << mtu;
49+
m_mtu = mtu;
50+
}
51+
52+
void WgSessionWorker::stop() {
53+
requestInterruption();
54+
shutdown(m_socket, SHUT_RD);
55+
}
56+
4357
void WgEncryptWorker::run() {
4458
quint32 header = 0;
4559
QByteArray packet;
@@ -90,18 +104,56 @@ void WgEncryptWorker::run() {
90104
}
91105
}
92106

93-
void WgEncryptWorker::shutdown() {
94-
requestInterruption();
95-
::shutdown(m_socket, SHUT_RD);
107+
void WgDecryptWorker::run() {
108+
QByteArray dgram;
109+
QByteArray decrypt;
110+
111+
while (!isInterruptionRequested()) {
112+
// Resize in case the MTU changed.
113+
int mtu = m_mtu.loadAcquire();
114+
dgram.resize(mtu + WgSessionMacos::WG_PACKET_OVERHEAD);
115+
decrypt.resize(mtu + WgSessionMacos::WG_PACKET_OVERHEAD);
116+
117+
// Try to read a packet from the network.
118+
uint8_t* ptr = reinterpret_cast<uint8_t*>(dgram.data());
119+
int len = recv(m_socket, ptr, dgram.length(), 0);
120+
if (len < 0) {
121+
if (errno == EAGAIN) continue;
122+
if (errno == EINTR) continue;
123+
logger.debug() << "Recv error:" << strerror(errno);
124+
return;
125+
}
126+
127+
uint8_t* dec = reinterpret_cast<uint8_t*>(decrypt.data());
128+
auto res = wireguard_read(m_session->m_tunnel, ptr, len, dec, decrypt.size());
129+
m_session->processResult(res.op, decrypt.first(res.size));
130+
}
96131
}
97132

98-
void WgDecryptWorker::run() {
99-
QByteArray output;
100-
output.resize(m_packet.size());
101-
102-
const uint8_t* ptr = reinterpret_cast<const uint8_t*>(m_packet.constData());
103-
uint8_t* outptr = reinterpret_cast<uint8_t*>(output.data());
104-
auto res = wireguard_read(m_session->m_tunnel, ptr, m_packet.size(), outptr,
105-
output.size());
106-
m_session->processResult(res.op, output.first(res.size));
133+
void WgMultihopWorker::run() {
134+
QByteArray dgram;
135+
QByteArray decrypt;
136+
137+
while (!isInterruptionRequested()) {
138+
// Resize in case the MTU changed.
139+
int mtu = m_mtu.loadAcquire();
140+
dgram.resize(mtu + WgSessionMacos::WG_PACKET_OVERHEAD);
141+
decrypt.resize(mtu + WgSessionMacos::WG_PACKET_OVERHEAD);
142+
143+
// Try to read a packet from the network.
144+
int len = recv(m_socket, (void*)dgram.data(), dgram.length(), 0);
145+
if (len < 0) {
146+
if (errno == EAGAIN) continue;
147+
if (errno == EINTR) continue;
148+
logger.debug() << "Recv error:" << strerror(errno);
149+
return;
150+
}
151+
152+
QByteArray packet = m_session->mhopUnwrap(dgram.first(len));
153+
const uint8_t* ptr = reinterpret_cast<const uint8_t*>(packet.constData());
154+
uint8_t* dec = reinterpret_cast<uint8_t*>(decrypt.data());
155+
auto res = wireguard_read(m_session->m_tunnel, ptr, packet.size(), dec,
156+
decrypt.size());
157+
m_session->processResult(res.op, decrypt.first(res.size));
158+
}
107159
}

0 commit comments

Comments
 (0)