Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package agent_test

import (
"fmt"
"github.com/gojektech/consul-envoy-xds/agent"
"github.com/gojek/consul-envoy-xds/agent"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testutil/retry"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -39,19 +39,19 @@ func TestShouldReturnServicesFromCatalog(t *testing.T) {

func TestShouldReturnHealthCheckPassedServicesFromCatalog(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Println(w,"helo")
fmt.Println(w, "helo")
}))
defer ts.Close()

consulSvr, consulClient := agent.StartConsulTestServer()
defer consulSvr.Stop()

consulClient.Agent().ServiceRegister(&api.AgentServiceRegistration{Name: "foo", ID: "foo", Check: &api.AgentServiceCheck{
HTTP: "invalidhost:8080",
HTTP: "invalidhost:8080",
Interval: "1ms",
}})
consulClient.Agent().ServiceRegister(&api.AgentServiceRegistration{Name: "bar", ID: "bar", Check: &api.AgentServiceCheck{
HTTP: ts.URL,
HTTP: ts.URL,
Interval: "1ms",
}})

Expand Down
17 changes: 9 additions & 8 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (
"log"
"net"

"github.com/gojektech/consul-envoy-xds/config"
"github.com/gojektech/consul-envoy-xds/eds"
"github.com/gojektech/consul-envoy-xds/edswatch"
"github.com/gojektech/consul-envoy-xds/pubsub"
"github.com/gojek/consul-envoy-xds/config"
"github.com/gojek/consul-envoy-xds/eds"
"github.com/gojek/consul-envoy-xds/edswatch"
"github.com/gojek/consul-envoy-xds/pubsub"

cp "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/gojektech/consul-envoy-xds/agent"
"github.com/gojektech/consul-envoy-xds/stream"
"github.com/gojek/consul-envoy-xds/agent"
"github.com/gojek/consul-envoy-xds/stream"
"google.golang.org/grpc"
)

