From 8fc7c83ddfde77a89793626ebd3d0cbcebd8416e Mon Sep 17 00:00:00 2001 From: Riddhi Shah Date: Mon, 29 Aug 2022 08:30:01 -0700 Subject: [PATCH 1/6] Proxy envoy ADS requests to Consul server --- go.mod | 6 ++-- go.sum | 36 +++++++++++++++++--- pkg/consuldp/bootstrap.go | 11 +++--- pkg/consuldp/consul_dataplane.go | 12 +++++++ pkg/consuldp/grpc.go | 58 ++++++++++++++++++++++++++++++++ 5 files changed, 112 insertions(+), 11 deletions(-) create mode 100644 pkg/consuldp/grpc.go diff --git a/go.mod b/go.mod index 221a1a95..3c73770a 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/hashicorp/consul-dataplane go 1.18 require ( + github.com/adamthesax/grpc-proxy v0.0.0-20220525203857-13e92d14f87a github.com/hashicorp/go-hclog v1.2.2 github.com/hashicorp/go-netaddrs v0.0.0-20220509001840-90ed9d26ec46 github.com/mitchellh/mapstructure v1.5.0 @@ -20,8 +21,9 @@ require ( github.com/mattn/go-isatty v0.0.14 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.4.0 // indirect - golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect - golang.org/x/sys v0.0.0-20220624220833-87e55d714810 // indirect + golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect + golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64 // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 043581ac..ab185158 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/adamthesax/grpc-proxy v0.0.0-20220525203857-13e92d14f87a h1:8fjfNnk9RLn3F4R4XEljSOZARy1+h1f0KTh6xGFefjw= +github.com/adamthesax/grpc-proxy v0.0.0-20220525203857-13e92d14f87a/go.mod h1:Aku9EjGILrB1V88F+yfJ8CaIVaKqDeWkW2vkCbY2WSA= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -37,6 +39,7 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -54,6 +57,7 @@ github.com/hashicorp/go-hclog v1.2.2 h1:ihRI7YFwcZdiSD7SIenIhHfQH3OuDvWerAUBZbeQ github.com/hashicorp/go-hclog v1.2.2/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= github.com/hashicorp/go-netaddrs v0.0.0-20220509001840-90ed9d26ec46 h1:BysEAd6g+0HNJ0v99u7KbSObjzxC7rfVQ6yVx6HxrvU= github.com/hashicorp/go-netaddrs v0.0.0-20220509001840-90ed9d26ec46/go.mod h1:TjKbv4FhIra0YJ82mws5+4QXOhzv09eAWs4jtOBI4IU= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= @@ -75,30 +79,42 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= +golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e h1:TsQ7F31D3bUCLeqPT0u+yjp1guoArKaNKmCr22PYgTQ= -golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -108,17 +124,21 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210315160823-c6e025ad8005/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210331175145-43e1dd70ce54/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220624220833-87e55d714810 h1:rHZQSjJdAI4Xf5Qzeh2bBc5YJIkPFVM6oDtMFYmgws0= -golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64 h1:UiNENfZ8gDvpiWw7IpOMQ27spWmThO1RwwdQVbJahJM= +golang.org/x/sys v0.0.0-20220825204002-c680a09ffe64/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -126,6 +146,11 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= @@ -134,6 +159,7 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20210401141331-865547bb08e2/go.mod h1:9lPAdzaEmUacj36I+k7YKbEc5CXzPIeORRgDAUOu28A= google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f h1:hJ/Y5SqPXbarffmAsApliUlcvMU+wScNGfyop4bZm8o= google.golang.org/genproto v0.0.0-20220624142145-8cd45d7dbd1f/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -142,6 +168,7 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.47.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w= google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= @@ -169,3 +196,4 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= diff --git a/pkg/consuldp/bootstrap.go b/pkg/consuldp/bootstrap.go index 6552d5c5..374da051 100644 --- a/pkg/consuldp/bootstrap.go +++ b/pkg/consuldp/bootstrap.go @@ -6,6 +6,7 @@ import ( "net" "os" "strconv" + "strings" "github.com/mitchellh/mapstructure" @@ -24,6 +25,9 @@ const ( // bootstrapConfig generates the Envoy bootstrap config in JSON format. func (cdp *ConsulDataplane) bootstrapConfig(ctx context.Context) ([]byte, error) { + cdpFullAddr := strings.Split(cdp.gRPCListener.Addr().String(), ":") + cdpAddr := cdpFullAddr[0] + cdpPort := cdpFullAddr[1] svc := cdp.cfg.Service envoy := cdp.cfg.Envoy @@ -50,12 +54,9 @@ func (cdp *ConsulDataplane) bootstrapConfig(ctx context.Context) ([]byte, error) args := &bootstrap.BootstrapTplArgs{ GRPC: bootstrap.GRPC{ - // TODO(NET-99): This should be a listener on the consul-dataplane process - // that proxies streams to the server, handles load-balancing, SDS etc. - // // For now we just give the server address directly. - AgentAddress: cdp.consulServer.address.String(), - AgentPort: strconv.Itoa(cdp.cfg.Consul.GRPCPort), + AgentAddress: cdpAddr, + AgentPort: cdpPort, AgentTLS: false, }, ProxyCluster: rsp.Service, diff --git a/pkg/consuldp/consul_dataplane.go b/pkg/consuldp/consul_dataplane.go index bdfe9cb9..2351d15b 100644 --- a/pkg/consuldp/consul_dataplane.go +++ b/pkg/consuldp/consul_dataplane.go @@ -24,6 +24,8 @@ type consulServer struct { address net.IPAddr // supportedFeatures is a map of the dataplane features supported by the Consul server supportedFeatures map[pbdataplane.DataplaneFeatures]bool + + grpcClientConn *grpc.ClientConn } // ConsulDataplane represents the consul-dataplane process @@ -32,6 +34,9 @@ type ConsulDataplane struct { cfg *Config consulServer *consulServer dpServiceClient pbdataplane.DataplaneServiceClient + + gRPCListener net.Listener + gRPCServer *grpc.Server } // NewConsulDP creates a new instance of ConsulDataplane @@ -132,6 +137,7 @@ func (cdp *ConsulDataplane) Run(ctx context.Context) error { return err } defer grpcClientConn.Close() + cdp.consulServer.grpcClientConn = grpcClientConn cdp.logger.Info("connected to consul server over grpc", "grpc-target", gRPCTarget) dpservice := pbdataplane.NewDataplaneServiceClient(grpcClientConn) @@ -144,6 +150,12 @@ func (cdp *ConsulDataplane) Run(ctx context.Context) error { return fmt.Errorf("failed to set supported features: %w", err) } + err = cdp.setupGRPCServer() + if err != nil { + return err + } + go cdp.startGRPCServer() + cfg, err := cdp.bootstrapConfig(ctx) if err != nil { cdp.logger.Error("failed to get bootstrap config", "error", err) diff --git a/pkg/consuldp/grpc.go b/pkg/consuldp/grpc.go new file mode 100644 index 00000000..f83404d1 --- /dev/null +++ b/pkg/consuldp/grpc.go @@ -0,0 +1,58 @@ +package consuldp + +import ( + "context" + "fmt" + "net" + "strings" + + "github.com/adamthesax/grpc-proxy/proxy" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +const metadataKeyToken = "x-consul-token" + +func (cdp *ConsulDataplane) setupGRPCServer() error { + cdp.logger.Trace("setting up gRPC server") + // create gRPC listener + lis, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + cdp.logger.Error("failed to create gRPC/TCP listener: %v", err) + return err + } + cdp.gRPCListener = lis + + // create gRPC server + // one main role of this gRPC server in consul-dataplane is to proxy envoy ADS requests + // to the connected Consul server. + director := func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { + md, _ := metadata.FromIncomingContext(ctx) + mdCopy := md.Copy() + // TODO (NET-148): Inject the ACL token acquired from the server discovery library + mdCopy[metadataKeyToken] = []string{cdp.cfg.Consul.Credentials.Static.Token} + outCtx := metadata.NewOutgoingContext(ctx, mdCopy) + if !strings.Contains(fullMethodName, "envoy.service.discovery.v3.AggregatedDiscoveryService/DeltaAggregatedResources") { + return outCtx, nil, status.Errorf(codes.Unimplemented, fmt.Sprintf("Unknown method %s", fullMethodName)) + } + // TODO (NET-148): Ensure the server connection here is the one acquired via the server discovery library + return outCtx, cdp.consulServer.grpcClientConn, nil + } + gRPCServer := grpc.NewServer(grpc.UnknownServiceHandler(proxy.TransparentHandler(director))) + cdp.gRPCServer = gRPCServer + + cdp.logger.Info("created gRPC server", "address", lis.Addr().String()) + return nil +} + +func (cdp *ConsulDataplane) startGRPCServer() { + cdp.logger.Trace("starting gRPC server") + + if err := cdp.gRPCServer.Serve(cdp.gRPCListener); err != nil { + cdp.logger.Error("failed to serve gRPC requests: %v", err) + cdp.gRPCListener.Close() + // TODO: gracefully exit + } +} From dc8c3498259947d4ae97470dca5a91785d5089a1 Mon Sep 17 00:00:00 2001 From: Riddhi Shah Date: Fri, 2 Sep 2022 12:17:24 -0700 Subject: [PATCH 2/6] Start,stop,exit the grpc server gracefully --- Dockerfile | 2 +- pkg/consuldp/bootstrap.go | 11 +++++------ pkg/consuldp/consul_dataplane.go | 19 +++++++++++++++--- pkg/consuldp/grpc.go | 34 ++++++++++++++++++++------------ 4 files changed, 43 insertions(+), 23 deletions(-) diff --git a/Dockerfile b/Dockerfile index b2d753ea..cf453327 100644 --- a/Dockerfile +++ b/Dockerfile @@ -25,4 +25,4 @@ WORKDIR /root/ RUN apk add gcompat COPY --from=consul-dataplane-binary /cdp/consul-dataplane /usr/local/bin/consul-dataplane COPY --from=envoy-binary /usr/local/bin/envoy /usr/local/bin/envoy -ENTRYPOINT [ "./consul-dataplane" ] +ENTRYPOINT [ "consul-dataplane" ] diff --git a/pkg/consuldp/bootstrap.go b/pkg/consuldp/bootstrap.go index 374da051..91931918 100644 --- a/pkg/consuldp/bootstrap.go +++ b/pkg/consuldp/bootstrap.go @@ -25,9 +25,9 @@ const ( // bootstrapConfig generates the Envoy bootstrap config in JSON format. func (cdp *ConsulDataplane) bootstrapConfig(ctx context.Context) ([]byte, error) { - cdpFullAddr := strings.Split(cdp.gRPCListener.Addr().String(), ":") - cdpAddr := cdpFullAddr[0] - cdpPort := cdpFullAddr[1] + cdpGRPCFullAddr := strings.Split(cdp.gRPCServer.listener.Addr().String(), ":") + cdpGRPCAddr := cdpGRPCFullAddr[0] + cdpGRPCPort := cdpGRPCFullAddr[1] svc := cdp.cfg.Service envoy := cdp.cfg.Envoy @@ -54,9 +54,8 @@ func (cdp *ConsulDataplane) bootstrapConfig(ctx context.Context) ([]byte, error) args := &bootstrap.BootstrapTplArgs{ GRPC: bootstrap.GRPC{ - // For now we just give the server address directly. - AgentAddress: cdpAddr, - AgentPort: cdpPort, + AgentAddress: cdpGRPCAddr, + AgentPort: cdpGRPCPort, AgentTLS: false, }, ProxyCluster: rsp.Service, diff --git a/pkg/consuldp/consul_dataplane.go b/pkg/consuldp/consul_dataplane.go index 2351d15b..da7c68e2 100644 --- a/pkg/consuldp/consul_dataplane.go +++ b/pkg/consuldp/consul_dataplane.go @@ -25,18 +25,23 @@ type consulServer struct { // supportedFeatures is a map of the dataplane features supported by the Consul server supportedFeatures map[pbdataplane.DataplaneFeatures]bool + // grpcClientConn is the gRPC connection to the Consul server grpcClientConn *grpc.ClientConn } +type gRPCServer struct { + listener net.Listener + server *grpc.Server + exitedCh chan struct{} +} + // ConsulDataplane represents the consul-dataplane process type ConsulDataplane struct { logger hclog.Logger cfg *Config consulServer *consulServer dpServiceClient pbdataplane.DataplaneServiceClient - - gRPCListener net.Listener - gRPCServer *grpc.Server + gRPCServer *gRPCServer } // NewConsulDP creates a new instance of ConsulDataplane @@ -137,6 +142,7 @@ func (cdp *ConsulDataplane) Run(ctx context.Context) error { return err } defer grpcClientConn.Close() + // TODO (NET-148): Ensure the server connection here is the one acquired via the server discovery library cdp.consulServer.grpcClientConn = grpcClientConn cdp.logger.Info("connected to consul server over grpc", "grpc-target", gRPCTarget) @@ -184,9 +190,16 @@ func (cdp *ConsulDataplane) Run(ctx context.Context) error { if err := proxy.Stop(); err != nil { cdp.logger.Error("failed to stop proxy", "error", err) } + cdp.stopGRPCServer() doneCh <- nil case <-proxy.Exited(): + cdp.stopGRPCServer() doneCh <- errors.New("envoy proxy exited unexpectedly") + case <-cdp.gRPCServerExited(): + if err := proxy.Stop(); err != nil { + cdp.logger.Error("failed to stop proxy", "error", err) + } + doneCh <- errors.New("gRPC server exited unexpectedly") } }() return <-doneCh diff --git a/pkg/consuldp/grpc.go b/pkg/consuldp/grpc.go index f83404d1..b68d5792 100644 --- a/pkg/consuldp/grpc.go +++ b/pkg/consuldp/grpc.go @@ -23,36 +23,44 @@ func (cdp *ConsulDataplane) setupGRPCServer() error { cdp.logger.Error("failed to create gRPC/TCP listener: %v", err) return err } - cdp.gRPCListener = lis // create gRPC server // one main role of this gRPC server in consul-dataplane is to proxy envoy ADS requests // to the connected Consul server. director := func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { + if !strings.Contains(fullMethodName, "envoy.service.discovery.v3.AggregatedDiscoveryService/DeltaAggregatedResources") { + return ctx, nil, status.Errorf(codes.Unimplemented, fmt.Sprintf("Unknown method %s", fullMethodName)) + } + md, _ := metadata.FromIncomingContext(ctx) mdCopy := md.Copy() // TODO (NET-148): Inject the ACL token acquired from the server discovery library mdCopy[metadataKeyToken] = []string{cdp.cfg.Consul.Credentials.Static.Token} outCtx := metadata.NewOutgoingContext(ctx, mdCopy) - if !strings.Contains(fullMethodName, "envoy.service.discovery.v3.AggregatedDiscoveryService/DeltaAggregatedResources") { - return outCtx, nil, status.Errorf(codes.Unimplemented, fmt.Sprintf("Unknown method %s", fullMethodName)) - } - // TODO (NET-148): Ensure the server connection here is the one acquired via the server discovery library return outCtx, cdp.consulServer.grpcClientConn, nil } - gRPCServer := grpc.NewServer(grpc.UnknownServiceHandler(proxy.TransparentHandler(director))) - cdp.gRPCServer = gRPCServer + newGRPCServer := grpc.NewServer(grpc.UnknownServiceHandler(proxy.TransparentHandler(director))) - cdp.logger.Info("created gRPC server", "address", lis.Addr().String()) + cdp.gRPCServer = &gRPCServer{listener: lis, server: newGRPCServer, exitedCh: make(chan struct{})} + cdp.logger.Trace("created gRPC server", "address", lis.Addr().String()) return nil } func (cdp *ConsulDataplane) startGRPCServer() { - cdp.logger.Trace("starting gRPC server") + cdp.logger.Info("starting gRPC server", "address", cdp.gRPCServer.listener.Addr().String()) - if err := cdp.gRPCServer.Serve(cdp.gRPCListener); err != nil { - cdp.logger.Error("failed to serve gRPC requests: %v", err) - cdp.gRPCListener.Close() - // TODO: gracefully exit + if err := cdp.gRPCServer.server.Serve(cdp.gRPCServer.listener); err != nil { + cdp.logger.Error("failed to serve gRPC requests", "error", err) + cdp.gRPCServer.listener.Close() + close(cdp.gRPCServer.exitedCh) } } + +func (cdp *ConsulDataplane) stopGRPCServer() { + if cdp.gRPCServer != nil { + cdp.logger.Debug("stopping gRPC server") + cdp.gRPCServer.server.Stop() + } +} + +func (cdp *ConsulDataplane) gRPCServerExited() chan struct{} { return cdp.gRPCServer.exitedCh } From 4dd7652c58c25f96fcf27f318498700455ad0dbd Mon Sep 17 00:00:00 2001 From: Riddhi Shah Date: Mon, 5 Sep 2022 16:27:47 -0700 Subject: [PATCH 3/6] Pass flags, add tests, cleanup and polish --- cmd/consul-dataplane/main.go | 10 ++ pkg/consuldp/bootstrap.go | 25 +-- pkg/consuldp/bootstrap_test.go | 87 +++++++-- pkg/consuldp/config.go | 9 + pkg/consuldp/consul_dataplane.go | 64 +++++-- pkg/consuldp/consul_dataplane_test.go | 60 +++++++ pkg/consuldp/grpc.go | 66 ------- .../testdata/TestBootstrapConfig/basic.golden | 2 +- .../central-telemetry-config.golden | 2 +- .../local-unix-socket-xds-server.golden | 169 +++++++++++++++++ .../local-xds-server.golden | 170 ++++++++++++++++++ .../TestBootstrapConfig/ready-listener.golden | 2 +- pkg/consuldp/xds.go | 100 +++++++++++ pkg/consuldp/xds_test.go | 124 +++++++++++++ 14 files changed, 777 insertions(+), 113 deletions(-) delete mode 100644 pkg/consuldp/grpc.go create mode 100644 pkg/consuldp/testdata/TestBootstrapConfig/local-unix-socket-xds-server.golden create mode 100644 pkg/consuldp/testdata/TestBootstrapConfig/local-xds-server.golden create mode 100644 pkg/consuldp/xds.go create mode 100644 pkg/consuldp/xds_test.go diff --git a/cmd/consul-dataplane/main.go b/cmd/consul-dataplane/main.go index a67caea6..5bb6d174 100644 --- a/cmd/consul-dataplane/main.go +++ b/cmd/consul-dataplane/main.go @@ -32,6 +32,9 @@ var ( adminBindPort int readyBindAddr string readyBindPort int + + xdsBindAddr string + xdsBindPort int ) func init() { @@ -63,6 +66,9 @@ func init() { flag.IntVar(&adminBindPort, "envoy-admin-bind-port", 19000, "The port on which the Envoy admin server will be available.") flag.StringVar(&readyBindAddr, "envoy-ready-bind-address", "", "The address on which Envoy's readiness probe will be available.") flag.IntVar(&readyBindPort, "envoy-ready-bind-port", 0, "The port on which Envoy's readiness probe will be available.") + + flag.StringVar(&xdsBindAddr, "xds-bind-addr", "127.0.0.1", "The address on which the envoy xDS server will be available.") + flag.IntVar(&xdsBindPort, "xds-bind-port", 0, "The port on which the envoy xDS server will be available.") } // validateFlags performs semantic validation of the flag values @@ -111,6 +117,10 @@ func main() { ReadyBindAddress: readyBindAddr, ReadyBindPort: readyBindPort, }, + XDSServer: &consuldp.XDSServer{ + BindAddress: xdsBindAddr, + BindPort: xdsBindPort, + }, } consuldpInstance, err := consuldp.NewConsulDP(consuldpCfg) if err != nil { diff --git a/pkg/consuldp/bootstrap.go b/pkg/consuldp/bootstrap.go index 91931918..c55dd0fe 100644 --- a/pkg/consuldp/bootstrap.go +++ b/pkg/consuldp/bootstrap.go @@ -25,9 +25,6 @@ const ( // bootstrapConfig generates the Envoy bootstrap config in JSON format. func (cdp *ConsulDataplane) bootstrapConfig(ctx context.Context) ([]byte, error) { - cdpGRPCFullAddr := strings.Split(cdp.gRPCServer.listener.Addr().String(), ":") - cdpGRPCAddr := cdpGRPCFullAddr[0] - cdpGRPCPort := cdpGRPCFullAddr[1] svc := cdp.cfg.Service envoy := cdp.cfg.Envoy @@ -54,8 +51,8 @@ func (cdp *ConsulDataplane) bootstrapConfig(ctx context.Context) ([]byte, error) args := &bootstrap.BootstrapTplArgs{ GRPC: bootstrap.GRPC{ - AgentAddress: cdpGRPCAddr, - AgentPort: cdpGRPCPort, + AgentAddress: cdp.cfg.XDSServer.BindAddress, + AgentPort: strconv.Itoa(cdp.cfg.XDSServer.BindPort), AgentTLS: false, }, ProxyCluster: rsp.Service, @@ -66,11 +63,19 @@ func (cdp *ConsulDataplane) bootstrapConfig(ctx context.Context) ([]byte, error) AdminBindAddress: envoy.AdminBindAddress, AdminBindPort: strconv.Itoa(envoy.AdminBindPort), LocalAgentClusterName: localClusterName, - // TODO(NET-148): Support login via an ACL auth-method. - Token: cdp.cfg.Consul.Credentials.Static.Token, - Namespace: rsp.Namespace, - Partition: rsp.Partition, - Datacenter: rsp.Datacenter, + Namespace: rsp.Namespace, + Partition: rsp.Partition, + Datacenter: rsp.Datacenter, + } + + if cdp.localXDSServer != nil && cdp.localXDSServer.enabled { + if cdp.localXDSServer.listenerNetwork == "unix" { + args.GRPC.AgentSocket = cdp.localXDSServer.listenerAddress + } else { + xdsServerFullAddr := strings.Split(cdp.localXDSServer.listenerAddress, ":") + args.GRPC.AgentAddress = xdsServerFullAddr[0] + args.GRPC.AgentPort = xdsServerFullAddr[1] + } } var bootstrapConfig bootstrap.BootstrapConfig diff --git a/pkg/consuldp/bootstrap_test.go b/pkg/consuldp/bootstrap_test.go index 892da7f3..724be3c3 100644 --- a/pkg/consuldp/bootstrap_test.go +++ b/pkg/consuldp/bootstrap_test.go @@ -4,10 +4,11 @@ import ( "bytes" "context" "flag" - "net" + "fmt" "os" "os/exec" "path/filepath" + "strings" "testing" "github.com/stretchr/testify/mock" @@ -24,8 +25,9 @@ var ( func TestBootstrapConfig(t *testing.T) { const ( - serverAddr = "1.2.3.4" - nodeName = "agentless-node" + nodeName = "agentless-node" + xdsBindPort = 1234 + socketPath = "/var/run/xds.sock" ) makeStruct := func(kv map[string]any) *structpb.Struct { @@ -42,11 +44,6 @@ func TestBootstrapConfig(t *testing.T) { &Config{ Consul: &ConsulConfig{ GRPCPort: 1234, - Credentials: &CredentialsConfig{ - Static: &StaticCredentialsConfig{ - Token: "some-acl-token", - }, - }, }, Service: &ServiceConfig{ ServiceID: "web-proxy", @@ -59,6 +56,7 @@ func TestBootstrapConfig(t *testing.T) { Telemetry: &TelemetryConfig{ UseCentralConfig: false, }, + XDSServer: &XDSServer{BindAddress: "1.2.3.4", BindPort: xdsBindPort}, }, &pbdataplane.GetEnvoyBootstrapParamsResponse{ Service: "web", @@ -72,11 +70,6 @@ func TestBootstrapConfig(t *testing.T) { &Config{ Consul: &ConsulConfig{ GRPCPort: 1234, - Credentials: &CredentialsConfig{ - Static: &StaticCredentialsConfig{ - Token: "some-acl-token", - }, - }, }, Service: &ServiceConfig{ ServiceID: "web-proxy", @@ -89,6 +82,7 @@ func TestBootstrapConfig(t *testing.T) { Telemetry: &TelemetryConfig{ UseCentralConfig: true, }, + XDSServer: &XDSServer{BindAddress: "1.2.3.4", BindPort: xdsBindPort}, }, &pbdataplane.GetEnvoyBootstrapParamsResponse{ Service: "web", @@ -102,11 +96,6 @@ func TestBootstrapConfig(t *testing.T) { &Config{ Consul: &ConsulConfig{ GRPCPort: 1234, - Credentials: &CredentialsConfig{ - Static: &StaticCredentialsConfig{ - Token: "some-acl-token", - }, - }, }, Service: &ServiceConfig{ ServiceID: "web-proxy", @@ -121,12 +110,65 @@ func TestBootstrapConfig(t *testing.T) { Telemetry: &TelemetryConfig{ UseCentralConfig: false, }, + XDSServer: &XDSServer{BindAddress: "1.2.3.4", BindPort: xdsBindPort}, }, &pbdataplane.GetEnvoyBootstrapParamsResponse{ Service: "web", NodeName: nodeName, }, }, + "local-xds-server": { + &Config{ + Consul: &ConsulConfig{ + GRPCPort: 1234, + }, + Service: &ServiceConfig{ + ServiceID: "web-proxy", + NodeName: nodeName, + }, + Envoy: &EnvoyConfig{ + AdminBindAddress: "127.0.0.1", + AdminBindPort: 19000, + }, + Telemetry: &TelemetryConfig{ + UseCentralConfig: false, + }, + XDSServer: &XDSServer{BindAddress: "127.0.0.1", BindPort: xdsBindPort}, + }, + &pbdataplane.GetEnvoyBootstrapParamsResponse{ + Service: "web", + NodeName: nodeName, + Config: makeStruct(map[string]any{ + "envoy_dogstatsd_url": "this-should-not-appear-in-generated-config", + }), + }, + }, + "local-unix-socket-xds-server": { + &Config{ + Consul: &ConsulConfig{ + GRPCPort: 1234, + }, + Service: &ServiceConfig{ + ServiceID: "web-proxy", + NodeName: nodeName, + }, + Envoy: &EnvoyConfig{ + AdminBindAddress: "127.0.0.1", + AdminBindPort: 19000, + }, + Telemetry: &TelemetryConfig{ + UseCentralConfig: false, + }, + XDSServer: &XDSServer{BindAddress: fmt.Sprintf("unix://%s", socketPath)}, + }, + &pbdataplane.GetEnvoyBootstrapParamsResponse{ + Service: "web", + NodeName: nodeName, + Config: makeStruct(map[string]any{ + "envoy_dogstatsd_url": "this-should-not-appear-in-generated-config", + }), + }, + }, } for desc, tc := range testCases { t.Run(desc, func(t *testing.T) { @@ -144,7 +186,14 @@ func TestBootstrapConfig(t *testing.T) { dp := &ConsulDataplane{ cfg: tc.cfg, dpServiceClient: client, - consulServer: &consulServer{address: net.IPAddr{IP: net.ParseIP(serverAddr)}}, + } + + if checkLocalXDSServer(tc.cfg.XDSServer.BindAddress) { + if strings.HasPrefix(tc.cfg.XDSServer.BindAddress, "unix://") { + dp.localXDSServer = &localXDSServer{enabled: true, listenerAddress: socketPath, listenerNetwork: "unix"} + } else { + dp.localXDSServer = &localXDSServer{enabled: true, listenerAddress: fmt.Sprintf("127.0.0.1:%d", xdsBindPort)} + } } bsCfg, err := dp.bootstrapConfig(ctx) diff --git a/pkg/consuldp/config.go b/pkg/consuldp/config.go index 418dd638..f9393852 100644 --- a/pkg/consuldp/config.go +++ b/pkg/consuldp/config.go @@ -76,6 +76,14 @@ type EnvoyConfig struct { ReadyBindPort int } +// +type XDSServer struct { + // BindAddress is the address on which the Envoy xDS server will be available. + BindAddress string + // BindPort is the address on which the Envoy xDS port will be available. + BindPort int +} + // Config is the configuration used by consul-dataplane, consolidated // from various sources - CLI flags, env vars, config file settings. type Config struct { @@ -84,4 +92,5 @@ type Config struct { Logging *LoggingConfig Telemetry *TelemetryConfig Envoy *EnvoyConfig + XDSServer *XDSServer } diff --git a/pkg/consuldp/consul_dataplane.go b/pkg/consuldp/consul_dataplane.go index da7c68e2..6f6f00bf 100644 --- a/pkg/consuldp/consul_dataplane.go +++ b/pkg/consuldp/consul_dataplane.go @@ -6,6 +6,7 @@ import ( "fmt" "math/rand" "net" + "strings" "time" "github.com/hashicorp/go-hclog" @@ -29,10 +30,13 @@ type consulServer struct { grpcClientConn *grpc.ClientConn } -type gRPCServer struct { - listener net.Listener - server *grpc.Server - exitedCh chan struct{} +type localXDSServer struct { + enabled bool + listener net.Listener + listenerAddress string + listenerNetwork string + gRPCServer *grpc.Server + exitedCh chan struct{} } // ConsulDataplane represents the consul-dataplane process @@ -41,7 +45,7 @@ type ConsulDataplane struct { cfg *Config consulServer *consulServer dpServiceClient pbdataplane.DataplaneServiceClient - gRPCServer *gRPCServer + localXDSServer *localXDSServer } // NewConsulDP creates a new instance of ConsulDataplane @@ -61,8 +65,9 @@ func NewConsulDP(cfg *Config) (*ConsulDataplane, error) { }) return &ConsulDataplane{ - logger: logger, - cfg: cfg, + logger: logger, + cfg: cfg, + localXDSServer: &localXDSServer{enabled: false}, }, nil } @@ -86,6 +91,10 @@ func validateConfig(cfg *Config) error { return errors.New("envoy admin bind port not specified") case cfg.Logging == nil: return errors.New("logging settings not specified") + case cfg.XDSServer.BindAddress == "": + return errors.New("envoy xDS bind address not specified") + case cfg.XDSServer.BindPort == 0 && !checkLocalXDSServer(cfg.XDSServer.BindAddress): + return errors.New("envoy xDS bind port not specified") } return nil } @@ -124,6 +133,28 @@ func (cdp *ConsulDataplane) setConsulServerSupportedFeatures(ctx context.Context return nil } +// checkLocalXDSServer checks if the specified xds bind address is local. +func checkLocalXDSServer(xdsBindAddr string) bool { + if strings.HasPrefix(xdsBindAddr, "unix://") || xdsBindAddr == "127.0.0.1" || xdsBindAddr == "localhost" { + return true + } + return false +} + +// checkAndEnableLocalXDSServer checks if the specified xds bind address is local. +// If local, it enables the flag to configure consul-dataplane (later on in the setup process) +// with a gRPC server to serve envoy xDS requests. +// If not local, the flag remains turned off and it is assumed the xDS requests will be served +// by a remote gRPC server. +// Potential TODO: Explicitly allow specifying an option (xds.disable?) to disable configuring +// consul-dataplane as the xDS server in case xDS requests can be served on another port locally +// (example: consul server process on localhost). +func (cdp *ConsulDataplane) checkAndEnableLocalXDSServer() { + if checkLocalXDSServer(cdp.cfg.XDSServer.BindAddress) { + cdp.localXDSServer.enabled = true + } +} + func (cdp *ConsulDataplane) Run(ctx context.Context) error { cdp.logger.Info("started consul-dataplane process") @@ -156,11 +187,16 @@ func (cdp *ConsulDataplane) Run(ctx context.Context) error { return fmt.Errorf("failed to set supported features: %w", err) } - err = cdp.setupGRPCServer() - if err != nil { - return err + cdp.checkAndEnableLocalXDSServer() + + if cdp.localXDSServer.enabled { + err = cdp.setupXDSServer() + if err != nil { + return err + } + go cdp.startXDSServer() + defer cdp.stopXDSServer() } - go cdp.startGRPCServer() cfg, err := cdp.bootstrapConfig(ctx) if err != nil { @@ -190,16 +226,14 @@ func (cdp *ConsulDataplane) Run(ctx context.Context) error { if err := proxy.Stop(); err != nil { cdp.logger.Error("failed to stop proxy", "error", err) } - cdp.stopGRPCServer() doneCh <- nil case <-proxy.Exited(): - cdp.stopGRPCServer() doneCh <- errors.New("envoy proxy exited unexpectedly") - case <-cdp.gRPCServerExited(): + case <-cdp.xdsServerExited(): if err := proxy.Stop(); err != nil { cdp.logger.Error("failed to stop proxy", "error", err) } - doneCh <- errors.New("gRPC server exited unexpectedly") + doneCh <- errors.New("xDS server exited unexpectedly") } }() return <-doneCh diff --git a/pkg/consuldp/consul_dataplane_test.go b/pkg/consuldp/consul_dataplane_test.go index 16de685f..5c6cd0a0 100644 --- a/pkg/consuldp/consul_dataplane_test.go +++ b/pkg/consuldp/consul_dataplane_test.go @@ -34,6 +34,9 @@ func validConfig() *Config { AdminBindAddress: "127.0.0.1", AdminBindPort: 19000, }, + XDSServer: &XDSServer{ + BindAddress: "127.0.0.1", + }, } } @@ -117,6 +120,19 @@ func TestNewConsulDPError(t *testing.T) { modFn: func(c *Config) { c.Logging = nil }, expectErr: "logging settings not specified", }, + { + name: "missing xds bind address", + modFn: func(c *Config) { c.XDSServer.BindAddress = "" }, + expectErr: "envoy xDS bind address not specified", + }, + { + name: "missing xds bind port when address not local", + modFn: func(c *Config) { + c.XDSServer.BindPort = 0 + c.XDSServer.BindAddress = "1.2.3.4" + }, + expectErr: "envoy xDS bind port not specified", + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -198,3 +214,47 @@ func TestSetConsulServerSupportedFeaturesError(t *testing.T) { require.ErrorContains(t, consulDP.setConsulServerSupportedFeatures(context.Background()), "failure getting supported consul-dataplane features") require.Empty(t, consulDP.consulServer.supportedFeatures) } + +func TestCheckAndEnableLocalXDSServer(t *testing.T) { + type testCase struct { + name string + modFn func(*Config) + expectLocalXDSServer bool + } + + testCases := []testCase{ + { + name: "localhost ip", + modFn: func(c *Config) { c.XDSServer.BindAddress = "127.0.0.1" }, + expectLocalXDSServer: true, + }, + { + name: "localhost name", + modFn: func(c *Config) { c.XDSServer.BindAddress = "localhost" }, + expectLocalXDSServer: true, + }, + { + name: "unix socket", + modFn: func(c *Config) { c.XDSServer.BindAddress = "unix:///var/run/xds.sock" }, + expectLocalXDSServer: true, + }, + { + name: "remote ip", + modFn: func(c *Config) { + c.XDSServer.BindAddress = "1.2.3.4" + c.XDSServer.BindPort = 1234 + }, + expectLocalXDSServer: false, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cfg := validConfig() + tc.modFn(cfg) + cdp, err := NewConsulDP(cfg) + require.NoError(t, err) + cdp.checkAndEnableLocalXDSServer() + require.Equal(t, tc.expectLocalXDSServer, cdp.localXDSServer.enabled) + }) + } +} diff --git a/pkg/consuldp/grpc.go b/pkg/consuldp/grpc.go deleted file mode 100644 index b68d5792..00000000 --- a/pkg/consuldp/grpc.go +++ /dev/null @@ -1,66 +0,0 @@ -package consuldp - -import ( - "context" - "fmt" - "net" - "strings" - - "github.com/adamthesax/grpc-proxy/proxy" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" -) - -const metadataKeyToken = "x-consul-token" - -func (cdp *ConsulDataplane) setupGRPCServer() error { - cdp.logger.Trace("setting up gRPC server") - // create gRPC listener - lis, err := net.Listen("tcp", "127.0.0.1:0") - if err != nil { - cdp.logger.Error("failed to create gRPC/TCP listener: %v", err) - return err - } - - // create gRPC server - // one main role of this gRPC server in consul-dataplane is to proxy envoy ADS requests - // to the connected Consul server. - director := func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { - if !strings.Contains(fullMethodName, "envoy.service.discovery.v3.AggregatedDiscoveryService/DeltaAggregatedResources") { - return ctx, nil, status.Errorf(codes.Unimplemented, fmt.Sprintf("Unknown method %s", fullMethodName)) - } - - md, _ := metadata.FromIncomingContext(ctx) - mdCopy := md.Copy() - // TODO (NET-148): Inject the ACL token acquired from the server discovery library - mdCopy[metadataKeyToken] = []string{cdp.cfg.Consul.Credentials.Static.Token} - outCtx := metadata.NewOutgoingContext(ctx, mdCopy) - return outCtx, cdp.consulServer.grpcClientConn, nil - } - newGRPCServer := grpc.NewServer(grpc.UnknownServiceHandler(proxy.TransparentHandler(director))) - - cdp.gRPCServer = &gRPCServer{listener: lis, server: newGRPCServer, exitedCh: make(chan struct{})} - cdp.logger.Trace("created gRPC server", "address", lis.Addr().String()) - return nil -} - -func (cdp *ConsulDataplane) startGRPCServer() { - cdp.logger.Info("starting gRPC server", "address", cdp.gRPCServer.listener.Addr().String()) - - if err := cdp.gRPCServer.server.Serve(cdp.gRPCServer.listener); err != nil { - cdp.logger.Error("failed to serve gRPC requests", "error", err) - cdp.gRPCServer.listener.Close() - close(cdp.gRPCServer.exitedCh) - } -} - -func (cdp *ConsulDataplane) stopGRPCServer() { - if cdp.gRPCServer != nil { - cdp.logger.Debug("stopping gRPC server") - cdp.gRPCServer.server.Stop() - } -} - -func (cdp *ConsulDataplane) gRPCServerExited() chan struct{} { return cdp.gRPCServer.exitedCh } diff --git a/pkg/consuldp/testdata/TestBootstrapConfig/basic.golden b/pkg/consuldp/testdata/TestBootstrapConfig/basic.golden index d94aa5d1..93388b3a 100644 --- a/pkg/consuldp/testdata/TestBootstrapConfig/basic.golden +++ b/pkg/consuldp/testdata/TestBootstrapConfig/basic.golden @@ -158,7 +158,7 @@ "initial_metadata": [ { "key": "x-consul-token", - "value": "some-acl-token" + "value": "" } ], "envoy_grpc": { diff --git a/pkg/consuldp/testdata/TestBootstrapConfig/central-telemetry-config.golden b/pkg/consuldp/testdata/TestBootstrapConfig/central-telemetry-config.golden index 638615ac..c198beb8 100644 --- a/pkg/consuldp/testdata/TestBootstrapConfig/central-telemetry-config.golden +++ b/pkg/consuldp/testdata/TestBootstrapConfig/central-telemetry-config.golden @@ -172,7 +172,7 @@ "initial_metadata": [ { "key": "x-consul-token", - "value": "some-acl-token" + "value": "" } ], "envoy_grpc": { diff --git a/pkg/consuldp/testdata/TestBootstrapConfig/local-unix-socket-xds-server.golden b/pkg/consuldp/testdata/TestBootstrapConfig/local-unix-socket-xds-server.golden new file mode 100644 index 00000000..4affbca5 --- /dev/null +++ b/pkg/consuldp/testdata/TestBootstrapConfig/local-unix-socket-xds-server.golden @@ -0,0 +1,169 @@ +{ + "admin": { + "access_log_path": "/dev/null", + "address": { + "socket_address": { + "address": "127.0.0.1", + "port_value": 19000 + } + } + }, + "node": { + "cluster": "web", + "id": "web-proxy", + "metadata": { + "node_name": "agentless-node", + "namespace": "default", + "partition": "default" + } + }, + "layered_runtime": { + "layers": [ + { + "name": "base", + "static_layer": { + "re2.max_program_size.error_level": 1048576 + } + } + ] + }, + "static_resources": { + "clusters": [ + { + "name": "consul-dataplane", + "ignore_health_on_host_removal": false, + "connect_timeout": "1s", + "type": "STATIC", + "http2_protocol_options": {}, + "loadAssignment": { + "clusterName": "consul-dataplane", + "endpoints": [ + { + "lbEndpoints": [ + { + "endpoint": { + "address": { + "pipe": { + "path": "/var/run/xds.sock" + } + } + } + } + ] + } + ] + } + } + ] + }, + "stats_config": { + "stats_tags": [ + { + "regex": "^cluster\\.(?:passthrough~)?((?:([^.]+)~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.custom_hash" + }, + { + "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:([^.]+)\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.service_subset" + }, + { + "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?([^.]+)\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.service" + }, + { + "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.([^.]+)\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.namespace" + }, + { + "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:([^.]+)\\.)?[^.]+\\.internal[^.]*\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.partition" + }, + { + "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?([^.]+)\\.internal[^.]*\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.datacenter" + }, + { + "regex": "^cluster\\.([^.]+\\.(?:[^.]+\\.)?([^.]+)\\.external\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.peer" + }, + { + "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.([^.]+)\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.routing_type" + }, + { + "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.([^.]+)\\.consul\\.)", + "tag_name": "consul.destination.trust_domain" + }, + { + "regex": "^cluster\\.(?:passthrough~)?(((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+)\\.[^.]+\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.target" + }, + { + "regex": "^cluster\\.(?:passthrough~)?(((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+)\\.consul\\.)", + "tag_name": "consul.destination.full_target" + }, + { + "regex": "^(?:tcp|http)\\.upstream(?:_peered)?\\.(([^.]+)(?:\\.[^.]+)?(?:\\.[^.]+)?\\.[^.]+\\.)", + "tag_name": "consul.upstream.service" + }, + { + "regex": "^(?:tcp|http)\\.upstream\\.([^.]+(?:\\.[^.]+)?(?:\\.[^.]+)?\\.([^.]+)\\.)", + "tag_name": "consul.upstream.datacenter" + }, + { + "regex": "^(?:tcp|http)\\.upstream_peered\\.([^.]+(?:\\.[^.]+)?\\.([^.]+)\\.)", + "tag_name": "consul.upstream.peer" + }, + { + "regex": "^(?:tcp|http)\\.upstream(?:_peered)?\\.([^.]+(?:\\.([^.]+))?(?:\\.[^.]+)?\\.[^.]+\\.)", + "tag_name": "consul.upstream.namespace" + }, + { + "regex": "^(?:tcp|http)\\.upstream\\.([^.]+(?:\\.[^.]+)?(?:\\.([^.]+))?\\.[^.]+\\.)", + "tag_name": "consul.upstream.partition" + }, + { + "tag_name": "local_cluster", + "fixed_value": "web" + }, + { + "tag_name": "consul.source.service", + "fixed_value": "web" + }, + { + "tag_name": "consul.source.namespace", + "fixed_value": "default" + }, + { + "tag_name": "consul.source.partition", + "fixed_value": "default" + } + ], + "use_all_default_tags": true + }, + "dynamic_resources": { + "lds_config": { + "ads": {}, + "resource_api_version": "V3" + }, + "cds_config": { + "ads": {}, + "resource_api_version": "V3" + }, + "ads_config": { + "api_type": "DELTA_GRPC", + "transport_api_version": "V3", + "grpc_services": { + "initial_metadata": [ + { + "key": "x-consul-token", + "value": "" + } + ], + "envoy_grpc": { + "cluster_name": "consul-dataplane" + } + } + } + } +} diff --git a/pkg/consuldp/testdata/TestBootstrapConfig/local-xds-server.golden b/pkg/consuldp/testdata/TestBootstrapConfig/local-xds-server.golden new file mode 100644 index 00000000..ca0741a3 --- /dev/null +++ b/pkg/consuldp/testdata/TestBootstrapConfig/local-xds-server.golden @@ -0,0 +1,170 @@ +{ + "admin": { + "access_log_path": "/dev/null", + "address": { + "socket_address": { + "address": "127.0.0.1", + "port_value": 19000 + } + } + }, + "node": { + "cluster": "web", + "id": "web-proxy", + "metadata": { + "node_name": "agentless-node", + "namespace": "default", + "partition": "default" + } + }, + "layered_runtime": { + "layers": [ + { + "name": "base", + "static_layer": { + "re2.max_program_size.error_level": 1048576 + } + } + ] + }, + "static_resources": { + "clusters": [ + { + "name": "consul-dataplane", + "ignore_health_on_host_removal": false, + "connect_timeout": "1s", + "type": "STATIC", + "http2_protocol_options": {}, + "loadAssignment": { + "clusterName": "consul-dataplane", + "endpoints": [ + { + "lbEndpoints": [ + { + "endpoint": { + "address": { + "socket_address": { + "address": "127.0.0.1", + "port_value": 1234 + } + } + } + } + ] + } + ] + } + } + ] + }, + "stats_config": { + "stats_tags": [ + { + "regex": "^cluster\\.(?:passthrough~)?((?:([^.]+)~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.custom_hash" + }, + { + "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:([^.]+)\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.service_subset" + }, + { + "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?([^.]+)\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.service" + }, + { + "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.([^.]+)\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.namespace" + }, + { + "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:([^.]+)\\.)?[^.]+\\.internal[^.]*\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.partition" + }, + { + "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?([^.]+)\\.internal[^.]*\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.datacenter" + }, + { + "regex": "^cluster\\.([^.]+\\.(?:[^.]+\\.)?([^.]+)\\.external\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.peer" + }, + { + "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.([^.]+)\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.routing_type" + }, + { + "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.([^.]+)\\.consul\\.)", + "tag_name": "consul.destination.trust_domain" + }, + { + "regex": "^cluster\\.(?:passthrough~)?(((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+)\\.[^.]+\\.[^.]+\\.consul\\.)", + "tag_name": "consul.destination.target" + }, + { + "regex": "^cluster\\.(?:passthrough~)?(((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+)\\.consul\\.)", + "tag_name": "consul.destination.full_target" + }, + { + "regex": "^(?:tcp|http)\\.upstream(?:_peered)?\\.(([^.]+)(?:\\.[^.]+)?(?:\\.[^.]+)?\\.[^.]+\\.)", + "tag_name": "consul.upstream.service" + }, + { + "regex": "^(?:tcp|http)\\.upstream\\.([^.]+(?:\\.[^.]+)?(?:\\.[^.]+)?\\.([^.]+)\\.)", + "tag_name": "consul.upstream.datacenter" + }, + { + "regex": "^(?:tcp|http)\\.upstream_peered\\.([^.]+(?:\\.[^.]+)?\\.([^.]+)\\.)", + "tag_name": "consul.upstream.peer" + }, + { + "regex": "^(?:tcp|http)\\.upstream(?:_peered)?\\.([^.]+(?:\\.([^.]+))?(?:\\.[^.]+)?\\.[^.]+\\.)", + "tag_name": "consul.upstream.namespace" + }, + { + "regex": "^(?:tcp|http)\\.upstream\\.([^.]+(?:\\.[^.]+)?(?:\\.([^.]+))?\\.[^.]+\\.)", + "tag_name": "consul.upstream.partition" + }, + { + "tag_name": "local_cluster", + "fixed_value": "web" + }, + { + "tag_name": "consul.source.service", + "fixed_value": "web" + }, + { + "tag_name": "consul.source.namespace", + "fixed_value": "default" + }, + { + "tag_name": "consul.source.partition", + "fixed_value": "default" + } + ], + "use_all_default_tags": true + }, + "dynamic_resources": { + "lds_config": { + "ads": {}, + "resource_api_version": "V3" + }, + "cds_config": { + "ads": {}, + "resource_api_version": "V3" + }, + "ads_config": { + "api_type": "DELTA_GRPC", + "transport_api_version": "V3", + "grpc_services": { + "initial_metadata": [ + { + "key": "x-consul-token", + "value": "" + } + ], + "envoy_grpc": { + "cluster_name": "consul-dataplane" + } + } + } + } +} diff --git a/pkg/consuldp/testdata/TestBootstrapConfig/ready-listener.golden b/pkg/consuldp/testdata/TestBootstrapConfig/ready-listener.golden index 52de4676..6e4077cd 100644 --- a/pkg/consuldp/testdata/TestBootstrapConfig/ready-listener.golden +++ b/pkg/consuldp/testdata/TestBootstrapConfig/ready-listener.golden @@ -247,7 +247,7 @@ "initial_metadata": [ { "key": "x-consul-token", - "value": "some-acl-token" + "value": "" } ], "envoy_grpc": { diff --git a/pkg/consuldp/xds.go b/pkg/consuldp/xds.go new file mode 100644 index 00000000..4628a4c6 --- /dev/null +++ b/pkg/consuldp/xds.go @@ -0,0 +1,100 @@ +package consuldp + +import ( + "context" + "fmt" + "net" + "strings" + + "github.com/adamthesax/grpc-proxy/proxy" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +const ( + metadataKeyToken = "x-consul-token" + envoyADSMethodName = "envoy.service.discovery.v3.AggregatedDiscoveryService/DeltaAggregatedResources" +) + +// director is the helper called by the unknown service gRPC handler. This helper is responsible for injecting the ACL token +// into the outgoing Consul server request and returning the target consul server gRPC connection. +func (cdp *ConsulDataplane) director(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) { + // check to ensure other unknown/unregistered RPCs are not proxied to the target consul server. + if !strings.Contains(fullMethodName, envoyADSMethodName) { + return ctx, nil, status.Errorf(codes.Unimplemented, fmt.Sprintf("Unknown method %s", fullMethodName)) + } + + var mdCopy metadata.MD + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + mdCopy = metadata.MD{} + } else { + mdCopy = md.Copy() + } + // TODO (NET-148): Inject the ACL token acquired from the server discovery library + mdCopy[metadataKeyToken] = []string{cdp.cfg.Consul.Credentials.Static.Token} + outCtx := metadata.NewOutgoingContext(ctx, mdCopy) + return outCtx, cdp.consulServer.grpcClientConn, nil +} + +// setupXDSServer sets up the consul-dataplane xDS server +func (cdp *ConsulDataplane) setupXDSServer() error { + cdp.logger.Trace("setting up envoy xDS server") + + // create listener to accept envoy xDS connections + var network, address string + if strings.HasPrefix(cdp.cfg.XDSServer.BindAddress, "unix://") { + network = "unix" + address = cdp.cfg.XDSServer.BindAddress[len("unix://"):] + } else { + network = "tcp" + address = fmt.Sprintf("%s:%d", cdp.cfg.XDSServer.BindAddress, cdp.cfg.XDSServer.BindPort) + } + + lis, err := net.Listen(network, address) + if err != nil { + cdp.logger.Error("failed to create envoy xDS listener: %v", err) + return err + } + + // create gRPC server to serve envoy gRPC xDS requests + // one main role of this gRPC server in consul-dataplane is to proxy envoy ADS requests + // to the connected Consul server. + + // Note on the underlying library: + // It has most of the scaffolding to proxy grpc requests to a desired target. + // The main proxy logic is here - https://github.com/adamthesax/grpc-proxy/blob/master/proxy/handler.go + // The core library being used is actually this - https://github.com/mwitkow/grpc-proxy. + // However, we needed this fix (https://github.com/mwitkow/grpc-proxy/pull/62) which was available on the fork we are using. + // TODO: Switch to the main library once the fix is merged to keep upto date. + newGRPCServer := grpc.NewServer(grpc.UnknownServiceHandler(proxy.TransparentHandler(cdp.director))) + + cdp.localXDSServer.listener = lis + cdp.localXDSServer.listenerAddress = lis.Addr().String() + cdp.localXDSServer.listenerNetwork = lis.Addr().Network() + cdp.localXDSServer.gRPCServer = newGRPCServer + cdp.localXDSServer.exitedCh = make(chan struct{}) + + cdp.logger.Trace("created xDS server", "address", lis.Addr().String()) + return nil +} + +func (cdp *ConsulDataplane) startXDSServer() { + cdp.logger.Info("starting envoy xDS server", "address", cdp.localXDSServer.listener.Addr().String()) + + if err := cdp.localXDSServer.gRPCServer.Serve(cdp.localXDSServer.listener); err != nil { + cdp.logger.Error("failed to serve xDS requests", "error", err) + close(cdp.localXDSServer.exitedCh) + } +} + +func (cdp *ConsulDataplane) stopXDSServer() { + if cdp.localXDSServer.enabled && cdp.localXDSServer.gRPCServer != nil { + cdp.logger.Debug("stopping xDS server") + cdp.localXDSServer.gRPCServer.Stop() + } +} + +func (cdp *ConsulDataplane) xdsServerExited() chan struct{} { return cdp.localXDSServer.exitedCh } diff --git a/pkg/consuldp/xds_test.go b/pkg/consuldp/xds_test.go new file mode 100644 index 00000000..df891f91 --- /dev/null +++ b/pkg/consuldp/xds_test.go @@ -0,0 +1,124 @@ +package consuldp + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strconv" + "testing" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +const ( + testToken = "test-token" + additionalTestMetaKey = "additional-meta-key" + additionalTestMetaValue = "additional-meta-value" +) + +func TestDirector(t *testing.T) { + type testCase struct { + name string + incomingContext context.Context + methodName string + expectedErr error + } + + incomingMetadata := metadata.MD{} + incomingMetadata[additionalTestMetaKey] = []string{additionalTestMetaValue} + + testCases := []testCase{ + { + name: "empty metdata in incoming ctx", + incomingContext: context.Background(), + methodName: envoyADSMethodName, + }, + { + name: "non-empty metdata in incoming ctx", + incomingContext: metadata.NewIncomingContext(context.Background(), incomingMetadata), + methodName: envoyADSMethodName, + }, + { + name: "invalid method name", + incomingContext: metadata.NewIncomingContext(context.Background(), incomingMetadata), + methodName: "unknownrpcmethod", + expectedErr: status.Errorf(codes.Unimplemented, "Unknown method unknownrpcmethod"), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cdp := &ConsulDataplane{ + cfg: &Config{Consul: &ConsulConfig{Credentials: &CredentialsConfig{Static: &StaticCredentialsConfig{Token: testToken}}}}, + consulServer: &consulServer{grpcClientConn: &grpc.ClientConn{}}, + } + outctx, targetConn, err := cdp.director(tc.incomingContext, tc.methodName) + if tc.expectedErr != nil { + require.ErrorIs(t, err, tc.expectedErr) + } else { + require.NoError(t, err) + require.Equal(t, cdp.consulServer.grpcClientConn, targetConn) + outMD, ok := metadata.FromOutgoingContext(outctx) + require.True(t, ok) + require.Equal(t, []string{testToken}, outMD.Get(metadataKeyToken)) + // validate additional metadata in the incoming context is forwarded + if _, ok := metadata.FromIncomingContext(tc.incomingContext); ok { + require.Equal(t, []string{additionalTestMetaValue}, outMD.Get(additionalTestMetaKey)) + } + } + }) + } +} + +func TestSetupXDSServer(t *testing.T) { + type testCase struct { + name string + xdsBindAddress string + xdsBindPort int + expectedListenerNetwork string + expectedListenerAddress string + } + + dir := os.TempDir() + unixSocketPath := filepath.Join(dir, fmt.Sprintf("%d.sock", time.Now().UnixNano())) + defer func() { + os.Remove(unixSocketPath) + }() + + testCases := []testCase{ + {name: "localhost with no port", xdsBindAddress: "127.0.0.1", expectedListenerNetwork: "tcp", expectedListenerAddress: "127.0.0.1"}, + {name: "localhost with port", xdsBindAddress: "127.0.0.1", xdsBindPort: 51804, expectedListenerNetwork: "tcp", expectedListenerAddress: "127.0.0.1:51804"}, + {name: "unix socket", xdsBindAddress: fmt.Sprintf("unix://%s", unixSocketPath), expectedListenerNetwork: "unix", expectedListenerAddress: unixSocketPath}, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cdp := &ConsulDataplane{ + cfg: &Config{XDSServer: &XDSServer{BindAddress: tc.xdsBindAddress, BindPort: tc.xdsBindPort}}, + logger: hclog.NewNullLogger(), + localXDSServer: &localXDSServer{enabled: true}, + } + + err := cdp.setupXDSServer() + + require.NoError(t, err) + require.NotNil(t, cdp.localXDSServer.listener) + t.Cleanup(func() { cdp.localXDSServer.listener.Close() }) + require.NotNil(t, cdp.localXDSServer.gRPCServer) + require.Equal(t, tc.expectedListenerNetwork, cdp.localXDSServer.listenerNetwork) + require.Contains(t, cdp.localXDSServer.listenerAddress, tc.expectedListenerAddress) + if tc.expectedListenerNetwork == "tcp" && tc.xdsBindPort == 0 { + listenerPort := cdp.localXDSServer.listenerAddress[len(tc.xdsBindAddress)+1:] + _, err = strconv.Atoi(listenerPort) + require.NoError(t, err) + } + }) + } + +} From e2c2ec91419ee5664becd7eedac1d91688b53369 Mon Sep 17 00:00:00 2001 From: Riddhi Shah Date: Tue, 6 Sep 2022 14:08:24 -0700 Subject: [PATCH 4/6] Semantic fixes --- cmd/consul-dataplane/main.go | 4 ++-- pkg/consuldp/config.go | 2 +- pkg/consuldp/xds.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/consul-dataplane/main.go b/cmd/consul-dataplane/main.go index 5bb6d174..6a8dda99 100644 --- a/cmd/consul-dataplane/main.go +++ b/cmd/consul-dataplane/main.go @@ -67,8 +67,8 @@ func init() { flag.StringVar(&readyBindAddr, "envoy-ready-bind-address", "", "The address on which Envoy's readiness probe will be available.") flag.IntVar(&readyBindPort, "envoy-ready-bind-port", 0, "The port on which Envoy's readiness probe will be available.") - flag.StringVar(&xdsBindAddr, "xds-bind-addr", "127.0.0.1", "The address on which the envoy xDS server will be available.") - flag.IntVar(&xdsBindPort, "xds-bind-port", 0, "The port on which the envoy xDS server will be available.") + flag.StringVar(&xdsBindAddr, "xds-bind-addr", "127.0.0.1", "The address on which the Envoy xDS server will be available.") + flag.IntVar(&xdsBindPort, "xds-bind-port", 0, "The port on which the Envoy xDS server will be available.") } // validateFlags performs semantic validation of the flag values diff --git a/pkg/consuldp/config.go b/pkg/consuldp/config.go index f9393852..12c7a47c 100644 --- a/pkg/consuldp/config.go +++ b/pkg/consuldp/config.go @@ -76,7 +76,7 @@ type EnvoyConfig struct { ReadyBindPort int } -// +// XDSServer contains the configuration of the xDS server. type XDSServer struct { // BindAddress is the address on which the Envoy xDS server will be available. BindAddress string diff --git a/pkg/consuldp/xds.go b/pkg/consuldp/xds.go index 4628a4c6..7dde538f 100644 --- a/pkg/consuldp/xds.go +++ b/pkg/consuldp/xds.go @@ -34,7 +34,7 @@ func (cdp *ConsulDataplane) director(ctx context.Context, fullMethodName string) mdCopy = md.Copy() } // TODO (NET-148): Inject the ACL token acquired from the server discovery library - mdCopy[metadataKeyToken] = []string{cdp.cfg.Consul.Credentials.Static.Token} + mdCopy.Set(metadataKeyToken, cdp.cfg.Consul.Credentials.Static.Token) outCtx := metadata.NewOutgoingContext(ctx, mdCopy) return outCtx, cdp.consulServer.grpcClientConn, nil } From 943f7a7782365c49cb468cd031d3045f5b88ac5b Mon Sep 17 00:00:00 2001 From: Riddhi Shah Date: Wed, 7 Sep 2022 13:32:00 -0700 Subject: [PATCH 5/6] Only allow local xds bind address --- pkg/consuldp/bootstrap.go | 14 +- pkg/consuldp/bootstrap_test.go | 42 +---- pkg/consuldp/consul_dataplane.go | 50 ++---- pkg/consuldp/consul_dataplane_test.go | 49 +---- .../testdata/TestBootstrapConfig/basic.golden | 2 +- .../central-telemetry-config.golden | 2 +- .../local-xds-server.golden | 170 ------------------ .../TestBootstrapConfig/ready-listener.golden | 2 +- ...r.golden => unix-socket-xds-server.golden} | 0 pkg/consuldp/xds.go | 24 +-- pkg/consuldp/xds_test.go | 17 +- 11 files changed, 50 insertions(+), 322 deletions(-) delete mode 100644 pkg/consuldp/testdata/TestBootstrapConfig/local-xds-server.golden rename pkg/consuldp/testdata/TestBootstrapConfig/{local-unix-socket-xds-server.golden => unix-socket-xds-server.golden} (100%) diff --git a/pkg/consuldp/bootstrap.go b/pkg/consuldp/bootstrap.go index c55dd0fe..5ecf97d6 100644 --- a/pkg/consuldp/bootstrap.go +++ b/pkg/consuldp/bootstrap.go @@ -68,14 +68,12 @@ func (cdp *ConsulDataplane) bootstrapConfig(ctx context.Context) ([]byte, error) Datacenter: rsp.Datacenter, } - if cdp.localXDSServer != nil && cdp.localXDSServer.enabled { - if cdp.localXDSServer.listenerNetwork == "unix" { - args.GRPC.AgentSocket = cdp.localXDSServer.listenerAddress - } else { - xdsServerFullAddr := strings.Split(cdp.localXDSServer.listenerAddress, ":") - args.GRPC.AgentAddress = xdsServerFullAddr[0] - args.GRPC.AgentPort = xdsServerFullAddr[1] - } + if cdp.xdsServer.listenerNetwork == "unix" { + args.GRPC.AgentSocket = cdp.xdsServer.listenerAddress + } else { + xdsServerFullAddr := strings.Split(cdp.xdsServer.listenerAddress, ":") + args.GRPC.AgentAddress = xdsServerFullAddr[0] + args.GRPC.AgentPort = xdsServerFullAddr[1] } var bootstrapConfig bootstrap.BootstrapConfig diff --git a/pkg/consuldp/bootstrap_test.go b/pkg/consuldp/bootstrap_test.go index 724be3c3..837443de 100644 --- a/pkg/consuldp/bootstrap_test.go +++ b/pkg/consuldp/bootstrap_test.go @@ -56,7 +56,7 @@ func TestBootstrapConfig(t *testing.T) { Telemetry: &TelemetryConfig{ UseCentralConfig: false, }, - XDSServer: &XDSServer{BindAddress: "1.2.3.4", BindPort: xdsBindPort}, + XDSServer: &XDSServer{BindAddress: "127.0.0.1", BindPort: xdsBindPort}, }, &pbdataplane.GetEnvoyBootstrapParamsResponse{ Service: "web", @@ -82,7 +82,7 @@ func TestBootstrapConfig(t *testing.T) { Telemetry: &TelemetryConfig{ UseCentralConfig: true, }, - XDSServer: &XDSServer{BindAddress: "1.2.3.4", BindPort: xdsBindPort}, + XDSServer: &XDSServer{BindAddress: "127.0.0.1", BindPort: xdsBindPort}, }, &pbdataplane.GetEnvoyBootstrapParamsResponse{ Service: "web", @@ -110,40 +110,14 @@ func TestBootstrapConfig(t *testing.T) { Telemetry: &TelemetryConfig{ UseCentralConfig: false, }, - XDSServer: &XDSServer{BindAddress: "1.2.3.4", BindPort: xdsBindPort}, - }, - &pbdataplane.GetEnvoyBootstrapParamsResponse{ - Service: "web", - NodeName: nodeName, - }, - }, - "local-xds-server": { - &Config{ - Consul: &ConsulConfig{ - GRPCPort: 1234, - }, - Service: &ServiceConfig{ - ServiceID: "web-proxy", - NodeName: nodeName, - }, - Envoy: &EnvoyConfig{ - AdminBindAddress: "127.0.0.1", - AdminBindPort: 19000, - }, - Telemetry: &TelemetryConfig{ - UseCentralConfig: false, - }, XDSServer: &XDSServer{BindAddress: "127.0.0.1", BindPort: xdsBindPort}, }, &pbdataplane.GetEnvoyBootstrapParamsResponse{ Service: "web", NodeName: nodeName, - Config: makeStruct(map[string]any{ - "envoy_dogstatsd_url": "this-should-not-appear-in-generated-config", - }), }, }, - "local-unix-socket-xds-server": { + "unix-socket-xds-server": { &Config{ Consul: &ConsulConfig{ GRPCPort: 1234, @@ -188,12 +162,10 @@ func TestBootstrapConfig(t *testing.T) { dpServiceClient: client, } - if checkLocalXDSServer(tc.cfg.XDSServer.BindAddress) { - if strings.HasPrefix(tc.cfg.XDSServer.BindAddress, "unix://") { - dp.localXDSServer = &localXDSServer{enabled: true, listenerAddress: socketPath, listenerNetwork: "unix"} - } else { - dp.localXDSServer = &localXDSServer{enabled: true, listenerAddress: fmt.Sprintf("127.0.0.1:%d", xdsBindPort)} - } + if strings.HasPrefix(tc.cfg.XDSServer.BindAddress, "unix://") { + dp.xdsServer = &xdsServer{listenerAddress: socketPath, listenerNetwork: "unix"} + } else { + dp.xdsServer = &xdsServer{listenerAddress: fmt.Sprintf("127.0.0.1:%d", xdsBindPort)} } bsCfg, err := dp.bootstrapConfig(ctx) diff --git a/pkg/consuldp/consul_dataplane.go b/pkg/consuldp/consul_dataplane.go index 6f6f00bf..62643b39 100644 --- a/pkg/consuldp/consul_dataplane.go +++ b/pkg/consuldp/consul_dataplane.go @@ -30,8 +30,7 @@ type consulServer struct { grpcClientConn *grpc.ClientConn } -type localXDSServer struct { - enabled bool +type xdsServer struct { listener net.Listener listenerAddress string listenerNetwork string @@ -45,7 +44,7 @@ type ConsulDataplane struct { cfg *Config consulServer *consulServer dpServiceClient pbdataplane.DataplaneServiceClient - localXDSServer *localXDSServer + xdsServer *xdsServer } // NewConsulDP creates a new instance of ConsulDataplane @@ -65,9 +64,8 @@ func NewConsulDP(cfg *Config) (*ConsulDataplane, error) { }) return &ConsulDataplane{ - logger: logger, - cfg: cfg, - localXDSServer: &localXDSServer{enabled: false}, + logger: logger, + cfg: cfg, }, nil } @@ -93,8 +91,8 @@ func validateConfig(cfg *Config) error { return errors.New("logging settings not specified") case cfg.XDSServer.BindAddress == "": return errors.New("envoy xDS bind address not specified") - case cfg.XDSServer.BindPort == 0 && !checkLocalXDSServer(cfg.XDSServer.BindAddress): - return errors.New("envoy xDS bind port not specified") + case !strings.HasPrefix(cfg.XDSServer.BindAddress, "unix://") && cfg.XDSServer.BindAddress != "127.0.0.1" && cfg.XDSServer.BindAddress != "localhost": + return errors.New("non-local xDS bind address not allowed") } return nil } @@ -133,28 +131,6 @@ func (cdp *ConsulDataplane) setConsulServerSupportedFeatures(ctx context.Context return nil } -// checkLocalXDSServer checks if the specified xds bind address is local. -func checkLocalXDSServer(xdsBindAddr string) bool { - if strings.HasPrefix(xdsBindAddr, "unix://") || xdsBindAddr == "127.0.0.1" || xdsBindAddr == "localhost" { - return true - } - return false -} - -// checkAndEnableLocalXDSServer checks if the specified xds bind address is local. -// If local, it enables the flag to configure consul-dataplane (later on in the setup process) -// with a gRPC server to serve envoy xDS requests. -// If not local, the flag remains turned off and it is assumed the xDS requests will be served -// by a remote gRPC server. -// Potential TODO: Explicitly allow specifying an option (xds.disable?) to disable configuring -// consul-dataplane as the xDS server in case xDS requests can be served on another port locally -// (example: consul server process on localhost). -func (cdp *ConsulDataplane) checkAndEnableLocalXDSServer() { - if checkLocalXDSServer(cdp.cfg.XDSServer.BindAddress) { - cdp.localXDSServer.enabled = true - } -} - func (cdp *ConsulDataplane) Run(ctx context.Context) error { cdp.logger.Info("started consul-dataplane process") @@ -187,16 +163,12 @@ func (cdp *ConsulDataplane) Run(ctx context.Context) error { return fmt.Errorf("failed to set supported features: %w", err) } - cdp.checkAndEnableLocalXDSServer() - - if cdp.localXDSServer.enabled { - err = cdp.setupXDSServer() - if err != nil { - return err - } - go cdp.startXDSServer() - defer cdp.stopXDSServer() + err = cdp.setupXDSServer() + if err != nil { + return err } + go cdp.startXDSServer() + defer cdp.stopXDSServer() cfg, err := cdp.bootstrapConfig(ctx) if err != nil { diff --git a/pkg/consuldp/consul_dataplane_test.go b/pkg/consuldp/consul_dataplane_test.go index 5c6cd0a0..3fa5af89 100644 --- a/pkg/consuldp/consul_dataplane_test.go +++ b/pkg/consuldp/consul_dataplane_test.go @@ -126,12 +126,11 @@ func TestNewConsulDPError(t *testing.T) { expectErr: "envoy xDS bind address not specified", }, { - name: "missing xds bind port when address not local", + name: "non-local xds bind address", modFn: func(c *Config) { - c.XDSServer.BindPort = 0 c.XDSServer.BindAddress = "1.2.3.4" }, - expectErr: "envoy xDS bind port not specified", + expectErr: "non-local xDS bind address not allowed", }, } for _, tc := range testCases { @@ -214,47 +213,3 @@ func TestSetConsulServerSupportedFeaturesError(t *testing.T) { require.ErrorContains(t, consulDP.setConsulServerSupportedFeatures(context.Background()), "failure getting supported consul-dataplane features") require.Empty(t, consulDP.consulServer.supportedFeatures) } - -func TestCheckAndEnableLocalXDSServer(t *testing.T) { - type testCase struct { - name string - modFn func(*Config) - expectLocalXDSServer bool - } - - testCases := []testCase{ - { - name: "localhost ip", - modFn: func(c *Config) { c.XDSServer.BindAddress = "127.0.0.1" }, - expectLocalXDSServer: true, - }, - { - name: "localhost name", - modFn: func(c *Config) { c.XDSServer.BindAddress = "localhost" }, - expectLocalXDSServer: true, - }, - { - name: "unix socket", - modFn: func(c *Config) { c.XDSServer.BindAddress = "unix:///var/run/xds.sock" }, - expectLocalXDSServer: true, - }, - { - name: "remote ip", - modFn: func(c *Config) { - c.XDSServer.BindAddress = "1.2.3.4" - c.XDSServer.BindPort = 1234 - }, - expectLocalXDSServer: false, - }, - } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - cfg := validConfig() - tc.modFn(cfg) - cdp, err := NewConsulDP(cfg) - require.NoError(t, err) - cdp.checkAndEnableLocalXDSServer() - require.Equal(t, tc.expectLocalXDSServer, cdp.localXDSServer.enabled) - }) - } -} diff --git a/pkg/consuldp/testdata/TestBootstrapConfig/basic.golden b/pkg/consuldp/testdata/TestBootstrapConfig/basic.golden index 93388b3a..ca0741a3 100644 --- a/pkg/consuldp/testdata/TestBootstrapConfig/basic.golden +++ b/pkg/consuldp/testdata/TestBootstrapConfig/basic.golden @@ -44,7 +44,7 @@ "endpoint": { "address": { "socket_address": { - "address": "1.2.3.4", + "address": "127.0.0.1", "port_value": 1234 } } diff --git a/pkg/consuldp/testdata/TestBootstrapConfig/central-telemetry-config.golden b/pkg/consuldp/testdata/TestBootstrapConfig/central-telemetry-config.golden index c198beb8..8ed91715 100644 --- a/pkg/consuldp/testdata/TestBootstrapConfig/central-telemetry-config.golden +++ b/pkg/consuldp/testdata/TestBootstrapConfig/central-telemetry-config.golden @@ -44,7 +44,7 @@ "endpoint": { "address": { "socket_address": { - "address": "1.2.3.4", + "address": "127.0.0.1", "port_value": 1234 } } diff --git a/pkg/consuldp/testdata/TestBootstrapConfig/local-xds-server.golden b/pkg/consuldp/testdata/TestBootstrapConfig/local-xds-server.golden deleted file mode 100644 index ca0741a3..00000000 --- a/pkg/consuldp/testdata/TestBootstrapConfig/local-xds-server.golden +++ /dev/null @@ -1,170 +0,0 @@ -{ - "admin": { - "access_log_path": "/dev/null", - "address": { - "socket_address": { - "address": "127.0.0.1", - "port_value": 19000 - } - } - }, - "node": { - "cluster": "web", - "id": "web-proxy", - "metadata": { - "node_name": "agentless-node", - "namespace": "default", - "partition": "default" - } - }, - "layered_runtime": { - "layers": [ - { - "name": "base", - "static_layer": { - "re2.max_program_size.error_level": 1048576 - } - } - ] - }, - "static_resources": { - "clusters": [ - { - "name": "consul-dataplane", - "ignore_health_on_host_removal": false, - "connect_timeout": "1s", - "type": "STATIC", - "http2_protocol_options": {}, - "loadAssignment": { - "clusterName": "consul-dataplane", - "endpoints": [ - { - "lbEndpoints": [ - { - "endpoint": { - "address": { - "socket_address": { - "address": "127.0.0.1", - "port_value": 1234 - } - } - } - } - ] - } - ] - } - } - ] - }, - "stats_config": { - "stats_tags": [ - { - "regex": "^cluster\\.(?:passthrough~)?((?:([^.]+)~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)", - "tag_name": "consul.destination.custom_hash" - }, - { - "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:([^.]+)\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)", - "tag_name": "consul.destination.service_subset" - }, - { - "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?([^.]+)\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)", - "tag_name": "consul.destination.service" - }, - { - "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.([^.]+)\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+\\.consul\\.)", - "tag_name": "consul.destination.namespace" - }, - { - "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:([^.]+)\\.)?[^.]+\\.internal[^.]*\\.[^.]+\\.consul\\.)", - "tag_name": "consul.destination.partition" - }, - { - "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?([^.]+)\\.internal[^.]*\\.[^.]+\\.consul\\.)", - "tag_name": "consul.destination.datacenter" - }, - { - "regex": "^cluster\\.([^.]+\\.(?:[^.]+\\.)?([^.]+)\\.external\\.[^.]+\\.consul\\.)", - "tag_name": "consul.destination.peer" - }, - { - "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.([^.]+)\\.[^.]+\\.consul\\.)", - "tag_name": "consul.destination.routing_type" - }, - { - "regex": "^cluster\\.(?:passthrough~)?((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.([^.]+)\\.consul\\.)", - "tag_name": "consul.destination.trust_domain" - }, - { - "regex": "^cluster\\.(?:passthrough~)?(((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+)\\.[^.]+\\.[^.]+\\.consul\\.)", - "tag_name": "consul.destination.target" - }, - { - "regex": "^cluster\\.(?:passthrough~)?(((?:[^.]+~)?(?:[^.]+\\.)?[^.]+\\.[^.]+\\.(?:[^.]+\\.)?[^.]+\\.[^.]+\\.[^.]+)\\.consul\\.)", - "tag_name": "consul.destination.full_target" - }, - { - "regex": "^(?:tcp|http)\\.upstream(?:_peered)?\\.(([^.]+)(?:\\.[^.]+)?(?:\\.[^.]+)?\\.[^.]+\\.)", - "tag_name": "consul.upstream.service" - }, - { - "regex": "^(?:tcp|http)\\.upstream\\.([^.]+(?:\\.[^.]+)?(?:\\.[^.]+)?\\.([^.]+)\\.)", - "tag_name": "consul.upstream.datacenter" - }, - { - "regex": "^(?:tcp|http)\\.upstream_peered\\.([^.]+(?:\\.[^.]+)?\\.([^.]+)\\.)", - "tag_name": "consul.upstream.peer" - }, - { - "regex": "^(?:tcp|http)\\.upstream(?:_peered)?\\.([^.]+(?:\\.([^.]+))?(?:\\.[^.]+)?\\.[^.]+\\.)", - "tag_name": "consul.upstream.namespace" - }, - { - "regex": "^(?:tcp|http)\\.upstream\\.([^.]+(?:\\.[^.]+)?(?:\\.([^.]+))?\\.[^.]+\\.)", - "tag_name": "consul.upstream.partition" - }, - { - "tag_name": "local_cluster", - "fixed_value": "web" - }, - { - "tag_name": "consul.source.service", - "fixed_value": "web" - }, - { - "tag_name": "consul.source.namespace", - "fixed_value": "default" - }, - { - "tag_name": "consul.source.partition", - "fixed_value": "default" - } - ], - "use_all_default_tags": true - }, - "dynamic_resources": { - "lds_config": { - "ads": {}, - "resource_api_version": "V3" - }, - "cds_config": { - "ads": {}, - "resource_api_version": "V3" - }, - "ads_config": { - "api_type": "DELTA_GRPC", - "transport_api_version": "V3", - "grpc_services": { - "initial_metadata": [ - { - "key": "x-consul-token", - "value": "" - } - ], - "envoy_grpc": { - "cluster_name": "consul-dataplane" - } - } - } - } -} diff --git a/pkg/consuldp/testdata/TestBootstrapConfig/ready-listener.golden b/pkg/consuldp/testdata/TestBootstrapConfig/ready-listener.golden index 6e4077cd..1d45d37d 100644 --- a/pkg/consuldp/testdata/TestBootstrapConfig/ready-listener.golden +++ b/pkg/consuldp/testdata/TestBootstrapConfig/ready-listener.golden @@ -44,7 +44,7 @@ "endpoint": { "address": { "socket_address": { - "address": "1.2.3.4", + "address": "127.0.0.1", "port_value": 1234 } } diff --git a/pkg/consuldp/testdata/TestBootstrapConfig/local-unix-socket-xds-server.golden b/pkg/consuldp/testdata/TestBootstrapConfig/unix-socket-xds-server.golden similarity index 100% rename from pkg/consuldp/testdata/TestBootstrapConfig/local-unix-socket-xds-server.golden rename to pkg/consuldp/testdata/TestBootstrapConfig/unix-socket-xds-server.golden diff --git a/pkg/consuldp/xds.go b/pkg/consuldp/xds.go index 7dde538f..2340d12b 100644 --- a/pkg/consuldp/xds.go +++ b/pkg/consuldp/xds.go @@ -71,30 +71,32 @@ func (cdp *ConsulDataplane) setupXDSServer() error { // TODO: Switch to the main library once the fix is merged to keep upto date. newGRPCServer := grpc.NewServer(grpc.UnknownServiceHandler(proxy.TransparentHandler(cdp.director))) - cdp.localXDSServer.listener = lis - cdp.localXDSServer.listenerAddress = lis.Addr().String() - cdp.localXDSServer.listenerNetwork = lis.Addr().Network() - cdp.localXDSServer.gRPCServer = newGRPCServer - cdp.localXDSServer.exitedCh = make(chan struct{}) + cdp.xdsServer = &xdsServer{ + listener: lis, + listenerAddress: lis.Addr().String(), + listenerNetwork: lis.Addr().Network(), + gRPCServer: newGRPCServer, + exitedCh: make(chan struct{}), + } cdp.logger.Trace("created xDS server", "address", lis.Addr().String()) return nil } func (cdp *ConsulDataplane) startXDSServer() { - cdp.logger.Info("starting envoy xDS server", "address", cdp.localXDSServer.listener.Addr().String()) + cdp.logger.Info("starting envoy xDS server", "address", cdp.xdsServer.listener.Addr().String()) - if err := cdp.localXDSServer.gRPCServer.Serve(cdp.localXDSServer.listener); err != nil { + if err := cdp.xdsServer.gRPCServer.Serve(cdp.xdsServer.listener); err != nil { cdp.logger.Error("failed to serve xDS requests", "error", err) - close(cdp.localXDSServer.exitedCh) + close(cdp.xdsServer.exitedCh) } } func (cdp *ConsulDataplane) stopXDSServer() { - if cdp.localXDSServer.enabled && cdp.localXDSServer.gRPCServer != nil { + if cdp.xdsServer.gRPCServer != nil { cdp.logger.Debug("stopping xDS server") - cdp.localXDSServer.gRPCServer.Stop() + cdp.xdsServer.gRPCServer.Stop() } } -func (cdp *ConsulDataplane) xdsServerExited() chan struct{} { return cdp.localXDSServer.exitedCh } +func (cdp *ConsulDataplane) xdsServerExited() chan struct{} { return cdp.xdsServer.exitedCh } diff --git a/pkg/consuldp/xds_test.go b/pkg/consuldp/xds_test.go index df891f91..9ee1efd9 100644 --- a/pkg/consuldp/xds_test.go +++ b/pkg/consuldp/xds_test.go @@ -100,21 +100,20 @@ func TestSetupXDSServer(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { cdp := &ConsulDataplane{ - cfg: &Config{XDSServer: &XDSServer{BindAddress: tc.xdsBindAddress, BindPort: tc.xdsBindPort}}, - logger: hclog.NewNullLogger(), - localXDSServer: &localXDSServer{enabled: true}, + cfg: &Config{XDSServer: &XDSServer{BindAddress: tc.xdsBindAddress, BindPort: tc.xdsBindPort}}, + logger: hclog.NewNullLogger(), } err := cdp.setupXDSServer() require.NoError(t, err) - require.NotNil(t, cdp.localXDSServer.listener) - t.Cleanup(func() { cdp.localXDSServer.listener.Close() }) - require.NotNil(t, cdp.localXDSServer.gRPCServer) - require.Equal(t, tc.expectedListenerNetwork, cdp.localXDSServer.listenerNetwork) - require.Contains(t, cdp.localXDSServer.listenerAddress, tc.expectedListenerAddress) + require.NotNil(t, cdp.xdsServer.listener) + t.Cleanup(func() { cdp.xdsServer.listener.Close() }) + require.NotNil(t, cdp.xdsServer.gRPCServer) + require.Equal(t, tc.expectedListenerNetwork, cdp.xdsServer.listenerNetwork) + require.Contains(t, cdp.xdsServer.listenerAddress, tc.expectedListenerAddress) if tc.expectedListenerNetwork == "tcp" && tc.xdsBindPort == 0 { - listenerPort := cdp.localXDSServer.listenerAddress[len(tc.xdsBindAddress)+1:] + listenerPort := cdp.xdsServer.listenerAddress[len(tc.xdsBindAddress)+1:] _, err = strconv.Atoi(listenerPort) require.NoError(t, err) } From cd799a4a9ce74ce367f3cc03ccdf605405c48c4e Mon Sep 17 00:00:00 2001 From: Riddhi Shah Date: Thu, 8 Sep 2022 09:15:20 -0700 Subject: [PATCH 6/6] Check loopback --- pkg/consuldp/consul_dataplane.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/consuldp/consul_dataplane.go b/pkg/consuldp/consul_dataplane.go index 62643b39..bbbd021e 100644 --- a/pkg/consuldp/consul_dataplane.go +++ b/pkg/consuldp/consul_dataplane.go @@ -91,7 +91,7 @@ func validateConfig(cfg *Config) error { return errors.New("logging settings not specified") case cfg.XDSServer.BindAddress == "": return errors.New("envoy xDS bind address not specified") - case !strings.HasPrefix(cfg.XDSServer.BindAddress, "unix://") && cfg.XDSServer.BindAddress != "127.0.0.1" && cfg.XDSServer.BindAddress != "localhost": + case !strings.HasPrefix(cfg.XDSServer.BindAddress, "unix://") && !net.ParseIP(cfg.XDSServer.BindAddress).IsLoopback(): return errors.New("non-local xDS bind address not allowed") } return nil