Skip to content

Commit 6d588de

Browse files
authored
Merge pull request #618 from pguyot/w22/add-select-support-to-esp32
Add enif_select support to esp32
2 parents 867592e + c205c23 commit 6d588de

File tree

7 files changed

+397
-4
lines changed

7 files changed

+397
-4
lines changed

src/libAtomVM/erl_nif.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,8 @@ ERL_NIF_TERM enif_make_resource(ErlNifEnv *env, void *obj);
201201
* Please note that `kqueue(2)` and `poll(2)` behave differently for some
202202
* objects, for example for vnodes and EOF.
203203
*
204+
* On `esp32`, this is currently implemented using `poll(2)`.
205+
*
204206
* @param env current environment
205207
* @param event event object (typically a file descriptor)
206208
* @param mode select mode (`ERL_NIF_SELECT_READ` and/or `ERL_NIF_SELECT_WRITE`)

src/platforms/esp32/components/avm_sys/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ endif()
3939
idf_component_register(
4040
SRCS ${AVM_SYS_COMPONENT_SRCS}
4141
INCLUDE_DIRS "include"
42-
REQUIRES "spi_flash" "soc" "newlib" "pthread" ${ADDITIONAL_COMPONENTS}
42+
REQUIRES "spi_flash" "soc" "newlib" "pthread" "vfs" ${ADDITIONAL_COMPONENTS}
4343
PRIV_REQUIRES "libatomvm" "esp_timer" ${ADDITIONAL_PRIV_REQUIRES}
4444
)
4545

src/platforms/esp32/components/avm_sys/include/esp32_sys.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424
#include "freertos/FreeRTOS.h"
2525
#include <esp_partition.h>
2626
#include <freertos/queue.h>
27+
#include "esp_pthread.h"
2728

2829
#if ESP_IDF_VERSION_MAJOR >= 5
2930
#include <spi_flash_mmap.h>
3031
#endif
3132

3233
#include <time.h>
34+
#include <sys/poll.h>
3335

3436
#include "sys.h"
3537

@@ -81,6 +83,13 @@ struct EventListener
8183