Expand All @@ -26,8 +26,9 @@ func Start() {
var services []eds.Service
for _, s := range svcCfg {
services = append(services, eds.Service{
Name: s,
Whitelist: cfg.WhitelistedRoutes(s),
Name: s,
Whitelist: cfg.WhitelistedRoutes(s),
CircuirBreakerConfig: cfg.CircuitBreakerConfig(s),
})
}

Expand Down
4 changes: 2 additions & 2 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
dis "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
google_protobuf5 "github.com/gogo/protobuf/types"
"github.com/gojektech/consul-envoy-xds/agent"
"github.com/gojektech/consul-envoy-xds/app"
"github.com/gojek/consul-envoy-xds/agent"
"github.com/gojek/consul-envoy-xds/app"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib/freeport"
"github.com/stretchr/testify/assert"
Expand Down
38 changes: 35 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ package config

import (
"fmt"

"strings"

"github.com/gojek-engineering/goconfig"
"strconv"
)

const (
DefaultCircuitBreakerMaxConnections = 1024
DefaultCircuitBreakerMaxRequests = 1024
DefaultCircuitBreakerMaxPendingRequests = 1024
DefaultCircuitBreakerMaxRetries = 3
)

type Config struct {
goconfig.BaseConfig
}
Expand All @@ -19,6 +25,13 @@ type HTTPHeaderRateLimitConfig struct {
DescriptorKey string
}

type CircuitBreakerConfig struct {
MaxConnections uint32
MaxPendingRequests uint32
MaxRequests uint32
MaxRetries uint32
}

func Load() *Config {
cfg := &Config{}
cfg.LoadWithOptions(map[string]interface{}{"newrelic": false, "db": false})
Expand Down Expand Up @@ -74,11 +87,30 @@ func (cfg *Config) GetHTTPHeaderRateLimitConfig() *HTTPHeaderRateLimitConfig {
}

func (cfg *Config) WhitelistedRoutes(svc string) []string {
canonicalName := strings.Replace(svc, "-", "_", -1)
whitelist := cfg.GetOptionalValue(strings.ToUpper(canonicalName)+"_WHITELISTED_ROUTES", "/")
canonicalName := canonicalizeSvcName(svc)
whitelist := cfg.GetOptionalValue(canonicalName+"_WHITELISTED_ROUTES", "/")
return strings.Split(whitelist, ",")
}

func (cfg *Config) EnableHealthCheckCatalogService() bool {
return cfg.GetFeature("ENABLE_HEALTH_CHECK_CATALOG_SVC")
}

func (cfg *Config) CircuitBreakerConfig(svc string) CircuitBreakerConfig {
canonicalName := canonicalizeSvcName(svc)
maxConnections := uint32(cfg.GetOptionalIntValue(canonicalName+"_CIRCUIT_BREAKER_MAX_CONNECTIONS", DefaultCircuitBreakerMaxConnections))
maxPendingRequests := uint32(cfg.GetOptionalIntValue(canonicalName+"_CIRCUIT_BREAKER_MAX_PENDING_REQUESTS", DefaultCircuitBreakerMaxPendingRequests))
maxRequests := uint32(cfg.GetOptionalIntValue(canonicalName+"_CIRCUIT_BREAKER_MAX_REQUESTS", DefaultCircuitBreakerMaxRequests))
maxRetries := uint32(cfg.GetOptionalIntValue(canonicalName+"_CIRCUIT_BREAKER_MAX_RETRIES", DefaultCircuitBreakerMaxRetries))

return CircuitBreakerConfig{
MaxConnections: maxConnections,
MaxPendingRequests: maxPendingRequests,
MaxRequests: maxRequests,
MaxRetries: maxRetries,
}
}

func canonicalizeSvcName(svc string) string {
return strings.ToUpper(strings.Replace(svc, "-", "_", -1))
}
86 changes: 85 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"os"
"testing"

"github.com/gojektech/consul-envoy-xds/config"
"github.com/gojek/consul-envoy-xds/config"
"github.com/stretchr/testify/assert"
)

Expand All @@ -25,3 +25,87 @@ func TestWhiteListedRoutesConfigReplacesHyphens(t *testing.T) {

assert.Equal(t, []string{"foo"}, cfg.WhitelistedRoutes("foo-bar"))
}

func TestConfig_CircuitBreakerConfig(t *testing.T) {

t.Run("MaxConnections", func(t *testing.T) {
t.Run("Should return default value if config is missing", func(t *testing.T) {
mutation := config.ApplyEnvVars(config.MissingEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_CONNECTIONS"))
defer mutation.Rollback()

cfg := config.Load().CircuitBreakerConfig("foo-bar")

assert.Equal(t, uint32(config.DefaultCircuitBreakerMaxConnections), cfg.MaxConnections)
})

t.Run("Should return value from env vars", func(t *testing.T) {
mutation := config.ApplyEnvVars(config.NewEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_CONNECTIONS", "100"))
defer mutation.Rollback()

cfg := config.Load().CircuitBreakerConfig("foo-bar")

assert.Equal(t, uint32(100), cfg.MaxConnections)
})
})

t.Run("MaxPendingRequests", func(t *testing.T) {
t.Run("Should return default value if config is missing", func(t *testing.T) {
mutation := config.ApplyEnvVars(config.MissingEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_PENDING_REQUESTS"))
defer mutation.Rollback()

cfg := config.Load().CircuitBreakerConfig("foo-bar")

assert.Equal(t, uint32(config.DefaultCircuitBreakerMaxPendingRequests), cfg.MaxPendingRequests)
})

t.Run("Should return value from env vars", func(t *testing.T) {
mutation := config.ApplyEnvVars(config.NewEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_PENDING_REQUESTS", "100"))
defer mutation.Rollback()

cfg := config.Load().CircuitBreakerConfig("foo-bar")

assert.Equal(t, uint32(100), cfg.MaxConnections)
})
})

t.Run("MaxRequests", func(t *testing.T) {

t.Run("Should return default value if config is missing", func(t *testing.T) {
mutation := config.ApplyEnvVars(config.MissingEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_REQUESTS"))
defer mutation.Rollback()

cfg := config.Load().CircuitBreakerConfig("foo-bar")

assert.Equal(t, uint32(config.DefaultCircuitBreakerMaxRequests), cfg.MaxRequests)
})

t.Run("Should return value from env vars", func(t *testing.T) {
mutation := config.ApplyEnvVars(config.NewEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_REQUESTS", "100"))
defer mutation.Rollback()

cfg := config.Load().CircuitBreakerConfig("foo-bar")

assert.Equal(t, uint32(100), cfg.MaxRequests)
})
})

t.Run("MaxRetries", func(t *testing.T) {
t.Run("Should return default value if config is missing", func(t *testing.T) {
mutation := config.ApplyEnvVars(config.MissingEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_RETRIES"))
defer mutation.Rollback()

cfg := config.Load().CircuitBreakerConfig("foo-bar")

assert.Equal(t, uint32(config.DefaultCircuitBreakerMaxRetries), cfg.MaxRetries)
})

t.Run("Should return value from env vars", func(t *testing.T) {
mutation := config.ApplyEnvVars(config.NewEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_RETRIES", "5"))
defer mutation.Rollback()

cfg := config.Load().CircuitBreakerConfig("foo-bar")

assert.Equal(t, uint32(5), cfg.MaxRetries)
})
})
}
57 changes: 57 additions & 0 deletions config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,60 @@ func checkKey(key string) {
log.Fatalf("%s key is not set", key)
}
}

type EnvVar struct {
Present bool
Key string
Value string
}

func NewEnvVar(key, value string) EnvVar {
return EnvVar{
Present: true,
Key: key,
Value: value,
}
}

func MissingEnvVar(key string) EnvVar {
return EnvVar{
Present: false,
Key: key,
}
}

type EnvVarMutation struct {
previousValues []EnvVar
}

func (e EnvVarMutation) Rollback() {

}

func ApplyEnvVars(vars ...EnvVar) EnvVarMutation {
var oldEnvVars []EnvVar

for _, envVar := range vars {
oldValue, exists := os.LookupEnv(envVar.Key)

oldEnvVars = append(oldEnvVars, EnvVar{
Present: exists,
Key: envVar.Key,
Value: oldValue,
})

if envVar.Present {
Must(os.Setenv(envVar.Key, envVar.Value))
} else {
Must(os.Unsetenv(envVar.Key))
}
}

return EnvVarMutation{previousValues: oldEnvVars}
}

func Must(err error) {
if err != nil {
panic(err)
}
}
24 changes: 20 additions & 4 deletions eds/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@ package eds

import (
"fmt"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
"github.com/gojek/consul-envoy-xds/utils"
"time"

"github.com/gojektech/consul-envoy-xds/agent"
"github.com/gojektech/consul-envoy-xds/pubsub"
"github.com/gojek/consul-envoy-xds/agent"
"github.com/gojek/consul-envoy-xds/pubsub"

"log"

cp "github.com/envoyproxy/go-control-plane/envoy/api/v2"
cpcore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
eds "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
"github.com/gojektech/consul-envoy-xds/config"
"github.com/gojek/consul-envoy-xds/config"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/watch"
"strings"
Expand All @@ -38,6 +40,8 @@ type service struct {
enableHealthCheck bool
}

var _ Endpoint = &service{}

func (s *service) serviceNames() []string {
var names []string
for k := range s.services {
Expand Down Expand Up @@ -83,8 +87,11 @@ func (s *service) Clusters() []*cp.Cluster {
var clusters []*cp.Cluster
for _, services := range serviceList {
if len(services) > 0 {
serviceName := services[0].ServiceName
svc := s.services[serviceName]

clusters = append(clusters, &cp.Cluster{
Name: services[0].ServiceName,
Name: serviceName,
Type: cp.Cluster_EDS,
ConnectTimeout: 1 * time.Second,
ProtocolSelection: cp.Cluster_USE_DOWNSTREAM_PROTOCOL,
Expand All @@ -95,6 +102,15 @@ func (s *service) Clusters() []*cp.Cluster {
},
},
},
CircuitBreakers: &cluster.CircuitBreakers{
Thresholds: []*cluster.CircuitBreakers_Thresholds{{
cpcore.RoutingPriority_DEFAULT,
utils.Uint32Value(svc.CircuirBreakerConfig.MaxConnections),
utils.Uint32Value(svc.CircuirBreakerConfig.MaxPendingRequests),
utils.Uint32Value(svc.CircuirBreakerConfig.MaxRequests),
utils.Uint32Value(svc.CircuirBreakerConfig.MaxRetries),
}},
},
})
}
}
Expand Down
16 changes: 13 additions & 3 deletions eds/endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package eds_test

import (
"github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
"github.com/gojek/consul-envoy-xds/utils"
"testing"

"github.com/gojektech/consul-envoy-xds/eds"
"github.com/gojektech/consul-envoy-xds/pubsub"
"github.com/gojek/consul-envoy-xds/eds"
"github.com/gojek/consul-envoy-xds/pubsub"

cp "github.com/envoyproxy/go-control-plane/envoy/api/v2"
cpcore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
"github.com/gojektech/consul-envoy-xds/config"
"github.com/gojek/consul-envoy-xds/config"
"github.com/hashicorp/consul/api"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -52,6 +54,14 @@ func TestShouldHaveClusterUsingAgentCatalogServiceEndpoints(t *testing.T) {

assert.Equal(t, "foo-service", clusters[0].Name)
assert.Equal(t, cp.Cluster_USE_DOWNSTREAM_PROTOCOL, clusters[0].ProtocolSelection)
circuitBreakers := cluster.CircuitBreakers{Thresholds: []*cluster.CircuitBreakers_Thresholds{{
Priority: cpcore.RoutingPriority_DEFAULT,
MaxConnections: utils.Uint32Value(1024),
MaxPendingRequests: utils.Uint32Value(1024),
MaxRequests: utils.Uint32Value(1024),
MaxRetries: utils.Uint32Value(3),
}}}
assert.Equal(t, circuitBreakers, clusters[0].CircuitBreakers)
}

func TestShouldHaveAuthHeaderRateLimit(t *testing.T) {
Expand Down
Loading