Skip to content

Commit a4617aa

Browse files
committed
add first ultra hacky AsyncUDP support
I am not proud of this code to say the least.
1 parent e54db5e commit a4617aa

File tree

2 files changed

+244
-0
lines changed

2 files changed

+244
-0
lines changed

cores/portduino/AsyncUDP.cpp

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#include "AsyncUDP.h"
2+
3+
void _asyncudp_async_cb(uv_async_t *handle) {
4+
AsyncUDP *udp = (AsyncUDP *)handle->data;
5+
udp->_DO_NOT_CALL_async_cb();
6+
};
7+
8+
AsyncUDP::AsyncUDP() {
9+
_handler = NULL;
10+
_connected = false;
11+
uv_loop_init(&_loop);
12+
_async.data = this;
13+
uv_async_init(&_loop, &_async, _asyncudp_async_cb);
14+
};
15+
16+
void _asyncudp_alloc_buffer_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
17+
buf->base = (char *)malloc(suggested_size);
18+
buf->len = suggested_size;
19+
};
20+
21+
void _asyncudp_on_read_cb(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
22+
AsyncUDP *udp = (AsyncUDP *)handle->data;
23+
udp->_DO_NOT_CALL_uv_on_read(handle, nread, buf, addr, flags);
24+
}
25+
26+
void AsyncUDP::_DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags) {
27+
_handlerMutex.lock();
28+
auto h = _handler;
29+
_handlerMutex.unlock();
30+
if (h) {
31+
AsyncUDPPacket packet((uint8_t*)buf->base, nread);
32+
h(packet);
33+
}
34+
free(buf->base);
35+
};
36+
37+
bool AsyncUDP::listenMulticast(const IPAddress addr, uint16_t port, uint8_t ttl) {
38+
if (_connected) {
39+
return false;
40+
}
41+
// FIXME: implement error handling rather than raising SIGSEGV
42+
if (uv_udp_init(&_loop, &_socket) < 0) {
43+
raise(SIGSEGV);
44+
}
45+
_socket.data = this;
46+
// FIXME: don't do bytes → string → bytes IP conversion
47+
int maxIpLength = 3*4+3; // 3 digits per octet, 4 octets, 3 dots
48+
char addr_str[maxIpLength+1]; // +1 for null terminator
49+
snprintf(addr_str, maxIpLength, "%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]);
50+
addr_str[maxIpLength] = '\0';
51+
struct sockaddr uvAddr;
52+
if (uv_ip4_addr(addr_str, port, (struct sockaddr_in *)&uvAddr) < 0) {
53+
raise(SIGSEGV);
54+
}
55+
if (uv_udp_bind(&_socket, (const struct sockaddr *)&uvAddr, 0) < 0) {
56+
raise(SIGSEGV);
57+
}
58+
if (uv_udp_set_multicast_loop(&_socket, false) < 0) {
59+
raise(SIGSEGV);
60+
}
61+
if (uv_udp_set_multicast_ttl(&_socket, ttl) < 0) {
62+
raise(SIGSEGV);
63+
}
64+
if (uv_udp_set_membership(&_socket, addr_str, NULL, UV_JOIN_GROUP) < 0) {
65+
raise(SIGSEGV);
66+
}
67+
if (uv_udp_recv_start(&_socket, _asyncudp_alloc_buffer_cb, _asyncudp_on_read_cb) < 0) {
68+
raise(SIGSEGV);
69+
}
70+
71+
_ioThread = std::thread([this](){
72+
uv_run(&_loop, UV_RUN_DEFAULT);
73+
});
74+
75+
_listenIP = addr;
76+
_connected = true;
77+
return true;
78+
};
79+
80+
size_t AsyncUDP::writeTo(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port) {
81+
auto task = std::make_unique<asyncUDPSendTask>((uint8_t*)data, len, addr, port);
82+
_sendQueueMutex.lock();
83+
_sendQueue.push_back(std::move(task));
84+
_sendQueueMutex.unlock();
85+
uv_async_send(&_async);
86+
return len;
87+
};
88+
89+
void AsyncUDP::_DO_NOT_CALL_async_cb() {
90+
_sendQueueMutex.lock();
91+
while (!_sendQueue.empty()) {
92+
auto task = std::move(_sendQueue.back());
93+
_sendQueue.pop_back();
94+
_sendQueueMutex.unlock();
95+
_doWrite(task->data, task->len, task->addr, task->port);
96+
_sendQueueMutex.lock();
97+
}
98+
_sendQueueMutex.unlock();
99+
};
100+
101+
void _asyncudp_send_cb(uv_udp_send_t *req, int status) {
102+
free(req);
103+
};
104+
105+
void AsyncUDP::_doWrite(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port) {
106+
// FIXME: don't do bytes → string → bytes IP conversion
107+
int maxIpLength = 3*4+3; // 3 digits per octet, 4 octets, 3 dots
108+
char addr_str[maxIpLength+1]; // +1 for null terminator
109+
snprintf(addr_str, maxIpLength, "%d.%d.%d.%d", addr[0], addr[1], addr[2], addr[3]);
110+
addr_str[maxIpLength] = '\0';
111+
112+
// FIXME: implement error handling rather than raising SIGSEGV
113+
struct sockaddr uvAddr;
114+
if (uv_ip4_addr(addr_str, port, (struct sockaddr_in *)&uvAddr) < 0) {
115+
raise(SIGSEGV);
116+
}
117+
118+
uv_udp_send_t *req = (uv_udp_send_t *)malloc(sizeof(uv_udp_send_t));
119+
uv_buf_t msg;
120+
msg.base = (char *)data;
121+
msg.len = len;
122+
if (uv_udp_send(req, &_socket, &msg, 1, (const struct sockaddr *)&uvAddr, _asyncudp_send_cb) < 0) {
123+
raise(SIGSEGV);
124+
}
125+
};

cores/portduino/AsyncUDP.h

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// FIXME: this is a really hacky implementation that just has the things required to make the firmware compile and run.
2+
3+
#ifndef ESPASYNCUDP_H
4+
#define ESPASYNCUDP_H
5+
6+
#include <signal.h>
7+
#include "IPAddress.h"
8+
#include "Print.h"
9+
#include <functional>
10+
#include <mutex>
11+
#include <memory>
12+
#include <thread>
13+
#include <uv.h>
14+
15+
class AsyncUDP;
16+
17+
class AsyncUDPPacket final
18+
{
19+
private:
20+
uint8_t *_data;
21+
size_t _len;
22+
23+
protected:
24+
AsyncUDPPacket(uint8_t* byte, size_t len) {
25+
_data = byte;
26+
_len = len;
27+
};
28+
29+
public:
30+
uint8_t * data() {
31+
return _data;
32+
};
33+
size_t length() {
34+
return _len;
35+
};
36+
37+
friend AsyncUDP;
38+
};
39+
40+
class asyncUDPSendTask final {
41+
protected:
42+
uint8_t *data;
43+
size_t len;
44+
IPAddress addr;
45+
uint16_t port;
46+
47+
public:
48+
asyncUDPSendTask(uint8_t *data, size_t len, IPAddress addr, uint16_t port) {
49+
this->data = (uint8_t*)malloc(len);
50+
memcpy(this->data, data, len);
51+
this->len = len;
52+
this->addr = addr;
53+
this->port = port;
54+
};
55+
56+
~asyncUDPSendTask() {
57+
free(data);
58+
};
59+
60+
friend AsyncUDP;
61+
};
62+
63+
typedef std::function<void(AsyncUDPPacket& packet)> AuPacketHandlerFunction;
64+
typedef std::function<void(void * arg, AsyncUDPPacket& packet)> AuPacketHandlerFunctionWithArg;
65+
66+
class AsyncUDP final
67+
{
68+
private:
69+
std::mutex _handlerMutex;
70+
AuPacketHandlerFunction _handler;
71+
72+
std::mutex _sendQueueMutex;
73+
// the queue is used because uv_udp_send is not threadsafe and uv_async can merge multiple calls into one callback
74+
std::vector<std::unique_ptr<asyncUDPSendTask>> _sendQueue;
75+
76+
std::thread _ioThread;
77+
78+
bool _connected;
79+
IPAddress _listenIP;
80+
81+
uv_loop_t _loop;
82+
uv_udp_t _socket;
83+
uv_async_t _async;
84+
85+
public:
86+
AsyncUDP();
87+
~AsyncUDP() {
88+
raise(SIGSEGV); // FIXME: implement closing and teardown
89+
}
90+
91+
void onPacket(AuPacketHandlerFunctionWithArg cb, void * arg=NULL) {
92+
onPacket(std::bind(cb, arg, std::placeholders::_1));
93+
};
94+
void onPacket(AuPacketHandlerFunction cb) {
95+
_handlerMutex.lock();
96+
_handler = cb;
97+
_handlerMutex.unlock();
98+
};
99+
100+
bool listenMulticast(const IPAddress addr, uint16_t port, uint8_t ttl=1);
101+
102+
size_t writeTo(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port);
103+
104+
IPAddress listenIP() {
105+
return _listenIP;
106+
};
107+
operator bool() {
108+
return _connected;
109+
};
110+
111+
// do not call, used internally as callback from libuv's C callback
112+
void _DO_NOT_CALL_uv_on_read(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const struct sockaddr *addr, unsigned flags);
113+
void _DO_NOT_CALL_async_cb();
114+
115+
private:
116+
void _doWrite(const uint8_t *data, size_t len, const IPAddress addr, uint16_t port);
117+
};
118+
119+
#endif

0 commit comments

Comments
 (0)