Commit 67ed52c
Rework the AutoRegister API and add comprehensive integration tests!
I worked on this in pieces over several weekends: first I got carried away rewriting things, and later I added the tests.

AutoRegister changes: I didn't like passing a WaitGroup as a parameter in the API, and I felt it added too much boilerplate to call the function. I also didn't like breaking standard Go conventions by returning the error in the struct. Later, after I wrote the tests, I realised the "return only the first registration" API is nice to use without the default client as well, so I made a client-specific version of it. To make testing easy I exposed the LeaseTTL as a global variable. I'm now thinking I want to change it to a flag so each instance of embedportal.Run can have a different TTL, but I already had too many changes going on and had to stop and commit.

Tools changes: with the AutoRegister changes I ended up making tools.AutoTLSConf, but I'm not entirely happy with that API yet. To tools.HTTPServer I added the Listener option, mainly so I could use a file descriptor listener in the tests; it also allows binding to a specific IP. It kind of makes the port argument useless when you use it, but it's nice to have the extra option and still get the nice shutdown code.

The host, assimilate, and spawn changes are mostly just to use the new API so this commit compiles. In spawn I also ended up adding a context and WaitGroup to the dashboard; I think I need to rework that some more.

When I wrote the tests I also found and fixed two bugs:
1. In gate.ResolveFlags() the conditions weren't quite right for people setting the pointers directly in code.
2. There was a subtle threading issue in clientLeasor that I discovered when I made the tests parallel. Because I was trying to cache the heap pointers myself, setting the nextLeasor variable wasn't thread safe and races were possible. The proper fix is to use sync.Pool, which was a bit complicated to get exactly right.

Last and most importantly, I did a lot of refinement on the tests before committing. The suite runs one portal server for all of the parallel tests, uses open ports automatically selected by the OS, and even passes those open sockets to portal via FDs so there's no race between closing a port and portal reopening it. Lots of interesting, careful use of t.Cleanup. I also wrote some cool test helpers that run subtests using reflection, so you don't have to manually name the subtests and update the list of t.Run calls when you add new test functions; it's almost like it's built-in syntax for go test. The FreePort function is also somewhat reusable, although I unfortunately had to make it ugly to support Windows.

At first I got the portal token by writing the state file to the test temp dir and then repeatedly reading and parsing the proto until the token appeared. I changed it over to reading the logs like portal does, because I didn't like polling the filesystem and using it for communication like that. I also decided I want to test the way spawn actually works, so I will write a portal restart test that proves the state file works without actually parsing it. Unfortunately the log approach ended up being more code; I think I need a better way to extract the token from portal, but I'm not sure how yet.

As for the actual tests: I was careful to cover all the branches of AutoRegister, although it's not perfect yet; I left lots of TODO comments. I'm really happy with the cleanup sequences, for example the extra check at the end of the HTTP test that the unregister actually happened. I really like that I was able to stick to only the public interface and still make such a nice setup.
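A minimal sketch of the reflection-based subtest idea, with made-up names rather than the actual helper: methods named Test* on a suite value are discovered with reflect and each becomes its own parallel t.Run subtest, so adding a test method never requires updating a list of calls.

package suite_test

import (
	"reflect"
	"strings"
	"testing"
)

// runSuite runs every method named Test* on suite as a named parallel subtest.
func runSuite(t *testing.T, suite interface{}) {
	v := reflect.ValueOf(suite)
	for i := 0; i < v.NumMethod(); i++ {
		name := v.Type().Method(i).Name
		if !strings.HasPrefix(name, "Test") {
			continue
		}
		method := v.Method(i)
		t.Run(name, func(t *testing.T) {
			t.Parallel()
			// Each subtest method takes a *testing.T, mirroring a normal test func.
			method.Call([]reflect.Value{reflect.ValueOf(t)})
		})
	}
}

// Example suite: TestHTTP and TestTCP each become a subtest automatically.
type portalSuite struct{}

func (portalSuite) TestHTTP(t *testing.T) { /* ... */ }
func (portalSuite) TestTCP(t *testing.T)  { /* ... */ }

func TestPortal(t *testing.T) {
	runSuite(t, portalSuite{})
}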
1 parent 437984f commit 67ed52c

21 files changed: +876 -183 lines changed

.github/workflows/tests.yml

Lines changed: 7 additions & 4 deletions
@@ -18,12 +18,15 @@ jobs:
       - uses: actions/checkout@v4

       - name: Set up Go
-        uses: actions/setup-go@v5
+        uses: actions/setup-go@v5.5.0
         with:
           go-version-file: go.mod

       - name: Build daemon
-        run: go build -race -v ./...
+        shell: bash
+        run: |
+          GOTOOLCHAIN=local go version
+          GOTOOLCHAIN=local go build -race -v ./...

       - name: Run daemon unit tests
         run: go test -race -v ./...
@@ -64,7 +67,7 @@ jobs:
           path: 'pebble'

       - name: Set up Go
-        uses: actions/setup-go@v5
+        uses: actions/setup-go@v5.5.0
         with:
           go-version-file: pebble/go.mod
           cache-dependency-path: |
@@ -137,7 +140,7 @@ jobs:
           path: 'pebble'

       - name: Set up Go
-        uses: actions/setup-go@v5
+        uses: actions/setup-go@v5.5.0
         with:
           go-version-file: pebble/go.mod
           cache-dependency-path: |

assimilate/embedassimilate/assimilate.go

Lines changed: 1 addition & 1 deletion
@@ -70,7 +70,7 @@ func Run(ctx context.Context, flags *flag.FlagSet, args []string) {
     idx := i
     wg.Add(1)
     go func() {
-      err := client.AutoRegister(ctx, registration, nil)
+      err := client.AutoRegisterChan(ctx, registration, nil)
       wg.Done()
       if err != nil && !errors.Is(err, context.Cause(ctx)) {
         log.Printf("Error for registration #%v: %v", idx, err)

go.mod

Lines changed: 4 additions & 2 deletions
@@ -1,12 +1,14 @@
 module ask.systems/daemon

-go 1.20
+go 1.21.13
+
+toolchain go1.24.0

 require (
   golang.org/x/crypto v0.33.0
   golang.org/x/sys v0.30.0
   google.golang.org/grpc v1.64.1
-  google.golang.org/protobuf v1.34.2
+  google.golang.org/protobuf v1.36.5
 )

 require (

go.sum

Lines changed: 3 additions & 2 deletions
@@ -1,4 +1,5 @@
 github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
+github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
 golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M=
 golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
@@ -11,5 +12,5 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 h1:
 google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
 google.golang.org/grpc v1.64.1 h1:LKtvyfbX3UGVPFcGqJ9ItpVWW6oN/2XqTxfAnwRRXiA=
 google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0=
-google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
-google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
+google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
+google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=

host/embedhost/host.go

Lines changed: 16 additions & 19 deletions
@@ -13,7 +13,6 @@ import (
   "os"
   "path/filepath"
   "strings"
-  "sync"

   _ "ask.systems/daemon/portal/flags"
   _ "ask.systems/daemon/tools/flags"
@@ -47,24 +46,6 @@ func Run(ctx context.Context, flags *flag.FlagSet, args []string) {
     "instead of hosting a server.")
   flags.Parse(args[1:])

-  // Setup graceful stopping
-  ctx, cancel := context.WithCancel(context.Background())
-  ctx = tools.ContextWithQuitSignals(ctx)
-  wg := &sync.WaitGroup{}
-  defer func() {
-    cancel()
-    wg.Wait()
-    log.Print("Goodbye.")
-  }()
-
-  reg, err := gate.AutoRegister(ctx, &gate.RegisterRequest{
-    Pattern: *urlPath,
-  }, wg)
-  if err != nil {
-    log.Print("Fatal error registering with portal:", err)
-    return
-  }
-
   // Extract the path part of the pattern and the prefix to remove
   _, servePath := gate.ParsePattern(*urlPath)
   prefix := servePath
@@ -110,7 +91,23 @@ func Run(ctx context.Context, flags *flag.FlagSet, args []string) {
     log.Print("WARNING: Failed to open and stat web_root directory, we probably can't serve anything. Error: ", err)
   }

+  // Setup graceful stopping
+  ctx = tools.ContextWithQuitSignals(context.Background())
+
+  // Register the reverse proxy pattern with portal.
+  // Only blocks until the initial registration is done, then keeps renewing.
+  reg, waitForUnregister, err := gate.AutoRegister(ctx, &gate.RegisterRequest{
+    Pattern: *urlPath,
+  })
+  if err != nil {
+    log.Print("Fatal error registering with portal:", err)
+    return
+  }
+  // Start serving files (blocks until graceful stop is done)
   tools.HTTPServer(ctx.Done(), reg.Lease.Port, reg.TLSConfig, nil)
+  // Wait for the AutoRegister background goroutine to gracefully stop
+  <-waitForUnregister
+  log.Print("Goodbye.")
 }

 // The -hash_pasword utility
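For context on the Listener option mentioned in the commit message, here is a rough stdlib-only sketch of the pattern it enables: serving on a listener the caller already opened (bound to a specific IP, or inherited as a socket) with graceful shutdown on a quit channel. This is not the actual tools.HTTPServer code; names here are illustrative.

package main

import (
	"context"
	"log"
	"net"
	"net/http"
	"time"
)

// serveOn serves handler on the caller-supplied listener until quit closes,
// then shuts down gracefully.
func serveOn(quit <-chan struct{}, l net.Listener, handler http.Handler) {
	srv := &http.Server{Handler: handler}
	go func() {
		<-quit
		// Give in-flight requests a moment to finish before forcing the close.
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		srv.Shutdown(ctx)
	}()
	if err := srv.Serve(l); err != nil && err != http.ErrServerClosed {
		log.Print("Server error: ", err)
	}
}

func main() {
	// Binding a specific IP instead of just a port; a separate port argument
	// becomes redundant when a listener is supplied like this.
	l, err := net.Listen("tcp", "127.0.0.1:8080")
	if err != nil {
		log.Fatal(err)
	}
	quit := make(chan struct{})
	serveOn(quit, l, http.FileServer(http.Dir(".")))
}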

portal/embedportal/http_proxyserv.go

Lines changed: 0 additions & 1 deletion
@@ -50,7 +50,6 @@ type httpProxy struct {
 type forwarder struct {
   Handler   http.Handler
   Lease     *gate.Lease
-  AdminOnly bool
   AllowHTTP bool
 }


portal/embedportal/port_leasor.go

Lines changed: 49 additions & 32 deletions
@@ -14,7 +14,7 @@ import (
 )

 // How often to look through the Leases and unregister those past TTL
-const (
+var (
   ttlCheckFreq = leaseTTL / 100
 )

@@ -27,8 +27,8 @@
 type onCancelFunc func(*gate.Lease)

 type clientLeasor struct {
-  leasors    *sync.Map
-  nextLeasor *portLeasor
+  leasors    *sync.Map  // Key: string Value: *portLeasor
+  leasorPool *sync.Pool // Contains *portLeasor

   startPort uint16
   endPort   uint16
@@ -43,8 +43,7 @@ func makeClientLeasor(startPort, endPort uint16, reservedPorts map[uint16]bool,
   if endPort < startPort {
     startPort, endPort = endPort, startPort
   }
-  return &clientLeasor{
-    nextLeasor: &portLeasor{},
+  c := &clientLeasor{
     leasors:     &sync.Map{},
     onCancelMut: &sync.Mutex{},

@@ -53,6 +52,27 @@ func makeClientLeasor(startPort, endPort uint16, reservedPorts map[uint16]bool,
     reservedPorts: reservedPorts,
     quit:          quit,
   }
+  c.leasorPool = &sync.Pool{
+    New: func() any {
+      return &portLeasor{
+        mut:       &sync.Mutex{},
+        startPort: startPort,
+        endPort:   endPort,
+        // TODO: clean up this weird system with copyOnCancel and and c.OnCancel
+        // calling OnCancel on all the portLeasors. I think the onCancel list
+        // should not be copied to each portLeasor and instead they should
+        // report cancels back to the clientLeasor which should keep the only
+        // list.
+        //
+        // I think leasorPool.New is garunteed to only be called when we call
+        // Get and that is the only reason this system is safe
+        onCancel:    c.copyOnCancel(),
+        leases:      make(map[uint32][]*gate.Lease),
+        unusedPorts: makeUnusedPorts(startPort, endPort, reservedPorts),
+      }
+    },
+  }
+  return c
 }

 func leaseString(lease *gate.Lease) string {
@@ -62,22 +82,32 @@ func leaseString(lease *gate.Lease) string {
 }

 func (c *clientLeasor) PortLeasorForClient(clientAddr string) *portLeasor {
-  leasor, loaded := c.leasors.LoadOrStore(clientAddr, c.nextLeasor)
+  // We use a sync.Pool because we have to call LoadOrStore on the leasors map,
+  // we need to have a pointer to space on the heap every time we lookup a
+  // client so that we can start a new leasor if one doesn't exist.
+  //
+  // The pool lets us save and re-use the heap space here when we find a client
+  // we already have a leasor for.
+  nextLeasor := c.leasorPool.Get()
+  // This thread exclusively owns nextLeasor right now becasue we called Get and
+  // have not stored it in the map yet.
+  nextLeasor.(*portLeasor).clientAddr = clientAddr
+  leasor, loaded := c.leasors.LoadOrStore(clientAddr, nextLeasor)
   if !loaded {
-    // Because we have to call LoadOrStore on the leasors map, we need to have a
-    // pointer to space on the heap every time we lookup a client so that we can
-    // start a new leasor if one doesn't exist.
-    //
-    // So, save and re-use the heap space here when we find a client we already
-    // have a leasor for.
-    c.nextLeasor.Start(clientAddr, c.startPort, c.endPort,
-      makeUnusedPorts(c.startPort, c.endPort, c.reservedPorts),
-      c.quit, c.copyOnCancel())
-    c.nextLeasor = &portLeasor{}
+    // This same pointer might have been returned from a concurrent call before
+    // we start this goroutine, but this will still only happen once and it's
+    // okay if we start using leasor before this.
+    go leasor.(*portLeasor).monitorTTLs(c.quit)
+  } else {
+    c.leasorPool.Put(nextLeasor)
   }
   return leasor.(*portLeasor)
 }

+func (c *clientLeasor) makePortLeasor() any {
+  return &portLeasor{}
+}
+
 func (c *clientLeasor) copyOnCancel() []onCancelFunc {
   c.onCancelMut.Lock()
   defer c.onCancelMut.Unlock()
@@ -90,9 +120,9 @@ func (c *clientLeasor) OnCancel(cancelFunc func(*gate.Lease)) {
   c.onCancelMut.Lock()
   defer c.onCancelMut.Unlock()
   c.onCancel = append(c.onCancel, cancelFunc)
-  // Since we still have the mutex lock, this covers all existing leasors. New
-  // leasors can't be created until the mutex is released and they'll get the
-  // new list.
+  // Since we still have the mutex lock and copyOnCancel also uses it, this
+  // covers all existing leasors. New leasors can't be created until the mutex
+  // is released and they'll get the new list.
   c.leasors.Range(func(key, value interface{}) bool {
     l := value.(*portLeasor)
     l.OnCancel(cancelFunc)
@@ -117,19 +147,6 @@ type portLeasor struct {
   clientAddr string
 }

-func (l *portLeasor) Start(clientAddr string, startPort, endPort uint16, unusedPorts []uint16, quit chan struct{}, onCancel []onCancelFunc) {
-  *l = portLeasor{
-    mut:         &sync.Mutex{},
-    clientAddr:  clientAddr,
-    startPort:   startPort,
-    endPort:     endPort,
-    onCancel:    onCancel,
-    leases:      make(map[uint32][]*gate.Lease),
-    unusedPorts: unusedPorts,
-  }
-  go l.monitorTTLs(quit)
-}
-
 func (l *portLeasor) OnCancel(cancelFunc func(*gate.Lease)) {
   l.mut.Lock()
   defer l.mut.Unlock()
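A simplified, standalone sketch of the LoadOrStore-plus-sync.Pool pattern this fix relies on (illustrative names, not the portal types): every caller takes its own candidate value from the pool, so no two goroutines ever race on a single shared "next" pointer, and the loser of LoadOrStore recycles its candidate.

package main

import (
	"fmt"
	"sync"
)

type leasor struct{ client string }

var (
	leasors = &sync.Map{} // Key: string, Value: *leasor
	pool    = &sync.Pool{New: func() any { return &leasor{} }}
)

// leasorFor returns the leasor for client, creating it if needed, safely from
// any number of concurrent goroutines.
func leasorFor(client string) *leasor {
	candidate := pool.Get().(*leasor)
	candidate.client = client // safe: only this goroutine holds candidate
	actual, loaded := leasors.LoadOrStore(client, candidate)
	if loaded {
		// Someone else stored a leasor for this client first; recycle ours.
		pool.Put(candidate)
	}
	return actual.(*leasor)
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = leasorFor("10.0.0.1")
		}()
	}
	wg.Wait()
	fmt.Println(leasorFor("10.0.0.1").client)
}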

portal/embedportal/portal.go

Lines changed: 6 additions & 4 deletions
@@ -27,15 +27,17 @@ import (
 //go:generate protoc -I ../ ../embedportal/storage.proto --go_out ../ --go_opt=paths=source_relative
 //go:generate protoc -I ../ ../gate/service.proto --go_out ../ --go-grpc_out ../ --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative

-const (
-  leaseTTL = 24 * time.Hour
+var (
+  LeaseTTL = 24 * time.Hour
+  leaseTTL = LeaseTTL // TODO
   ttlRandomStagger = 0.05
 )

 var kACMEAddress string

 // TODO: actually use ctx
 func Run(ctx context.Context, flags *flag.FlagSet, args []string) {
+  leaseTTL = LeaseTTL // TODO
   flags.Usage = func() {
     fmt.Fprintf(flags.Output(), ""+
       "Usage: %s [flags]\n"+
@@ -63,7 +65,7 @@ func Run(ctx context.Context, flags *flag.FlagSet, args []string) {
       flags.Name(), flags.Name())
     flags.PrintDefaults()
   }
-  rpcPort := flags.Uint("rpc_port", 2048, ""+
+  rpcPort := flags.Int("rpc_port", 2048, ""+
     "The port to bind for the portal RPC server that clients use to register\n"+
     "with. You shouldn't need to change this unless there's a conflict or you\n"+
     "run multiple instances of portal.")
@@ -198,7 +200,7 @@ func Run(ctx context.Context, flags *flag.FlagSet, args []string) {
   // Starts serving the rpc server port.
   // First loads the registrations from the state into the two proxy servers.
   _, err = startRPCServer(leasor,
-    tcpProxy, httpProxy, uint16(*rpcPort),
+    tcpProxy, httpProxy, *rpcPort,
     rootCert, state, quit)
   if err != nil {
     log.Fatal("Failed to start RPC server:", err)
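A hedged sketch of how a test might use the newly exported LeaseTTL variable to make leases expire quickly; the import path is inferred from the file tree, and the commit message notes this may become a flag instead.

package portal_test

import (
	"os"
	"testing"
	"time"

	"ask.systems/daemon/portal/embedportal"
)

func TestMain(m *testing.M) {
	// Shorten lease expiry so TTL and renewal branches are reachable in tests.
	embedportal.LeaseTTL = 5 * time.Second
	os.Exit(m.Run())
}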

portal/embedportal/rpcserv.go

Lines changed: 2 additions & 3 deletions
@@ -8,7 +8,6 @@ import (
   "fmt"
   "log"
   "net"
-  "strconv"
   "strings"
   "time"

@@ -220,7 +219,7 @@ func (s *rpcServ) Renew(ctx context.Context, lease *gate.Lease) (*gate.Lease, er
 // StartNew creates a new RPCServ and starts it
 func startRPCServer(clientLeasor *clientLeasor,
   tcpProxy *tcpProxy, httpProxy *httpProxy,
-  port uint16, rootCert *tls.Config,
+  port int, rootCert *tls.Config,
   state *stateManager, quit chan struct{}) (*rpcServ, error) {

   s := &rpcServ{
@@ -259,7 +258,7 @@ func startRPCServer(clientLeasor *clientLeasor,
     }),
   )
   gate.RegisterPortalServer(server, s)
-  l, err := net.Listen("tcp", ":"+strconv.Itoa(int(port)))
+  l, err := listenerFromPortOrFD(port)
   if err != nil {
     return nil, fmt.Errorf("Failed to start listener: %v", err)
   }
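A stdlib-only sketch of the two halves this change hints at: asking the OS for a free port without a close-then-reopen race, and adopting an already-open socket from an inherited file descriptor, as a helper like listenerFromPortOrFD implies. This is not the actual daemon code; the FD-passing convention is assumed.

package main

import (
	"fmt"
	"net"
	"os"
)

// freePort asks the kernel for an unused TCP port and returns the open
// listener along with its port, so the socket can be handed off without
// closing and reopening it.
func freePort() (net.Listener, int, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return nil, 0, err
	}
	return l, l.Addr().(*net.TCPAddr).Port, nil
}

// listenerFromFD adopts a socket passed to this process as file descriptor fd
// (for example, an extra FD set up by the parent before exec).
func listenerFromFD(fd uintptr) (net.Listener, error) {
	f := os.NewFile(fd, "inherited-listener")
	defer f.Close() // net.FileListener duplicates the descriptor
	return net.FileListener(f)
}

func main() {
	l, port, err := freePort()
	if err != nil {
		panic(err)
	}
	defer l.Close()
	fmt.Println("OS picked port", port)
}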

portal/embedportal/tls_cert.go

Lines changed: 1 addition & 1 deletion
@@ -194,7 +194,7 @@ func (t *tlsRefresher) refreshCertFile(idx int, cert, key *os.File) (*tls.Certif
   newCertFile, errCert := os.Open(cert.Name())
   newKeyFile, errKey := os.Open(key.Name())
   if errCert != nil || errKey != nil {
-    err := fmt.Errorf("Failed to reopen tls cert (%w) or key (%w) files", errCert, errKey)
+    err := fmt.Errorf("Failed to reopen TLS cert (%w) or key (%w) files", errCert, errKey)
     log.Print(err.Error())
     newCertFile.Close()
     newKeyFile.Close()
