Skip to content

Commit 987c956

Browse files
authored
libreactor: Enable SO_ATTACH_REUSEPORT_CBPF (#6244)
* libreactor: enable SO_ATTACH_REUSEPORT_CBPF

SO_ATTACH_REUSEPORT_CBPF is a socket option that attaches a BPF-based "program", which automatically assigns a packet to a given socket based on the core id of the CPU that initially received the packet and did the IRQ processing. This improves data locality and therefore increases performance.

* libreactor: Improve SO_ATTACH_REUSEPORT_CBPF performance by controlling worker forking order

Rename setup() to fork_workers() to make its purpose clearer.

The standard BPF program used with SO_ATTACH_REUSEPORT_CBPF automatically assigns a packet to a given socket based on the core id of the CPU that initially received the packet and did the IRQ processing (CPU 0 -> socket 0). The idea is that if the packet is passed to the userland code running on the same CPU, then things are more efficient. However, contrary to my initial assumption, there isn't an automatic mapping between the id of a socket and the id of the CPU that the userland process (which opened the socket) is running on. The "id" of the socket is determined by the order in which sockets are opened. So it works best if the order in which the sockets are opened is controlled to match the order in which processes are pinned to CPUs.

Previously, the for loop in setup() (a) forked a child process, (b) pinned it to a CPU, and then (c) started up an instance of the libreactor server. However, since fork() was being called inside the loop, the order in which the sockets got opened in the child processes was not deterministic. In some cases the process that was pinned to CPU 0 would actually end up being the third process to open a socket, so it would end up getting packets that had been received on the kernel side by CPU 2, which of course doesn't bring any efficiency gains.

To resolve this, I am using an eventfd semaphore to communicate between the parent and child processes and ensure that the forking happens sequentially, and that the order of the sockets being opened matches the order of the CPUs being pinned. Now I am seeing a much more consistent performance improvement.

* libreactor: Upgrade to newly released libdynamic 2.2.0
1 parent 66a530a commit 987c956

File tree

6 files changed

+108
-32
lines changed

6 files changed

+108
-32
lines changed

frameworks/C/libreactor/libreactor-server.dockerfile

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -7,10 +7,9 @@ WORKDIR /build
77

88
ENV CC=gcc-10 AR=gcc-ar-10 NM=gcc-nm-10 RANLIB=gcc-ranlib-10
99

10-
RUN git clone https://github.com/fredrikwidlund/libdynamic && \
11-
cd libdynamic && \
12-
git checkout aee8f053c113 && \
13-
./autogen.sh && \
10+
RUN wget -q https://github.com/fredrikwidlund/libdynamic/releases/download/v2.2.0/libdynamic-2.2.0.tar.gz && \
11+
tar xfz libdynamic-2.2.0.tar.gz && \
12+
cd libdynamic-2.2.0 && \
1413
./configure && \
1514
make install
1615

frameworks/C/libreactor/libreactor.dockerfile

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -7,10 +7,9 @@ WORKDIR /build
77

88
ENV CC=gcc-10 AR=gcc-ar-10 NM=gcc-nm-10 RANLIB=gcc-ranlib-10
99

10-
RUN git clone https://github.com/fredrikwidlund/libdynamic && \
11-
cd libdynamic && \
12-
git checkout aee8f053c113 && \
13-
./autogen.sh && \
10+
RUN wget -q https://github.com/fredrikwidlund/libdynamic/releases/download/v2.2.0/libdynamic-2.2.0.tar.gz && \
11+
tar xfz libdynamic-2.2.0.tar.gz && \
12+
cd libdynamic-2.2.0 && \
1413
./configure && \
1514
make install
1615

frameworks/C/libreactor/src/helpers.c

Lines changed: 78 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,9 @@
77
#include <string.h>
88
#include <sched.h>
99
#include <sys/wait.h>
10+
#include <sys/eventfd.h>
11+
#include <netinet/in.h>
12+
#include <linux/filter.h>
1013
#include <err.h>
1114

1215
#include <dynamic.h>
@@ -80,35 +83,89 @@ void json(server_context *context, clo *json_object)
8083
write_response(&context->session->stream, json_preamble, segment_string(json_string));
8184
}
8285

