Skip to content

Commit b0e2f4a

Browse files
committed
libtailscale: make tailscale_listener pollable
Use a socketpair(2) and sendmsg/recvmsg to pass a connection fd from Go to C. This lets people write non-blocking C by polling on a tailscale_listener for when they should tailscale_accept. Signed-off-by: David Crawshaw <[email protected]>
1 parent 42597d5 commit b0e2f4a

File tree

8 files changed

+158
-98
lines changed

8 files changed

+158
-98
lines changed

example/echo_server.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ int main(void) {
4646
}
4747
close(conn);
4848
}
49-
tailscale_listener_close(ln);
49+
close(ln);
5050
tailscale_close(ts);
5151

5252
return 0;

python/src/main.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,6 @@ PYBIND11_MODULE(_tailscale, m) {
4444
Listen on a given protocol and port
4545
)pbdoc");
4646

47-
m.def("close_listener", &TsnetListenerClose, R"pbdoc(
48-
Create a new tsnet server
49-
)pbdoc");
50-
5147
m.def("accept", [](int ld) { int connOut; int rv = TsnetAccept(ld, &connOut); return std::make_tuple(connOut, rv);}, R"pbdoc(
5248
Accept a given listener and connection
5349
)pbdoc");

ruby/lib/tailscale.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ module Libtailscale
3636
attach_function :TsnetSetLogFD, [:int, :int], :int
3737
attach_function :TsnetDial, [:int, :string, :string, :pointer], :int, blocking: true
3838
attach_function :TsnetListen, [:int, :string, :string, :pointer], :int
39-
attach_function :TsnetListenerClose, [:int], :int
40-
attach_function :TsnetAccept, [:int, :pointer], :int, blocking: true
39+
attach_function :close, [:int], :int
40+
attach_function :tailscale_accept, [:int, :pointer], :int, blocking: true
4141
attach_function :TsnetErrmsg, [:int, :pointer, :size_t], :int
4242
attach_function :TsnetLoopback, [:int, :pointer, :size_t, :pointer, :pointer], :int
4343
end
@@ -90,7 +90,7 @@ def accept
9090
# Close the listener.
9191
def close
9292
@ts.assert_open
93-
Error.check @ts, Libtailscale::TsnetListenerClose(@listener)
93+
Error.check @ts, Libtailscale::close(@listener)
9494
end
9595
end
9696

tailscale.c

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
// SPDX-License-Identifier: BSD-3-Clause
33

44
#include "tailscale.h"
5+
#include <sys/socket.h>
6+
#include <stdio.h>
7+
#include <unistd.h>
58

69
// Functions exported by Go.
710
extern int TsnetNewServer();
@@ -17,8 +20,6 @@ extern int TsnetSetControlURL(int sd, char* str);
1720
extern int TsnetSetEphemeral(int sd, int ephemeral);
1821
extern int TsnetSetLogFD(int sd, int fd);
1922
extern int TsnetListen(int sd, char* net, char* addr, int* listenerOut);
20-
extern int TsnetListenerClose(int ld);
21-
extern int TsnetAccept(int ld, int* connOut);
2223
extern int TsnetLoopback(int sd, char* addrOut, size_t addrLen, char* proxyOut, char* localOut);
2324