8284
struct ESP32PlatformData
8385
{
86+
pthread_t select_thread;
87+
bool select_thread_exit;
88+
bool eventfd_registered;
89+
int signal_fd;
90+
int ATOMIC select_events_poll_count;
91+
struct pollfd *fds;
92+
8493
// socket_driver
8594
EventListener *socket_listener;
8695
struct SyncList sockets;

src/platforms/esp32/components/avm_sys/sys.c

Lines changed: 144 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,16 @@
4040
#include <esp_log.h>
4141
#include <esp_partition.h>
4242
#include <limits.h>
43+
#include <pthread.h>
4344
#include <stdint.h>
4445
#include <sys/socket.h>
4546

46-
// introduced starting with 4.4
4747
#if ESP_IDF_VERSION_MAJOR >= 5
4848
#include "esp_chip_info.h"
4949
#endif
5050

51+
#include <esp_vfs_eventfd.h>
52+
5153
#ifdef HAVE_SOC_CPU_CORES_NUM
5254
#include "soc/soc_caps.h"
5355
#endif
@@ -61,6 +63,9 @@
6163

6264
static Context *port_driver_create_port(const char *port_name, GlobalContext *global, term opts);
6365

66+
static void *select_thread_loop(void *);
67+
static void select_thread_signal(struct ESP32PlatformData *platform);
68+
6469
// clang-format off
6570
static const char *const esp_free_heap_size_atom = "\x14" "esp32_free_heap_size";
6671
static const char *const esp_largest_free_block_atom = "\x18" "esp32_largest_free_block";
@@ -182,6 +187,29 @@ void sys_init_platform(GlobalContext *glb)
182187
{
183188
struct ESP32PlatformData *platform = malloc(sizeof(struct ESP32PlatformData));
184189
glb->platform_data = platform;
190+
platform->select_thread_exit = false;
191+
platform->select_events_poll_count = -1;
192+
esp_vfs_eventfd_config_t eventfd_config = ESP_VFS_EVENTD_CONFIG_DEFAULT();
193+
esp_err_t err = esp_vfs_eventfd_register(&eventfd_config);
194+
if (err == ESP_OK) {
195+
platform->eventfd_registered = true;
196+
} else {
197+
if (UNLIKELY(err != ESP_ERR_INVALID_STATE)) {
198+
// Function can return fatal ESP_ERR_NO_MEM
199+
fprintf(stderr, "Cannot register eventfd, unexpected error = %d\n", err);
200+
AVM_ABORT();
201+
}
202+
platform->eventfd_registered = false;
203+
}
204+
int signal_fd = eventfd(0, 0);
205+
if (UNLIKELY(signal_fd < 0)) {
206+
fprintf(stderr, "Cannot create signal_fd\n");
207+
AVM_ABORT();
208+
}
209+
platform->signal_fd = signal_fd;
210+
if (UNLIKELY(pthread_create(&platform->select_thread, NULL, select_thread_loop, glb))) {
211+
AVM_ABORT();
212+
}
185213
#ifndef AVM_NO_SMP
186214
// Use the ESP-IDF API to change the default thread attributes
187215
// We use the current main thread priority.
@@ -208,6 +236,16 @@ void sys_init_platform(GlobalContext *glb)
208236
void sys_free_platform(GlobalContext *glb)
209237
{
210238
struct ESP32PlatformData *platform = glb->platform_data;
239+
platform->select_thread_exit = true;
240+
select_thread_signal(platform);
241+
pthread_join(platform->select_thread, NULL);
242+
close(platform->signal_fd);
243+
if (platform->eventfd_registered) {
244+
if (UNLIKELY(esp_vfs_eventfd_unregister() != ESP_OK)) {
245+
fprintf(stderr, "Cannot unregister eventfd\n");
246+
AVM_ABORT();
247+
}
248+
}
211249
free(platform);
212250
}
213251

@@ -560,16 +598,119 @@ void sys_unregister_listener(GlobalContext *global, struct EventListener *listen
560598

561599
void sys_register_select_event(GlobalContext *global, ErlNifEvent event, bool is_write)
562600
{
563-
UNUSED(global);
564601
UNUSED(event);
565602
UNUSED(is_write);
603+
604+
struct ESP32PlatformData *platform = global->platform_data;
605+
platform->select_events_poll_count = -1;
606+
select_thread_signal(platform);
566607
}
567608

568609
void sys_unregister_select_event(GlobalContext *global, ErlNifEvent event, bool is_write)
569610
{
570-
UNUSED(global);
571611
UNUSED(event);
572612
UNUSED(is_write);
613+
614+
struct ESP32PlatformData *platform = global->platform_data;
615+
platform->select_events_poll_count = -1;
616+
select_thread_signal(platform);
617+
}
618+
619+
static void *select_thread_loop(void *arg)
620+
{
621+
GlobalContext *glb = arg;
622+
struct ESP32PlatformData *platform = glb->platform_data;
623+
struct pollfd *fds = malloc(0);
624+
while (!platform->select_thread_exit) {
625+
int select_events_poll_count = platform->select_events_poll_count;
626+
int poll_count = 1;
627+
int fd_index;
628+
if (select_events_poll_count < 0) {
629+
// Means it is dirty and should be rebuilt.
630+
size_t select_events_new_count;
631+
if (select_events_poll_count < 0) {
632+
select_event_count_and_destroy_closed(NULL, NULL, &select_events_new_count, glb);
633+
} else {
634+
select_events_new_count = select_events_poll_count;
635+
}
636+
637+
fds = realloc(fds, sizeof(struct pollfd) * (poll_count + select_events_new_count));
638+
639+
fds[0].fd = platform->signal_fd;
640+
fds[0].events = POLLIN;
641+
fds[0].revents = 0;
642+
643+
fd_index = poll_count;
644+
645+
struct ListHead *item;
646+
struct ListHead *select_events = synclist_rdlock(&glb->select_events);
647+
LIST_FOR_EACH (item, select_events) {
648+
struct SelectEvent *select_event = GET_LIST_ENTRY(item, struct SelectEvent, head);
649+
if (select_event->read || select_event->write) {
650+
fds[fd_index].fd = select_event->event;
651+
fds[fd_index].events = (select_event->read ? POLLIN : 0) | (select_event->write ? POLLOUT : 0);
652+
fds[fd_index].revents = 0;
653+
654+
fd_index++;
655+
}
656+
}
657+
synclist_unlock(&glb->select_events);
658+
659+
select_events_poll_count = select_events_new_count;
660+
platform->select_events_poll_count = select_events_new_count;
661+
}
662+
663+
poll_count += select_events_poll_count;
664+
665+
int nb_descriptors = poll(fds, poll_count, -1);
666+
fd_index = 0;
667+
if (nb_descriptors > 0) {
668+
if ((fds[0].revents & fds[0].events)) {
669+
// We've been signaled
670+
uint64_t ignored;
671+
if (UNLIKELY(read(platform->signal_fd, &ignored, sizeof(ignored)) < 0)) {
672+
fprintf(stderr, "Reading event_fd failed -- errno = %d\n", errno);
673+
AVM_ABORT();
674+
}
675+
nb_descriptors--;
676+
}
677+
fd_index++;
678+
}
679+
680+
for (int i = 0; i < select_events_poll_count && nb_descriptors > 0; i++, fd_index++) {
681+
if (!(fds[fd_index].revents & fds[fd_index].events)) {
682+
continue;
683+
}
684+
bool is_read = fds[fd_index].revents & POLLIN;
685+
bool is_write = fds[fd_index].revents & POLLOUT;
686+
fds[fd_index].revents = 0;
687+
nb_descriptors--;
688+
689+
select_event_notify(fds[fd_index].fd, is_read, is_write, glb);
690+
}
691+
}
692+
693+
free((void *) fds);
694+
695+
return NULL;
696+
}
697+
698+
static void select_thread_signal(struct ESP32PlatformData *platform)
699+
{
700+
// Write can fail if the counter overflows
701+
// (very unlikely, 2^64)
702+
uint64_t val = 1;
703+
if (UNLIKELY(write(platform->signal_fd, &val, sizeof(val)) < 0)) {
704+
uint64_t ignored;
705+
if (UNLIKELY(read(platform->signal_fd, &ignored, sizeof(ignored)) < 0)) {
706+
fprintf(stderr, "Reading event_fd failed\n");
707+
AVM_ABORT();
708+
}
709+
if (UNLIKELY(write(platform->signal_fd, &val, sizeof(val)) < 0)) {
710+
fprintf(stderr, "Writing event_fd failed\n");
711+
AVM_ABORT();
712+
}
713+
}
573714
}
574715

575716
bool event_listener_is_event(EventListener *listener, listener_event_t event)

src/platforms/esp32/test/main/test_erl_sources/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ compile_erlang(test_list_to_binary)
4242
compile_erlang(test_md5)
4343
compile_erlang(test_monotonic_time)
4444
compile_erlang(test_rtc_slow)
45+
compile_erlang(test_select)
4546
compile_erlang(test_socket)
4647
compile_erlang(test_time_and_processes)
4748
compile_erlang(test_tz)
@@ -56,6 +57,7 @@ add_custom_command(
5657
test_md5.beam
5758
test_monotonic_time.beam
5859
test_rtc_slow.beam
60+
test_select.beam
5961
test_socket.beam
6062
test_time_and_processes.beam
6163
test_tz.beam
@@ -67,6 +69,7 @@ add_custom_command(
6769
"${CMAKE_CURRENT_BINARY_DIR}/test_md5.beam"
6870
"${CMAKE_CURRENT_BINARY_DIR}/test_monotonic_time.beam"
6971
"${CMAKE_CURRENT_BINARY_DIR}/test_rtc_slow.beam"
72+
"${CMAKE_CURRENT_BINARY_DIR}/test_select.beam"
7073
"${CMAKE_CURRENT_BINARY_DIR}/test_socket.beam"
7174
"${CMAKE_CURRENT_BINARY_DIR}/test_time_and_processes.beam"
7275
"${CMAKE_CURRENT_BINARY_DIR}/test_tz.beam"
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
%
2+
% This file is part of AtomVM.
3+
%
4+
% Copyright 2023 Paul Guyot <[email protected]>
5+
%
6+
% Licensed under the Apache License, Version 2.0 (the "License");
7+
% you may not use this file except in compliance with the License.
8+
% You may obtain a copy of the License at
9+
%
10+
% http://www.apache.org/licenses/LICENSE-2.0
11+
%
12+
% Unless required by applicable law or agreed to in writing, software
13+
% distributed under the License is distributed on an "AS IS" BASIS,
14+
% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
% See the License for the specific language governing permissions and
16+
% limitations under the License.
17+
%
18+
% SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
19+
%
20+
21+
-module(test_select).
22+
23+
-export([start/0]).
24+
25+
% This test relies on a special vfs registered under /pipe.
26+
27+
start() ->
28+
{ok, WrFd} = atomvm:posix_open("/pipe/0", [o_wronly]),
29+
{ok, RdFd} = atomvm:posix_open("/pipe/0", [o_rdonly]),
30+
% Make sure this test vfs works as expected
31+
{ok, 1} = atomvm:posix_write(WrFd, <<42>>),
32+
{error, eagain} = atomvm:posix_write(WrFd, <<43>>),
33+
{ok, <<42>>} = atomvm:posix_read(RdFd, 1),
34+
{error, eagain} = atomvm:posix_read(RdFd, 1),
35+
36+
% Write fd should be selectable.
37+
SelectWriteRef = make_ref(),
38+
ok = atomvm:posix_select_write(WrFd, self(), SelectWriteRef),
39+
ok =
40+
receive
41+
{select, WrFd, SelectWriteRef, ready_output} -> ok;
42+
M -> {unexpected, M}
43+
after 200 -> fail
44+
end,
45+
ok = atomvm:posix_select_stop(WrFd),
46+
47+
% Write and check that rd is selectable fd should be selectable.
48+
{ok, 1} = atomvm:posix_write(WrFd, <<42>>),
49+
SelectReadRef = make_ref(),
50+
ok = atomvm:posix_select_read(RdFd, self(), SelectReadRef),
51+
ok =
52+
receive
53+
{select, RdFd, SelectReadRef, ready_input} -> ok
54+
after 200 -> fail
55+
end,
56+
{ok, <<42>>} = atomvm:posix_read(RdFd, 1),
57+
ok = atomvm:posix_select_read(RdFd, self(), SelectReadRef),
58+
ok =
59+
receive
60+
{select, RdFd, SelectReadRef, _} -> fail
61+
after 200 -> ok
62+
end,
63+
{ok, 1} = atomvm:posix_write(WrFd, <<43>>),
64+
ok =
65+
receive
66+
{select, RdFd, SelectReadRef, ready_input} -> ok;
67+
M2 -> {unexpected, M2}
68+
after 200 -> fail
69+
end,
70+
ok = atomvm:posix_select_stop(RdFd),
71+
ok =
72+
receive
73+
Message -> {unexpected, Message}
74+
after 200 -> ok
75+
end,
76+
77+
ok = atomvm:posix_close(WrFd),
78+
ok = atomvm:posix_close(RdFd),
79+
ok.

0 commit comments

Comments
 (0)