Skip to content

Commit f6c7734

Browse files
authored
Add health checks for liveness and readiness (#71)
This adds HTTP-based health checks at `/liveness` and `/readiness`. It also adds new CLI flags:

* `--health-check`, which toggles the HTTP health checks (off by default)
* `--readiness-timeout`, which controls how long an outage must last before the proxy is considered not ready

This also moves most of the "command" code into a new file, `run.go`, so that the CLI is testable. Also, we're now correctly handling signals.
1 parent ec65efa commit f6c7734

File tree

6 files changed

+443
-135
lines changed

6 files changed

+443
-135
lines changed

proxy.go

Lines changed: 19 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -16,142 +16,35 @@ package main
1616

1717
import (
1818
"context"
19-
"net"
20-
"net/http"
21-
_ "net/http/pprof"
22-
"strings"
23-
"time"
19+
"os"
20+
"os/signal"
2421

25-
"github.com/datastax/cql-proxy/astra"
2622
"github.com/datastax/cql-proxy/proxy"
27-
"github.com/datastax/cql-proxy/proxycore"
28-
29-
"github.com/alecthomas/kong"
30-
"github.com/datastax/go-cassandra-native-protocol/primitive"
31-
"go.uber.org/zap"
3223
)
3324

34-
var cli struct {
35-
Bundle string `help:"Path to secure connect bundle" short:"b" env:"BUNDLE"`
36-
Username string `help:"Username to use for authentication" short:"u" env:"USERNAME"`
37-
Password string `help:"Password to use for authentication" short:"p" env:"PASSWORD"`
38-
ContactPoints []string `help:"Contact points for cluster. Ignored if using the bundle path option." short:"c" env:"CONTACT_POINTS"`
39-
ProtocolVersion string `help:"Initial protocol version to use when connecting to the backend cluster (default: v4, options: v3, v4, v5, DSEv1, DSEv2)" short:"n" env:"PROTOCOL_VERSION"`
40-
MaxProtocolVersion string `help:"Max protocol version supported by the backend cluster (default: v4, options: v3, v4, v5, DSEv1, DSEv2)" short:"m" env:"MAX_PROTOCOL_VERSION"`
41-
Bind string `help:"Address to use to bind serve" short:"a" env:"BIND"`
42-
Debug bool `help:"Show debug logging" env:"DEBUG"`
43-
Profiling bool `help:"Enable profiling" env:"PROFILING"`
44-
HeartbeatInterval time.Duration `help:"Interval between performing heartbeats to the cluster" default:"30s" env:"HEARTBEAT_INTERVAL"`
45-
IdleTimeout time.Duration `help:"Time between successful heartbeats before a connection to the cluster is considered unresponsive and closed" default:"60s" env:"IDLE_TIMEOUT"`
46-
}
47-
48-
func parseProtocolVersion(s string) (version primitive.ProtocolVersion, ok bool) {
49-
ok = true
50-
lowered := strings.ToLower(s)
51-
if lowered == "3" || lowered == "v3" {
52-
version = primitive.ProtocolVersion3
53-
} else if lowered == "4" || lowered == "v4" {
54-
version = primitive.ProtocolVersion4
55-
} else if lowered == "5" || lowered == "v5" {
56-
version = primitive.ProtocolVersion5
57-
} else if lowered == "65" || lowered == "dsev1" {
58-
version = primitive.ProtocolVersionDse1
59-
} else if lowered == "66" || lowered == "dsev2" {
60-
version = primitive.ProtocolVersionDse1
61-
} else {
62-
ok = false
63-
}
64-
return version, ok
65-
}
66-
6725
func main() {
68-
cliCtx := kong.Parse(&cli)
69-
70-
var resolver proxycore.EndpointResolver
71-
72-
if len(cli.Bundle) > 0 {
73-
bundle, err := astra.LoadBundleZipFromPath(cli.Bundle)
74-
if err != nil {
75-
cliCtx.Fatalf("unable to open bundle %s: %v", cli.Bundle, err)
76-
}
77-
resolver = astra.NewResolver(bundle)
78-
} else if len(cli.ContactPoints) > 0 {
79-
resolver = proxycore.NewResolver(cli.ContactPoints...)
80-
} else {
81-
cliCtx.Fatalf("must provide either bundle path or contact points")
82-
}
83-
84-
if cli.HeartbeatInterval >= cli.IdleTimeout {
85-
cliCtx.Fatalf("idle-timeout must be greater than heartbeat-interval")
86-
}
87-
88-
version := primitive.ProtocolVersion4
89-
if len(cli.ProtocolVersion) > 0 {
90-
var ok bool
91-
if version, ok = parseProtocolVersion(cli.ProtocolVersion); !ok {
92-
cliCtx.Fatalf("unsupported protocol version: %s", cli.ProtocolVersion)
93-
}
94-
}
95-
96-
maxVersion := primitive.ProtocolVersion4
97-
if len(cli.MaxProtocolVersion) > 0 {
98-
var ok bool
99-
if maxVersion, ok = parseProtocolVersion(cli.MaxProtocolVersion); !ok {
100-
cliCtx.Fatalf("unsupported max protocol version: %s", cli.ProtocolVersion)
101-
}
102-
}
103-
104-
if version > maxVersion {
105-
cliCtx.Fatalf("default protocol version is greater than max protocol version")
106-
}
107-
108-
ctx := context.Background()
109-
110-
var logger *zap.Logger
111-
var err error
112-
if cli.Debug {
113-
logger, err = zap.NewDevelopment()
114-
} else {
115-
logger, err = zap.NewProduction()
116-
}
117-
if err != nil {
118-
cliCtx.Fatalf("unable to create logger")
119-
}
26+
ctx, cancel := signalContext(context.Background(), os.Interrupt, os.Kill)
27+
defer cancel()
12028

121-
var auth proxycore.Authenticator
122-
123-
if len(cli.Username) > 0 || len(cli.Password) > 0 {
124-
auth = proxycore.NewPasswordAuth(cli.Username, cli.Password)
125-
}
126-
127-
p := proxy.NewProxy(ctx, proxy.Config{
128-
Version: version,
129-
MaxVersion: maxVersion,
130-
Resolver: resolver,
131-
ReconnectPolicy: proxycore.NewReconnectPolicy(),
132-
NumConns: 1,
133-
Auth: auth,
134-
Logger: logger,
135-
HeartBeatInterval: cli.HeartbeatInterval,
136-
IdleTimeout: cli.IdleTimeout,
137-
})
138-
139-
bind, _, err := net.SplitHostPort(cli.Bind)
140-
if err != nil {
141-
bind = net.JoinHostPort(cli.Bind, "9042")
142-
}
29+
os.Exit(proxy.Run(ctx, os.Args[1:]))
30+
}
14331

144-
if cli.Profiling {
32+
// signalContext is a simplified version of `signal.NotifyContext()` for golang 1.15 and earlier
33+
func signalContext(parent context.Context, sig ...os.Signal) (context.Context, func()) {
34+
ctx, cancel := context.WithCancel(parent)
35+
ch := make(chan os.Signal)
36+
signal.Notify(ch, sig...)
37+
if ctx.Err() == nil {
14538
go func() {
146-
err := http.ListenAndServe("localhost:6060", nil) // Profiling
147-
if err != nil {
148-
logger.Error("unable to setup profiling", zap.Error(err))
39+
select {
40+
case <-ch:
41+
cancel()
42+
case <-ctx.Done():
14943
}
15044
}()
15145
}
152-
153-
err = p.ListenAndServe(bind)
154-
if err != nil {
155-
cliCtx.FatalIfErrorf(err)
46+
return ctx, func() {
47+
cancel()
48+
signal.Stop(ch)
15649
}
15750
}

proxy/proxy.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,14 @@ func (p *Proxy) Shutdown() error {
203203
return p.listener.Close()
204204
}
205205

206+
func (p *Proxy) Ready() bool {
207+
return true
208+
}
209+
210+
func (p *Proxy) OutageDuration() time.Duration {
211+
return p.cluster.OutageDuration()
212+
}
213+
206214
func (p *Proxy) handle(conn *net.TCPConn) {
207215
if err := conn.SetKeepAlive(false); err != nil {
208216
p.logger.Warn("failed to disable keepalive on connection", zap.Error(err))

0 commit comments

Comments
 (0)