2425
tailscale tailscale_new() {
@@ -45,12 +46,28 @@ int tailscale_listen(tailscale sd, const char* network, const char* addr, tailsc
4546
return TsnetListen(sd, (char*)network, (char*)addr, (int*)listener_out);
4647
}
4748

48-
int tailscale_listener_close(tailscale_listener ld) {
49-
return TsnetListenerClose(ld);
50-
}
51-
5249
int tailscale_accept(tailscale_listener ld, tailscale_conn* conn_out) {
53-
return TsnetAccept(ld, (int*)conn_out);
50+
struct msghdr msg = {0};
51+
52+
char mbuf[256];
53+
struct iovec io = { .iov_base = mbuf, .iov_len = sizeof(mbuf) };
54+
msg.msg_iov = &io;
55+
msg.msg_iovlen = 1;
56+
57+
char cbuf[256];
58+
msg.msg_control = cbuf;
59+
msg.msg_controllen = sizeof(cbuf);
60+
61+
if (recvmsg(ld, &msg, 0) == -1) {
62+
return -1;
63+
}
64+
65+
struct cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
66+
unsigned char* data = CMSG_DATA(cmsg);
67+
68+
int fd = *(int*)data;
69+
*conn_out = fd;
70+
return 0;
5471
}
5572

5673
int tailscale_set_dir(tailscale sd, const char* dir) {

tailscale.go

Lines changed: 83 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"net"
1515
"os"
1616
"sync"
17-
"sync/atomic"
1817
"syscall"
1918
"unsafe"
2019

@@ -50,14 +49,14 @@ func getServer(sd C.int) (*server, error) {
5049

5150
// listeners tracks all the tsnet_listener objects allocated via tsnet_listen.
5251
var listeners struct {
53-
mu sync.Mutex
54-
next C.int
55-
m map[C.int]*listener
52+
mu sync.Mutex
53+
m map[C.int]*listener
5654
}
5755

5856
type listener struct {
5957
s *server
6058
ln net.Listener
59+
fd int // go side fd of socketpair sent to C
6160
}
6261

6362
// conns tracks all the pipe(2)s allocated via tsnet_dial.
@@ -180,46 +179,85 @@ func TsnetListen(sd C.int, network, addr *C.char, listenerOut *C.int) C.int {
180179
return s.recErr(err)
181180
}
182181

183-
listeners.mu.Lock()
184-
if listeners.next == 0 {
185-
// Arbitrary magic number that will hopefully help someone
186-
// debug some type confusion one day.
187-
listeners.next = 37<<16 + 1
182+
// The tailscale_listener we return to C is one side of a socketpair(2).
183+
// We do this so we can proactively call ln.Accept in a goroutine and
184+
// feed an fd for the connection through the listener. This lets C use
185+
// epoll on the tailscale_listener to know if it should call
186+
// tailscale_accept, which avoids a blocking call on the far side.
187+
fds, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0)
188+
if err != nil {
189+
return s.recErr(err)
188190
}
191+
sp := fds[1]
192+
fdC := C.int(fds[0])
193+
194+
listeners.mu.Lock()
189195
if listeners.m == nil {
190196
listeners.m = map[C.int]*listener{}
191197
}
192-
ld := listeners.next
193-
listeners.next++
194-
listeners.m[ld] = &listener{s: s, ln: ln}
198+
listeners.m[fdC] = &listener{s: s, ln: ln, fd: sp}
195199
listeners.mu.Unlock()
196200

197-
*listenerOut = ld
198-
return 0
199-
}
200-
201-
//export TsnetListenerClose
202-
func TsnetListenerClose(ld C.int) C.int {
203-
listeners.mu.Lock()
204-
defer listeners.mu.Unlock()
201+
cleanup := func() {
202+
// If fdC is closed on the C side, then we end up calling
203+
// into cleanup twice. Be careful to avoid syscall.Close
204+
// twice as the FD may have been reallocated.
205+
listeners.mu.Lock()
206+
if tsLn, ok := listeners.m[fdC]; ok && tsLn.ln == ln {
207+
delete(listeners.m, fdC)
208+
syscall.Close(sp)
209+
}
210+
listeners.mu.Unlock()
205211

206-
l := listeners.m[ld]
207-
if l == nil {
208-
return C.EBADF
212+
ln.Close()
209213
}
210-
err := l.ln.Close()
211-
delete(listeners.m, ld)
214+
go func() {
215+
// fdC is never written to, so trying to read from sp blocks
216+
// until fdC is closed. We use this as a signal that C is
217+
// done with the listener, and we can tear it down.
218+
//
219+
// TODO: would using os.NewFile avoid a locked up thread?
220+
var buf [256]byte
221+
syscall.Read(sp, buf[:])
222+
cleanup()
223+
}()
224+
go func() {
225+
defer cleanup()
226+
for {
227+
netConn, err := ln.Accept()
228+
if err != nil {
229+
return
230+
}
231+
var connFd C.int
232+
if err := newConn(s, netConn, &connFd); err != nil {
233+
if s.s.Logf != nil {
234+
s.s.Logf("libtailscale.accept: newConn: %v", err)
235+
}
236+
netConn.Close()
237+
continue
238+
}
239+
rights := syscall.UnixRights(int(connFd))
240+
err = syscall.Sendmsg(sp, nil, rights, nil, 0)
241+
if err != nil {
242+
// We handle sp being closed in the read goroutine above.
243+
if s.s.Logf != nil {
244+
s.s.Logf("libtailscale.accept: sendmsg failed: %v", err)
245+
}
246+
netConn.Close()
247+
// fallthrough to close connFd, then continue Accept()ing
248+
}
249+
syscall.Close(int(connFd)) // now owned by recvmsg
250+
}
251+
}()
212252

213-
if err != nil {
214-
return l.s.recErr(err)
215-
}
253+
*listenerOut = fdC
216254
return 0
217255
}
218256

219-
func newConn(s *server, netConn net.Conn, connOut *C.int) C.int {
257+
func newConn(s *server, netConn net.Conn, connOut *C.int) error {
220258
fds, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_STREAM, 0)
221259
if err != nil {
222-
return s.recErr(err)
260+
return err
223261
}
224262
r := os.NewFile(uintptr(fds[1]), "socketpair-r")
225263
c := &conn{s: s.s, c: netConn, r: r}
@@ -232,17 +270,21 @@ func newConn(s *server, netConn net.Conn, connOut *C.int) C.int {
232270
conns.m[fdC] = c
233271
conns.mu.Unlock()
234272

235-
var doneOnce atomic.Bool
236273
connCleanup := func() {
237-
if !doneOnce.Swap(true) {
274+
var inCleanup bool
275+
conns.mu.Lock()
276+
if tsConn, ok := conns.m[fdC]; ok && tsConn.c == netConn {
277+
delete(conns.m, fdC)
278+
inCleanup = true
279+
}
280+
conns.mu.Unlock()
281+
282+
if !inCleanup {
238283
return
239284
}
285+
240286
r.Close()
241287
netConn.Close()
242-
243-
conns.mu.Lock()
244-
delete(conns.m, fdC)
245-
conns.mu.Unlock()
246288
}
247289
go func() {
248290
defer connCleanup()
@@ -264,24 +306,7 @@ func newConn(s *server, netConn net.Conn, connOut *C.int) C.int {
264306
}()
265307

266308
*connOut = fdC
267-
return 0
268-
}
269-
270-
//export TsnetAccept
271-
func TsnetAccept(ld C.int, connOut *C.int) C.int {
272-
listeners.mu.Lock()
273-
l := listeners.m[ld]
274-
listeners.mu.Unlock()
275-
276-
if l == nil {
277-
return C.EBADF
278-
}
279-
280-
netConn, err := l.ln.Accept()
281-
if err != nil {
282-
return l.s.recErr(err)
283-
}
284-
return newConn(l.s, netConn, connOut)
309+
return nil
285310
}
286311

287312
//export TsnetDial
@@ -294,7 +319,10 @@ func TsnetDial(sd C.int, network, addr *C.char, connOut *C.int) C.int {
294319
if err != nil {
295320
return s.recErr(err)
296321
}
297-
return newConn(s, netConn, connOut)
322+
if newConn(s, netConn, connOut); err != nil {
323+
return s.recErr(err)
324+
}
325+
return 0
298326
}
299327

300328
//export TsnetSetDir

tailscale.h

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,12 @@ extern int tailscale_dial(tailscale sd, const char* network, const char* addr, t
8787
// A tailscale_listener is a socket on the tailnet listening for connections.
8888
//
8989
// It is much like allocating a system socket(2) and calling listen(2).
90-
// Because it is not a system socket, operate on it using the functions
91-
// tailscale_accept and tailscale_listener_close.
90+
// Accept connections with tailscale_accept and close the listener with close.
91+
//
92+
// Under the hood, a tailscale_listener is one half of a socketpair itself,
93+
// used to move the connection fd from Go to C. This means you can use epoll
94+
// or its equivalent on a tailscale_listener to know if there is a connection
95+
// read to accept.
9296
typedef int tailscale_listener;
9397

9498
// tailscale_listen listens for a connection on the tailnet.
@@ -104,14 +108,6 @@ typedef int tailscale_listener;
104108
// Returns zero on success or -1 on error, call tailscale_errmsg for details.
105109
extern int tailscale_listen(tailscale sd, const char* network, const char* addr, tailscale_listener* listener_out);
106110

107-
// tailscale_listener_close closes the listener.
108-
//
109-
// Returns:
110-
// 0 - success
111-
// EBADF - listener is not a valid tailscale_listener
112-
// -1 - call tailscale_errmsg for details
113-
extern int tailscale_listener_close(tailscale_listener listener);
114-
115111
// tailscale_accept accepts a connection on a tailscale_listener.
116112
//
117113
// It is the spiritual equivalent to accept(2).

tailscale_test.go

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"testing"
5+
"time"
56

67
"github.com/tailscale/libtailscale/tsnetctest"
78
)
@@ -11,27 +12,49 @@ func TestConn(t *testing.T) {
1112

1213
// RunTestConn cleans up after itself, so there shouldn't be
1314
// anything left in the global maps.
14-
conns.mu.Lock()
15-
rem := len(conns.m)
16-
conns.mu.Unlock()
15+
16+
servers.mu.Lock()
17+
rem := len(servers.m)
18+
servers.mu.Unlock()
1719

1820
if rem > 0 {
19-
t.Fatalf("want no remaining tsnet_conn objects, got %d", rem)
21+
t.Fatalf("want no remaining tsnet objects, got %d", rem)
2022
}
2123

22-
listeners.mu.Lock()
23-
rem = len(listeners.m)
24-
listeners.mu.Unlock()
25-
26-
if rem > 0 {
27-
t.Fatalf("want no remaining tsnet_listener objects, got %d", rem)
24+
var remConns, remLns int
25+
26+
for i := 0; i < 50; i++ {
27+
conns.mu.Lock()
28+
remConns = len(conns.m)
29+
conns.mu.Unlock()
30+
31+
listeners.mu.Lock()
32+
remLns = len(listeners.m)
33+
listeners.mu.Unlock()
34+
35+
if remConns == 0 && remLns == 0 {
36+
break
37+
}
38+
39+
// We are waiting for cleanup goroutines to finish.
40+
//
41+
// libtailscale closes one side of a socketpair and
42+
// then Go responds to the other side being unreadable
43+
// by closing the connections and listeners.
44+
//
45+
// This is inherently asynchronous.
46+
// Without ditching the standard close(2) and having our
47+
// own close functions.
48+
//
49+
// So we spin for a while
50+
time.Sleep(100 * time.Millisecond)
2851
}
2952

30-
servers.mu.Lock()
31-
rem = len(servers.m)
32-
servers.mu.Unlock()
53+
if remConns > 0 {
54+
t.Errorf("want no remaining tsnet_conn objects, got %d", remConns)
55+
}
3356

34-
if rem > 0 {
35-
t.Fatalf("want no remaining tsnet objects, got %d", rem)
57+
if remLns > 0 {
58+
t.Errorf("want no remaining tsnet_listener objects, got %d", remLns)
3659
}
3760
}

0 commit comments

Comments
 (0)