83-
void setup()
86+
void enable_reuseport_cbpf(server *s)
8487
{
88+
struct sock_filter code[] = {{BPF_LD | BPF_W | BPF_ABS, 0, 0, SKF_AD_OFF + SKF_AD_CPU}, {BPF_RET | BPF_A, 0, 0, 0}};
89+
struct sock_fprog prog = { .len = sizeof(code)/sizeof(code[0]), .filter = code };
8590
int e;
91+
92+
e = setsockopt(s->fd, SOL_SOCKET, SO_ATTACH_REUSEPORT_CBPF, &prog, sizeof(prog));
93+
if (e == -1)
94+
err(1, "SO_ATTACH_REUSEPORT_CBPF");
95+
}
96+
97+
int fork_workers()
98+
{
99+
int e, efd, worker_count = 0;
86100
pid_t pid;
87-
cpu_set_t available_cpus, cpu;
101+
eventfd_t eventfd_value;
102+
cpu_set_t online_cpus, cpu;
88103

89104
signal(SIGPIPE, SIG_IGN);
90-
CPU_ZERO(&available_cpus);
91-
sched_getaffinity(0, sizeof(available_cpus), &available_cpus); // Get set of all available CPUs
92105

93-
for (int i = 0; i < CPU_SETSIZE; i++)
106+
// Get set/count of all online CPUs
107+
CPU_ZERO(&online_cpus);
108+
sched_getaffinity(0, sizeof(online_cpus), &online_cpus);
109+
int num_online_cpus = CPU_COUNT(&online_cpus);
110+
111+
// Create a mapping between the relative cpu id and absolute cpu id for cases where the cpu ids are not contiguous
112+
// E.g if only cpus 0, 1, 8, and 9 are visible to the app because taskset was used or because some cpus are offline
113+
// then the mapping is 0 -> 0, 1 -> 1, 2 -> 8, 3 -> 9
114+
int rel_to_abs_cpu[num_online_cpus];
115+
int rel_cpu_index = 0;
116+
117+
for (int abs_cpu_index = 0; abs_cpu_index < CPU_SETSIZE; abs_cpu_index++) {
118+
if (CPU_ISSET(abs_cpu_index, &online_cpus)){
119+
rel_to_abs_cpu[rel_cpu_index] = abs_cpu_index;
120+
rel_cpu_index++;
121+
122+
if (rel_cpu_index == num_online_cpus)
123+
break;
124+
}
125+
}
126+
127+
// fork a new child/worker process for each available cpu
128+
for (int i = 0; i < num_online_cpus; i++)
94129
{
95-
if (CPU_ISSET(i, &available_cpus))
130+
// Create an eventfd to communicate with the forked child process on each iteration
131+
// This ensures that the order of forking is deterministic which is important when using SO_ATTACH_REUSEPORT_CBPF
132+
efd = eventfd(0, EFD_SEMAPHORE);
133+
if (efd == -1)
134+
err(1, "eventfd");
135+
136+
pid = fork();
137+
if (pid == -1)
138+
err(1, "fork");
139+
140+
// Parent process. Block the for loop until the child has set cpu affinity AND started listening on its socket
141+
if (pid > 0)
142+
{
143+
// Block waiting for the child process to update the eventfd semaphore as a signal to proceed
144+
eventfd_read(efd, &eventfd_value);
145+
close(efd);
146+
147+
worker_count++;
148+
(void) fprintf(stderr, "Worker running on CPU %d\n", i);
149+
continue;
150+
}
151+
152+
// Child process. Set cpu affinity and return eventfd
153+
if (pid == 0)
96154
{
97-
pid = fork();
98-
if (pid == -1)
99-
err(1, "fork");
100-
101-
if (pid == 0)
102-
{
103-
CPU_ZERO(&cpu);
104-
CPU_SET(i, &cpu);
105-
e = sched_setaffinity(0, sizeof cpu, &cpu);
106-
if (e == -1)
107-
err(1, "sched_setaffinity");
108-
109-
return;
110-
}
155+
CPU_ZERO(&cpu);
156+
CPU_SET(rel_to_abs_cpu[i], &cpu);
157+
e = sched_setaffinity(0, sizeof cpu, &cpu);
158+
if (e == -1)
159+
err(1, "sched_setaffinity");
160+
161+
// Break out of the for loop and continue running main. The child will signal the parent once the socket is open
162+
return efd;
111163
}
112164
}
113-
wait(NULL);
165+
166+
(void) fprintf(stderr, "libreactor running with %d worker processes\n", worker_count);
167+
168+
wait(NULL); // wait for children to exit
169+
(void) fprintf(stderr, "A worker process has exited unexpectedly. Shutting down.\n");
170+
exit(EXIT_FAILURE);
114171
}

frameworks/C/libreactor/src/helpers.h

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,8 @@ void plaintext(server_context *context, char *response);
55

66
void json(server_context *context, clo *json_object);
77

8-
void setup();
8+
void enable_reuseport_cbpf(server *s);
9+
10+
int fork_workers();
911

1012
#endif /* HELPERS_H_INCLUDED */

frameworks/C/libreactor/src/libreactor-server.c

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
#include <stdlib.h>
44
#include <unistd.h>
55
#include <err.h>
6+
#include <sys/eventfd.h>
67

78
#include <dynamic.h>
89
#include <reactor.h>
@@ -44,13 +45,21 @@ static core_status server_handler(core_event *event)
4445

4546
int main()
4647
{
48+
int parent_eventfd;
4749
server s;
4850

49-
setup();
51+
// fork_workers() forks a separate child/worker process for each available cpu and returns an eventfd from the parent
52+
// The eventfd is used to signal the parent. This guarantees the forking order needed for REUSEPORT_CBPF to work well
53+
parent_eventfd = fork_workers();
54+
5055
core_construct(NULL);
5156
server_construct(&s, server_handler, &s);
5257
server_open(&s, 0, 8080);
5358

59+
// Signal the parent process so that it can proceed with the next fork
60+
eventfd_write(parent_eventfd, (eventfd_t) 1);
61+
close(parent_eventfd);
62+
5463
core_loop(NULL);
5564
core_destruct(NULL);
5665
}

frameworks/C/libreactor/src/libreactor.c

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@
33
#include <stdlib.h>
44
#include <unistd.h>
55
#include <err.h>
6+
#include <sys/eventfd.h>
67

78
#include <dynamic.h>
89
#include <reactor.h>
@@ -42,12 +43,21 @@ static core_status server_handler(core_event *event)
4243

4344
int main()
4445
{
46+
int parent_eventfd;
4547
server s;
4648

47-
setup();
49+
// fork_workers() forks a separate child/worker process for each available cpu and returns an eventfd from the parent
50+
// The eventfd is used to signal the parent. This guarantees the forking order needed for REUSEPORT_CBPF to work well
51+
parent_eventfd = fork_workers();
52+
4853
core_construct(NULL);
4954
server_construct(&s, server_handler, &s);
5055
server_open(&s, 0, 8080);
56+
enable_reuseport_cbpf(&s);
57+
58+
// Signal the parent process so that it can proceed with the next fork
59+
eventfd_write(parent_eventfd, (eventfd_t) 1);
60+
close(parent_eventfd);
5161

5262
core_loop(NULL);
5363
core_destruct(NULL);

0 commit comments

Comments
 (0)