Skip to content

Commit 7b9312d

Browse files
jmwamplemingyech
andauthored
Reload Handling for combined Reg Server (#148)
* clientconf reload for bidirectional API registrar and phantom subnet reload for processor * Fix config decoding The toml library complains about nil pointer when decoding. * Add synchronization Co-authored-by: Mingye Chen <[email protected]>
1 parent 436b0f8 commit 7b9312d

File tree

6 files changed

+117
-25
lines changed

6 files changed

+117
-25
lines changed

cmd/registration-server/main.go

Lines changed: 58 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import (
44
"flag"
55
"fmt"
66
"os"
7+
"os/signal"
78
"strconv"
89
"sync"
10+
"syscall"
911
"time"
1012

1113
"github.com/BurntSushi/toml"
@@ -36,8 +38,9 @@ type config struct {
3638
ZMQPrivateKeyPath string `toml:"zmq_privkey_path"`
3739
StationPublicKeys []string `toml:"station_pubkeys"`
3840
ClientConfPath string `toml:"clientconf_path"`
39-
LogLevel string `toml:"log_level"`
40-
LogMetricsInterval uint16 `toml:"log_metrics_interval"`
41+
latestClientConf *pb.ClientConf
42+
LogLevel string `toml:"log_level"`
43+
LogMetricsInterval uint16 `toml:"log_metrics_interval"`
4144
}
4245

4346
// parseClientConf parse the latest ClientConf based on path file
@@ -108,6 +111,27 @@ func readKeyAndEncode(path string) (string, error) {
108111
return privkey, nil
109112
}
110113

114+
// loadConfig is intended to re-parse portions of the config in conjunction with
115+
// setupReloadHandler. This is specifically for settings where we do not want to
116+
// restart the station. This is not intended to be a full re-build of the
117+
// station (i.e. auth, workers, and loglevel are not changed), Mostly this
118+
// should allow us to dynamically reload when there is an update to the latest
119+
// client configuration or the phantom subnets that we select from.
120+
func loadConfig(configPath string) (*config, error) {
121+
conf := &config{}
122+
_, err := toml.DecodeFile(configPath, conf)
123+
if err != nil {
124+
return nil, err
125+
}
126+
127+
conf.latestClientConf, err = parseClientConf(conf.ClientConfPath)
128+
if err != nil {
129+
return nil, err
130+
}
131+
132+
return conf, nil
133+
}
134+
111135
func main() {
112136
var configPath string
113137

@@ -126,10 +150,9 @@ func main() {
126150

127151
log.SetFormatter(logFormatter)
128152

129-
var conf config
130-
_, err := toml.DecodeFile(configPath, &conf)
153+
conf, err := loadConfig(configPath)
131154
if err != nil {
132-
log.Fatalf("Error in reading config file: %v", err)
155+
log.Fatalf("error occurred while parsing config: %v", err)
133156
}
134157

135158
logClientIP, err := strconv.ParseBool(os.Getenv("LOG_CLIENT_IP"))
@@ -166,27 +189,50 @@ func main() {
166189
log.Fatal(err)
167190
}
168191

169-
latestClientConf, err := parseClientConf(conf.ClientConfPath)
170-
if err != nil {
171-
log.Fatal(err)
172-
}
173-
174192
dnsPrivKey, err := readKey(conf.DNSPrivkeyPath)
175193
if err != nil {
176194
log.Fatal(err)
177195
}
178196

179-
dnsRegServer, err := dnsregserver.NewDNSRegServer(conf.Domain, conf.DNSListenAddr, dnsPrivKey, processor, latestClientConf.GetGeneration(), log.WithField("registrar", "DNS"), metrics)
197+
dnsRegServer, err := dnsregserver.NewDNSRegServer(conf.Domain, conf.DNSListenAddr, dnsPrivKey, processor, conf.latestClientConf.GetGeneration(), log.WithField("registrar", "DNS"), metrics)
180198
if err != nil {
181199
log.Fatal(err)
182200
}
183201

184-
apiRegServer, err := apiregserver.NewAPIRegServer(conf.APIPort, processor, latestClientConf, log.WithField("registrar", "API"), logClientIP, metrics)
202+
apiRegServer, err := apiregserver.NewAPIRegServer(conf.APIPort, processor, conf.latestClientConf, log.WithField("registrar", "API"), logClientIP, metrics)
185203
if err != nil {
186204
log.Fatal(err)
187205
}
188206

189207
regServers := []regServer{dnsRegServer, apiRegServer}
190208

209+
signalChan := make(chan os.Signal, 1)
210+
211+
signal.Notify(
212+
signalChan,
213+
syscall.SIGHUP, // listen for SIGHUP as reload signal
214+
)
215+
216+
// spawn a goroutine to handle os signals continuously
217+
go func() {
218+
for {
219+
sig := <-signalChan
220+
221+
if sig == syscall.SIGHUP {
222+
conf, err = loadConfig(configPath)
223+
if err != nil {
224+
log.Errorf("error occurred while reloading config -- aborting reload: %v", err)
225+
} else {
226+
err := processor.ReloadSubnets()
227+
if err != nil {
228+
log.Errorf("failed to reload phantom subnets - aborting reload: %v", err)
229+
}
230+
apiRegServer.NewClientConf(conf.latestClientConf)
231+
dnsRegServer.UpdateLatestCCGen(conf.latestClientConf.GetGeneration())
232+
}
233+
}
234+
}
235+
}()
236+
191237
run(regServers)
192238
}

pkg/apiregserver/apiregserver.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net"
99
"net/http"
1010
"strings"
11+
"sync"
1112

1213
"github.com/gorilla/mux"
1314
"github.com/refraction-networking/conjure/pkg/metrics"
@@ -25,6 +26,7 @@ type registrar interface {
2526
type APIRegServer struct {
2627
apiPort uint16
2728
latestClientConf *pb.ClientConf // Latest clientConf for sharing over RegistrationResponse channel.
29+
ccMutex sync.RWMutex
2830
processor registrar
2931
logger log.FieldLogger
3032
logClientIP bool
@@ -190,6 +192,9 @@ func (s *APIRegServer) registerBidirectional(w http.ResponseWriter, r *http.Requ
190192
// Use this function in registerBidirectional, if the returned ClientConfig is
191193
// not nil add it to the RegistrationResponse.
192194
func (s *APIRegServer) compareClientConfGen(genNum uint32) *pb.ClientConf {
195+
s.ccMutex.RLock()
196+
defer s.ccMutex.RUnlock()
197+
193198
// Check that server has a currnet (latest) client config
194199
if s.latestClientConf == nil {
195200
s.logger.Debugf("Server latest ClientConf is nil")
@@ -228,6 +233,14 @@ func parseIP(addrPort string) *net.IP {
228233

229234
}
230235

236+
func (s *APIRegServer) NewClientConf(c *pb.ClientConf) {
237+
if c != nil {
238+
s.ccMutex.Lock()
239+
defer s.ccMutex.Unlock()
240+
s.latestClientConf = c
241+
}
242+
}
243+
231244
func (s *APIRegServer) ListenAndServe() error {
232245
r := mux.NewRouter()
233246
r.HandleFunc("/register", s.register)
@@ -247,6 +260,7 @@ func NewAPIRegServer(apiPort uint16, regprocessor *regprocessor.RegProcessor, la
247260
apiPort: apiPort,
248261
processor: regprocessor,
249262
latestClientConf: latestCC,
263+
ccMutex: sync.RWMutex{},
250264
logger: logger,
251265
logClientIP: logClientIP,
252266
metrics: metrics,

pkg/dnsregserver/dnsregserver.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package dnsregserver
33
import (
44
"encoding/hex"
55
"errors"
6+
"sync/atomic"
67

78
"github.com/refraction-networking/conjure/pkg/metrics"
89
"github.com/refraction-networking/conjure/pkg/regprocessor"
@@ -70,7 +71,7 @@ func (s *DNSRegServer) processRequest(reqIn []byte) ([]byte, error) {
7071
reqLogger.Tracef("Request received: [%+v]", c2sPayload)
7172

7273
clientconfOutdated := false
73-
if c2sPayload.RegistrationPayload.GetDecoyListGeneration() < s.latestCCGen {
74+
if c2sPayload.RegistrationPayload.GetDecoyListGeneration() < atomic.LoadUint32(&s.latestCCGen) {
7475
clientconfOutdated = true
7576
}
7677

@@ -117,3 +118,8 @@ func (s *DNSRegServer) processRequest(reqIn []byte) ([]byte, error) {
117118
func (f *DNSRegServer) Close() error {
118119
return f.dnsResponder.Close()
119120
}
121+
122+
// Close closes the underlying dns responder.
123+
func (f *DNSRegServer) UpdateLatestCCGen(gen uint32) {
124+
atomic.StoreUint32(&f.latestCCGen, gen)
125+
}

pkg/regprocessor/regprocessor.go

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,11 @@ type ipSelector interface {
4444

4545
// RegProcessor provides an interface to publish registrations and helper functions to process registration requests
4646
type RegProcessor struct {
47-
zmqMutex sync.Mutex
48-
ipSelector ipSelector
49-
sock zmqSender
50-
metrics *metrics.Metrics
47+
zmqMutex sync.Mutex
48+
selectorMutex sync.RWMutex
49+
ipSelector ipSelector
50+
sock zmqSender
51+
metrics *metrics.Metrics
5152
}
5253

5354
// NewRegProcessor initialize a new RegProcessor
@@ -76,10 +77,11 @@ func NewRegProcessor(zmqBindAddr string, zmqPort uint16, privkey string, authVer
7677
}
7778

7879
return &RegProcessor{
79-
zmqMutex: sync.Mutex{},
80-
ipSelector: phantomSelector,
81-
sock: sock,
82-
metrics: metrics,
80+
zmqMutex: sync.Mutex{},
81+
selectorMutex: sync.RWMutex{},
82+
ipSelector: phantomSelector,
83+
sock: sock,
84+
metrics: metrics,
8385
}, nil
8486
}
8587

@@ -101,10 +103,11 @@ func NewRegProcessorNoAuth(zmqBindAddr string, zmqPort uint16, metrics *metrics.
101103
}
102104

103105
return &RegProcessor{
104-
zmqMutex: sync.Mutex{},
105-
ipSelector: phantomSelector,
106-
sock: sock,
107-
metrics: metrics,
106+
zmqMutex: sync.Mutex{},
107+
selectorMutex: sync.RWMutex{},
108+
ipSelector: phantomSelector,
109+
sock: sock,
110+
metrics: metrics,
108111
}, nil
109112
}
110113

@@ -173,6 +176,8 @@ func (p *RegProcessor) processBdReq(c2sPayload *pb.C2SWrapper) (*pb.Registration
173176
}
174177

175178
if *c2sPayload.RegistrationPayload.V4Support {
179+
p.selectorMutex.RLock()
180+
defer p.selectorMutex.RUnlock()
176181
phantom4, err := p.ipSelector.Select(
177182
cjkeys.DarkDecoySeed,
178183
uint(c2sPayload.GetRegistrationPayload().GetDecoyListGeneration()), //generation type uint
@@ -190,6 +195,8 @@ func (p *RegProcessor) processBdReq(c2sPayload *pb.C2SWrapper) (*pb.Registration
190195
}
191196

192197
if *c2sPayload.RegistrationPayload.V6Support {
198+
p.selectorMutex.RLock()
199+
defer p.selectorMutex.RUnlock()
193200
phantom6, err := p.ipSelector.Select(
194201
cjkeys.DarkDecoySeed,
195202
uint(c2sPayload.GetRegistrationPayload().GetDecoyListGeneration()),
@@ -254,3 +261,16 @@ func (p *RegProcessor) processC2SWrapper(c2sPayload *pb.C2SWrapper, clientAddr [
254261

255262
return proto.Marshal(payload)
256263
}
264+
265+
func (p *RegProcessor) ReloadSubnets() error {
266+
phantomSelector, err := lib.GetPhantomSubnetSelector()
267+
if err != nil {
268+
return err
269+
}
270+
271+
p.selectorMutex.Lock()
272+
defer p.selectorMutex.Unlock()
273+
p.ipSelector = phantomSelector
274+
275+
return nil
276+
}

sysconfig/conjure-app.service

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ EnvironmentFile=/opt/conjure/sysconfig/conjure.conf
1818
ExecStartPre=/bin/sleep 10
1919
ExecStart=/opt/conjure/application/application
2020

21+
# send SIGHUP to the station process
22+
ExecReload=/bin/kill -HUP $MAINPID
23+
2124
# on stop processes will get SIGTERM, and after 10 secs - SIGKILL (default 90)
2225
TimeoutStopSec=10
2326

sysconfig/conjure-registration-server.service

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ EnvironmentFile=/opt/conjure/sysconfig/conjure.conf
1111

1212
ExecStart=/opt/conjure/cmd/registration-server/registration-server --config /opt/conjure/cmd/registration-server/config.toml
1313

14+
# send SIGHUP to the registration server process
15+
ExecReload=/bin/kill -HUP $MAINPID
16+
1417
# on stop processes will get SIGTERM, and after 10 secs - SIGKILL (default 90)
1518
TimeoutStopSec=10
1619

0 commit comments

Comments
 (0)