Skip to content

Commit 5116855

Browse files
committed
Improve Consul routing provider
1 parent 06ed753 commit 5116855

File tree

4 files changed

+120
-18
lines changed

4 files changed

+120
-18
lines changed

README.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ Exposing dynamic backends on the static frontend ports is the bread-and-butter o
3636
- [routing.http.poll\_interval `duration`](#routinghttppoll_interval-duration)
3737
- [routing.http.poll\_timeout `duration`](#routinghttppoll_timeout-duration)
3838
- [routing.consul.address `string`](#routingconsuladdress-string)
39+
- [routing.consul.token `string`](#routingconsultoken-string)
40+
- [routing.consul.datacenter `string`](#routingconsuldatacenter-string)
41+
- [routing.consul.namespace `string`](#routingconsulnamespace-string)
42+
- [routing.consul.wait `duration`](#routingconsulwait-duration)
43+
- [routing.consul.consistency\_mode `string`](#routingconsulconsistency_mode-string)
3944
- [Formats](#formats)
4045
- [TOML](#toml)
4146
- [YAML](#yaml)
@@ -277,12 +282,15 @@ Value of Go's `runtime.GOMAXPROCS()`
277282

278283
_default: equals to `runtime.NumCPU()`_
279284

285+
---
280286
#### routing.file.path `string`
281287
Location of [routing configuration file](#schema).
282288
#### routing.file.watch `bool`
283289
Subscribe to changes made to the routing configuration file which would give you the full power of dill's dynamic routing capabilities.
284290

285291
_default: `true`_
292+
293+
---
286294
#### routing.http.endpoint `string`
287295
Endpoint which [http provider](#http) will poll for routing configuration
288296
#### routing.http.poll_interval `duration`
@@ -293,8 +301,30 @@ _default: `5s`_
293301
Maximum time [http provider](#http) will wait when fetching routing configuration
294302

295303
_default: `5s`_
304+
305+
---
296306
#### routing.consul.address `string`
297307
Consul address from which `dill` will fetch the updates and build the routing table.
308+
#### routing.consul.token `string`
309+
Token giving access to Consul API. Required ACLs `node:read,service:read`
310+
311+
_Optional_
312+
#### routing.consul.datacenter `string`
313+
Defines what datacenter will be queried when building routing table.
314+
315+
_Optional. If not provided `dill` uses Consul defaults._
316+
#### routing.consul.namespace `string`
317+
Defines what namespace will be queried when building routing table. Namespaces are available only for Consul Enterprise users.
318+
319+
_Optional. If not provided `dill` uses Consul defaults._
320+
#### routing.consul.wait `duration`
321+
Defines how long [blocking API query](https://developer.hashicorp.com/consul/api-docs/features/blocking) will wait for a potential change using long polling.
322+
323+
_Optional. If not provided `dill` uses Consul defaults._
324+
#### routing.consul.consistency_mode `string`
325+
Defines what [consistency mode](https://developer.hashicorp.com/consul/api-docs/features/consistency) to use when `dill` fetches the updates.
326+
327+
_Optional. Allowed values: `stale`, `consistent`, `leader`. If not provided `dill` uses Consul defaults._
298328
### Formats
299329
Configuration is powered by [Viper](https://github.com/spf13/viper) so it's possible to use format that suits you best.
300330

pkg/routing/consul/client.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package consul
2+
3+
import (
4+
"net/http"
5+
6+
log "github.com/sirupsen/logrus"
7+
8+
"golang.org/x/exp/slices"
9+
)
10+
11+
var validConsistencyModes = []string{"stale", "consistent", "leader"}
12+
13+
type consulConfig struct {
14+
Address string
15+
Token string
16+
Datacenter string
17+
Namespace string
18+
Wait string
19+
ConsistencyMode string `mapstructure:"consistency_mode"`
20+
}
21+
22+
func (c *consulConfig) Validate() {
23+
if c.ConsistencyMode != "" && !slices.Contains(validConsistencyModes, c.ConsistencyMode) {
24+
log.Fatal("Invalid Consul's consistency mode")
25+
}
26+
}
27+
28+
type httpClient struct {
29+
client http.Client
30+
config *consulConfig
31+
}
32+
33+
func (c *httpClient) Do(req *http.Request) (*http.Response, error) {
34+
if c.config.Token != "" {
35+
req.Header.Add("X-Consul-Token", c.config.Token)
36+
}
37+
38+
q := req.URL.Query()
39+
if c.config.Datacenter != "" {
40+
q.Add("dc", c.config.Datacenter)
41+
}
42+
if c.config.Namespace != "" {
43+
q.Add("ns", c.config.Namespace)
44+
}
45+
if c.config.ConsistencyMode != "" {
46+
q.Add(c.config.ConsistencyMode, "")
47+
}
48+
req.URL.RawQuery = q.Encode()
49+
return c.client.Do(req)
50+
}

pkg/routing/consul/consul.go

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,9 @@ import (
55
"fmt"
66
"io"
77
"net/http"
8+
"net/url"
89
"strconv"
910
"strings"
10-
11-
"github.com/spf13/viper"
1211
)
1312

1413
type service struct {
@@ -44,31 +43,36 @@ func (s *service) Proxy() string {
4443
return ""
4544
}
4645

47-
func fetchHealthyServices(index int) ([]string, int, error) {
48-
// TODO allow for stale reads
46+
func fetchHealthyServices(index int, consulClient *httpClient) ([]string, int, error) {
4947
req, err := http.NewRequest(
5048
"GET",
51-
viper.GetString("consul.address")+"/v1/health/state/passing",
49+
consulClient.config.Address+"/v1/health/state/passing",
5250
nil,
5351
)
5452
if err != nil {
5553
return nil, 0, err
5654
}
57-
q := req.URL.Query()
55+
q := url.Values{}
5856
q.Add("filter", "ServiceTags contains `dill`")
5957
if index <= 0 {
6058
index = 1
6159
}
6260
q.Add("index", strconv.Itoa(index))
61+
if consulClient.config.Wait != "" {
62+
q.Add("wait", consulClient.config.Wait)
63+
}
6364
req.URL.RawQuery = q.Encode()
64-
65-
c := &http.Client{}
66-
res, err := c.Do(req)
65+
res, err := consulClient.Do(req)
6766
if err != nil {
6867
return nil, 0, err
6968
}
7069
defer res.Body.Close()
7170
data, _ := io.ReadAll(res.Body)
71+
72+
if res.StatusCode != 200 {
73+
return nil, 0, fmt.Errorf("%d: %s", res.StatusCode, data)
74+
}
75+
7276
var healthyServices []struct {
7377
Name string `json:"ServiceName"`
7478
}
@@ -93,20 +97,30 @@ func fetchHealthyServices(index int) ([]string, int, error) {
9397
return unique, newIndex, nil
9498
}
9599

96-
func fetchServiceDetails(name string) ([]service, error) {
97-
res, err := http.Get(
98-
fmt.Sprintf(
99-
"%s/v1/health/service/%s?passing=true",
100-
viper.GetString("consul.address"),
101-
name,
102-
),
100+
func fetchServiceDetails(name string, consulClient *httpClient) ([]service, error) {
101+
req, err := http.NewRequest(
102+
"GET",
103+
consulClient.config.Address+"/v1/health/service/"+name,
104+
nil,
103105
)
104106
if err != nil {
105107
return nil, err
106108
}
109+
q := url.Values{}
110+
q.Add("passing", "true")
111+
req.URL.RawQuery = q.Encode()
112+
res, err := consulClient.Do(req)
113+
if err != nil {
114+
return nil, err
115+
}
116+
107117
defer res.Body.Close()
108118
data, _ := io.ReadAll(res.Body)
109119

120+
if res.StatusCode != 200 {
121+
return nil, fmt.Errorf("%d: %s", res.StatusCode, data)
122+
}
123+
110124
var parsed []struct {
111125
Node struct {
112126
Address string `json:"Address"`

pkg/routing/consul/monitor.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"time"
55

66
log "github.com/sirupsen/logrus"
7+
"github.com/spf13/viper"
78

89
"dill/pkg/proxy"
910
)
@@ -13,9 +14,16 @@ var waitTime time.Duration = 5 * time.Second
1314
// MonitorServices fetches healthy services that was tagged as `dill`
1415
func MonitorServices(c chan<- *proxy.RoutingTable) {
1516
log.Info("Starting service monitor")
17+
18+
cfg := consulConfig{}
19+
viper.UnmarshalKey("routing.consul", &cfg)
20+
cfg.Validate()
21+
22+
consulClient := &httpClient{config: &cfg}
23+
1624
index := 1
1725
for {
18-
services, newIndex, err := fetchHealthyServices(index)
26+
services, newIndex, err := fetchHealthyServices(index, consulClient)
1927
if err != nil {
2028
log.WithField("error", err).Warning("Fetching healthy services failed")
2129
time.Sleep(waitTime)
@@ -24,7 +32,7 @@ func MonitorServices(c chan<- *proxy.RoutingTable) {
2432
index = newIndex
2533
rt := &proxy.RoutingTable{Table: map[string][]proxy.Upstream{}, ConsulIndex: newIndex}
2634
for _, s := range services {
27-
details, err := fetchServiceDetails(s)
35+
details, err := fetchServiceDetails(s, consulClient)
2836
if err != nil {
2937
log.WithFields(
3038
log.Fields{"error": err, "service": s},

0 commit comments

Comments
 (0)