diff --git a/agent/agent_test.go b/agent/agent_test.go index 8d6be12..4b9dfa7 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -2,7 +2,7 @@ package agent_test import ( "fmt" - "github.com/gojektech/consul-envoy-xds/agent" + "github.com/gojek/consul-envoy-xds/agent" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/testutil/retry" "github.com/stretchr/testify/assert" @@ -39,7 +39,7 @@ func TestShouldReturnServicesFromCatalog(t *testing.T) { func TestShouldReturnHealthCheckPassedServicesFromCatalog(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - fmt.Println(w,"helo") + fmt.Println(w, "helo") })) defer ts.Close() @@ -47,11 +47,11 @@ func TestShouldReturnHealthCheckPassedServicesFromCatalog(t *testing.T) { defer consulSvr.Stop() consulClient.Agent().ServiceRegister(&api.AgentServiceRegistration{Name: "foo", ID: "foo", Check: &api.AgentServiceCheck{ - HTTP: "invalidhost:8080", + HTTP: "invalidhost:8080", Interval: "1ms", }}) consulClient.Agent().ServiceRegister(&api.AgentServiceRegistration{Name: "bar", ID: "bar", Check: &api.AgentServiceCheck{ - HTTP: ts.URL, + HTTP: ts.URL, Interval: "1ms", }}) diff --git a/app/app.go b/app/app.go index 112a84d..cb61207 100644 --- a/app/app.go +++ b/app/app.go @@ -5,14 +5,14 @@ import ( "log" "net" - "github.com/gojektech/consul-envoy-xds/config" - "github.com/gojektech/consul-envoy-xds/eds" - "github.com/gojektech/consul-envoy-xds/edswatch" - "github.com/gojektech/consul-envoy-xds/pubsub" + "github.com/gojek/consul-envoy-xds/config" + "github.com/gojek/consul-envoy-xds/eds" + "github.com/gojek/consul-envoy-xds/edswatch" + "github.com/gojek/consul-envoy-xds/pubsub" cp "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" - "github.com/gojektech/consul-envoy-xds/agent" - "github.com/gojektech/consul-envoy-xds/stream" + "github.com/gojek/consul-envoy-xds/agent" + "github.com/gojek/consul-envoy-xds/stream" "google.golang.org/grpc" ) @@ -26,8 +26,9 @@ func Start() { var services []eds.Service for _, s := range svcCfg { services = append(services, eds.Service{ - Name: s, - Whitelist: cfg.WhitelistedRoutes(s), + Name: s, + Whitelist: cfg.WhitelistedRoutes(s), + CircuirBreakerConfig: cfg.CircuitBreakerConfig(s), }) } diff --git a/app/app_test.go b/app/app_test.go index d3010a4..0b5648b 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -19,8 +19,8 @@ import ( "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" dis "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" google_protobuf5 "github.com/gogo/protobuf/types" - "github.com/gojektech/consul-envoy-xds/agent" - "github.com/gojektech/consul-envoy-xds/app" + "github.com/gojek/consul-envoy-xds/agent" + "github.com/gojek/consul-envoy-xds/app" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib/freeport" "github.com/stretchr/testify/assert" diff --git a/config/config.go b/config/config.go index 855c043..e76bb45 100644 --- a/config/config.go +++ b/config/config.go @@ -2,13 +2,19 @@ package config import ( "fmt" - "strings" "github.com/gojek-engineering/goconfig" "strconv" ) +const ( + DefaultCircuitBreakerMaxConnections = 1024 + DefaultCircuitBreakerMaxRequests = 1024 + DefaultCircuitBreakerMaxPendingRequests = 1024 + DefaultCircuitBreakerMaxRetries = 3 +) + type Config struct { goconfig.BaseConfig } @@ -19,6 +25,13 @@ type HTTPHeaderRateLimitConfig struct { DescriptorKey string } +type CircuitBreakerConfig struct { + MaxConnections uint32 + MaxPendingRequests uint32 + MaxRequests uint32 + MaxRetries uint32 +} + func Load() *Config { cfg := &Config{} cfg.LoadWithOptions(map[string]interface{}{"newrelic": false, "db": false}) @@ -74,11 +87,30 @@ func (cfg *Config) GetHTTPHeaderRateLimitConfig() *HTTPHeaderRateLimitConfig { } func (cfg *Config) WhitelistedRoutes(svc string) []string { - canonicalName := strings.Replace(svc, "-", "_", -1) - whitelist := cfg.GetOptionalValue(strings.ToUpper(canonicalName)+"_WHITELISTED_ROUTES", "/") + canonicalName := canonicalizeSvcName(svc) + whitelist := cfg.GetOptionalValue(canonicalName+"_WHITELISTED_ROUTES", "/") return strings.Split(whitelist, ",") } func (cfg *Config) EnableHealthCheckCatalogService() bool { return cfg.GetFeature("ENABLE_HEALTH_CHECK_CATALOG_SVC") } + +func (cfg *Config) CircuitBreakerConfig(svc string) CircuitBreakerConfig { + canonicalName := canonicalizeSvcName(svc) + maxConnections := uint32(cfg.GetOptionalIntValue(canonicalName+"_CIRCUIT_BREAKER_MAX_CONNECTIONS", DefaultCircuitBreakerMaxConnections)) + maxPendingRequests := uint32(cfg.GetOptionalIntValue(canonicalName+"_CIRCUIT_BREAKER_MAX_PENDING_REQUESTS", DefaultCircuitBreakerMaxPendingRequests)) + maxRequests := uint32(cfg.GetOptionalIntValue(canonicalName+"_CIRCUIT_BREAKER_MAX_REQUESTS", DefaultCircuitBreakerMaxRequests)) + maxRetries := uint32(cfg.GetOptionalIntValue(canonicalName+"_CIRCUIT_BREAKER_MAX_RETRIES", DefaultCircuitBreakerMaxRetries)) + + return CircuitBreakerConfig{ + MaxConnections: maxConnections, + MaxPendingRequests: maxPendingRequests, + MaxRequests: maxRequests, + MaxRetries: maxRetries, + } +} + +func canonicalizeSvcName(svc string) string { + return strings.ToUpper(strings.Replace(svc, "-", "_", -1)) +} diff --git a/config/config_test.go b/config/config_test.go index d1e14a9..0806210 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -4,7 +4,7 @@ import ( "os" "testing" - "github.com/gojektech/consul-envoy-xds/config" + "github.com/gojek/consul-envoy-xds/config" "github.com/stretchr/testify/assert" ) @@ -25,3 +25,87 @@ func TestWhiteListedRoutesConfigReplacesHyphens(t *testing.T) { assert.Equal(t, []string{"foo"}, cfg.WhitelistedRoutes("foo-bar")) } + +func TestConfig_CircuitBreakerConfig(t *testing.T) { + + t.Run("MaxConnections", func(t *testing.T) { + t.Run("Should return default value if config is missing", func(t *testing.T) { + mutation := config.ApplyEnvVars(config.MissingEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_CONNECTIONS")) + defer mutation.Rollback() + + cfg := config.Load().CircuitBreakerConfig("foo-bar") + + assert.Equal(t, uint32(config.DefaultCircuitBreakerMaxConnections), cfg.MaxConnections) + }) + + t.Run("Should return value from env vars", func(t *testing.T) { + mutation := config.ApplyEnvVars(config.NewEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_CONNECTIONS", "100")) + defer mutation.Rollback() + + cfg := config.Load().CircuitBreakerConfig("foo-bar") + + assert.Equal(t, uint32(100), cfg.MaxConnections) + }) + }) + + t.Run("MaxPendingRequests", func(t *testing.T) { + t.Run("Should return default value if config is missing", func(t *testing.T) { + mutation := config.ApplyEnvVars(config.MissingEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_PENDING_REQUESTS")) + defer mutation.Rollback() + + cfg := config.Load().CircuitBreakerConfig("foo-bar") + + assert.Equal(t, uint32(config.DefaultCircuitBreakerMaxPendingRequests), cfg.MaxPendingRequests) + }) + + t.Run("Should return value from env vars", func(t *testing.T) { + mutation := config.ApplyEnvVars(config.NewEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_PENDING_REQUESTS", "100")) + defer mutation.Rollback() + + cfg := config.Load().CircuitBreakerConfig("foo-bar") + + assert.Equal(t, uint32(100), cfg.MaxConnections) + }) + }) + + t.Run("MaxRequests", func(t *testing.T) { + + t.Run("Should return default value if config is missing", func(t *testing.T) { + mutation := config.ApplyEnvVars(config.MissingEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_REQUESTS")) + defer mutation.Rollback() + + cfg := config.Load().CircuitBreakerConfig("foo-bar") + + assert.Equal(t, uint32(config.DefaultCircuitBreakerMaxRequests), cfg.MaxRequests) + }) + + t.Run("Should return value from env vars", func(t *testing.T) { + mutation := config.ApplyEnvVars(config.NewEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_REQUESTS", "100")) + defer mutation.Rollback() + + cfg := config.Load().CircuitBreakerConfig("foo-bar") + + assert.Equal(t, uint32(100), cfg.MaxRequests) + }) + }) + + t.Run("MaxRetries", func(t *testing.T) { + t.Run("Should return default value if config is missing", func(t *testing.T) { + mutation := config.ApplyEnvVars(config.MissingEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_RETRIES")) + defer mutation.Rollback() + + cfg := config.Load().CircuitBreakerConfig("foo-bar") + + assert.Equal(t, uint32(config.DefaultCircuitBreakerMaxRetries), cfg.MaxRetries) + }) + + t.Run("Should return value from env vars", func(t *testing.T) { + mutation := config.ApplyEnvVars(config.NewEnvVar("FOO_BAR_CIRCUIT_BREAKER_MAX_RETRIES", "5")) + defer mutation.Rollback() + + cfg := config.Load().CircuitBreakerConfig("foo-bar") + + assert.Equal(t, uint32(5), cfg.MaxRetries) + }) + }) +} diff --git a/config/utils.go b/config/utils.go index ea4858f..739ceb8 100644 --- a/config/utils.go +++ b/config/utils.go @@ -38,3 +38,60 @@ func checkKey(key string) { log.Fatalf("%s key is not set", key) } } + +type EnvVar struct { + Present bool + Key string + Value string +} + +func NewEnvVar(key, value string) EnvVar { + return EnvVar{ + Present: true, + Key: key, + Value: value, + } +} + +func MissingEnvVar(key string) EnvVar { + return EnvVar{ + Present: false, + Key: key, + } +} + +type EnvVarMutation struct { + previousValues []EnvVar +} + +func (e EnvVarMutation) Rollback() { + +} + +func ApplyEnvVars(vars ...EnvVar) EnvVarMutation { + var oldEnvVars []EnvVar + + for _, envVar := range vars { + oldValue, exists := os.LookupEnv(envVar.Key) + + oldEnvVars = append(oldEnvVars, EnvVar{ + Present: exists, + Key: envVar.Key, + Value: oldValue, + }) + + if envVar.Present { + Must(os.Setenv(envVar.Key, envVar.Value)) + } else { + Must(os.Unsetenv(envVar.Key)) + } + } + + return EnvVarMutation{previousValues: oldEnvVars} +} + +func Must(err error) { + if err != nil { + panic(err) + } +} diff --git a/eds/endpoint.go b/eds/endpoint.go index dffab9d..a401e7b 100644 --- a/eds/endpoint.go +++ b/eds/endpoint.go @@ -2,10 +2,12 @@ package eds import ( "fmt" + "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster" + "github.com/gojek/consul-envoy-xds/utils" "time" - "github.com/gojektech/consul-envoy-xds/agent" - "github.com/gojektech/consul-envoy-xds/pubsub" + "github.com/gojek/consul-envoy-xds/agent" + "github.com/gojek/consul-envoy-xds/pubsub" "log" @@ -13,7 +15,7 @@ import ( cpcore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" eds "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" - "github.com/gojektech/consul-envoy-xds/config" + "github.com/gojek/consul-envoy-xds/config" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/watch" "strings" @@ -38,6 +40,8 @@ type service struct { enableHealthCheck bool } +var _ Endpoint = &service{} + func (s *service) serviceNames() []string { var names []string for k := range s.services { @@ -83,8 +87,11 @@ func (s *service) Clusters() []*cp.Cluster { var clusters []*cp.Cluster for _, services := range serviceList { if len(services) > 0 { + serviceName := services[0].ServiceName + svc := s.services[serviceName] + clusters = append(clusters, &cp.Cluster{ - Name: services[0].ServiceName, + Name: serviceName, Type: cp.Cluster_EDS, ConnectTimeout: 1 * time.Second, ProtocolSelection: cp.Cluster_USE_DOWNSTREAM_PROTOCOL, @@ -95,6 +102,15 @@ func (s *service) Clusters() []*cp.Cluster { }, }, }, + CircuitBreakers: &cluster.CircuitBreakers{ + Thresholds: []*cluster.CircuitBreakers_Thresholds{{ + cpcore.RoutingPriority_DEFAULT, + utils.Uint32Value(svc.CircuirBreakerConfig.MaxConnections), + utils.Uint32Value(svc.CircuirBreakerConfig.MaxPendingRequests), + utils.Uint32Value(svc.CircuirBreakerConfig.MaxRequests), + utils.Uint32Value(svc.CircuirBreakerConfig.MaxRetries), + }}, + }, }) } } diff --git a/eds/endpoint_test.go b/eds/endpoint_test.go index 3232682..e15822b 100644 --- a/eds/endpoint_test.go +++ b/eds/endpoint_test.go @@ -1,15 +1,17 @@ package eds_test import ( + "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster" + "github.com/gojek/consul-envoy-xds/utils" "testing" - "github.com/gojektech/consul-envoy-xds/eds" - "github.com/gojektech/consul-envoy-xds/pubsub" + "github.com/gojek/consul-envoy-xds/eds" + "github.com/gojek/consul-envoy-xds/pubsub" cp "github.com/envoyproxy/go-control-plane/envoy/api/v2" cpcore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" - "github.com/gojektech/consul-envoy-xds/config" + "github.com/gojek/consul-envoy-xds/config" "github.com/hashicorp/consul/api" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -52,6 +54,14 @@ func TestShouldHaveClusterUsingAgentCatalogServiceEndpoints(t *testing.T) { assert.Equal(t, "foo-service", clusters[0].Name) assert.Equal(t, cp.Cluster_USE_DOWNSTREAM_PROTOCOL, clusters[0].ProtocolSelection) + circuitBreakers := cluster.CircuitBreakers{Thresholds: []*cluster.CircuitBreakers_Thresholds{{ + Priority: cpcore.RoutingPriority_DEFAULT, + MaxConnections: utils.Uint32Value(1024), + MaxPendingRequests: utils.Uint32Value(1024), + MaxRequests: utils.Uint32Value(1024), + MaxRetries: utils.Uint32Value(3), + }}} + assert.Equal(t, circuitBreakers, clusters[0].CircuitBreakers) } func TestShouldHaveAuthHeaderRateLimit(t *testing.T) { diff --git a/eds/mock_endpoint.go b/eds/mock_endpoint.go index 846bd30..1e78033 100644 --- a/eds/mock_endpoint.go +++ b/eds/mock_endpoint.go @@ -2,7 +2,7 @@ package eds import ( cp "github.com/envoyproxy/go-control-plane/envoy/api/v2" - "github.com/gojektech/consul-envoy-xds/pubsub" + "github.com/gojek/consul-envoy-xds/pubsub" "github.com/hashicorp/consul/watch" "github.com/stretchr/testify/mock" ) diff --git a/eds/service.go b/eds/service.go index c17247e..bd86f09 100644 --- a/eds/service.go +++ b/eds/service.go @@ -1,6 +1,9 @@ package eds +import "github.com/gojek/consul-envoy-xds/config" + type Service struct { - Name string - Whitelist []string + Name string + Whitelist []string + CircuirBreakerConfig config.CircuitBreakerConfig } diff --git a/eds/service_host_test.go b/eds/service_host_test.go index 33010a6..c218534 100644 --- a/eds/service_host_test.go +++ b/eds/service_host_test.go @@ -3,7 +3,7 @@ package eds_test import ( "testing" - "github.com/gojektech/consul-envoy-xds/eds" + "github.com/gojek/consul-envoy-xds/eds" "github.com/hashicorp/consul/api" "github.com/stretchr/testify/assert" diff --git a/edswatch/watch.go b/edswatch/watch.go index 7d650fb..5f2bd21 100644 --- a/edswatch/watch.go +++ b/edswatch/watch.go @@ -1,8 +1,8 @@ package edswatch import ( - "github.com/gojektech/consul-envoy-xds/eds" - "github.com/gojektech/consul-envoy-xds/pubsub" + "github.com/gojek/consul-envoy-xds/eds" + "github.com/gojek/consul-envoy-xds/pubsub" cp "github.com/envoyproxy/go-control-plane/envoy/api/v2" "github.com/hashicorp/consul/watch" diff --git a/go.mod b/go.mod index 51eb785..c45960b 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415 github.com/gojek-engineering/goconfig v0.0.0-20171121145242-6012be80012d - github.com/gojektech/consul-envoy-xds v0.0.0-20190405094702-1389b6876cc8 github.com/golang/protobuf v1.3.2 github.com/hashicorp/consul v1.2.2 github.com/hashicorp/go-cleanhttp v0.0.0-20170211013415-3573b8b52aa7 diff --git a/go.sum b/go.sum index 1194824..20571bc 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,8 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/hashicorp/consul v1.2.0 h1:ys4DE07Yg9o3EQMs/VZMP9t2DaMeuFD4zf4phGOhzu8= +github.com/hashicorp/consul v1.2.0/go.mod h1:mFrjN1mfidgJfYP1xrJCF+AfRhr6Eaqhb2+sfyn/OOI= github.com/hashicorp/consul v1.2.2 h1:C5FurAZWLQ+XAjmL9g6rXbPlwxyyz8DvTL0WCAxTLAo= github.com/hashicorp/consul v1.2.2/go.mod h1:mFrjN1mfidgJfYP1xrJCF+AfRhr6Eaqhb2+sfyn/OOI= github.com/hashicorp/go-cleanhttp v0.0.0-20170211013415-3573b8b52aa7 h1:67fHcS+inUoiIqWCKIqeDuq2AlPHNHPiTqp97LdQ+bc= diff --git a/main.go b/main.go index d062797..fb0155d 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,6 @@ package main -import "github.com/gojektech/consul-envoy-xds/app" +import "github.com/gojek/consul-envoy-xds/app" func main() { app.Start() diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index de664b0..7321694 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -3,7 +3,7 @@ package pubsub_test import ( "testing" - "github.com/gojektech/consul-envoy-xds/pubsub" + "github.com/gojek/consul-envoy-xds/pubsub" cp "github.com/envoyproxy/go-control-plane/envoy/api/v2" uuid "github.com/satori/go.uuid" diff --git a/stream/discovery_response_stream_test.go b/stream/discovery_response_stream_test.go index 7c32408..dd45cc1 100644 --- a/stream/discovery_response_stream_test.go +++ b/stream/discovery_response_stream_test.go @@ -3,7 +3,7 @@ package stream_test import ( "testing" - "github.com/gojektech/consul-envoy-xds/stream" + "github.com/gojek/consul-envoy-xds/stream" "strconv" "time" diff --git a/stream/server.go b/stream/server.go index 69edb86..5109163 100644 --- a/stream/server.go +++ b/stream/server.go @@ -2,8 +2,8 @@ package stream import ( cp "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" - "github.com/gojektech/consul-envoy-xds/eds" - "github.com/gojektech/consul-envoy-xds/pubsub" + "github.com/gojek/consul-envoy-xds/eds" + "github.com/gojek/consul-envoy-xds/pubsub" ) //ConsulEDS is an implementation of envoy EDS grpc api via envoy go control plan api contract. diff --git a/stream/subscription_stream.go b/stream/subscription_stream.go index 44d59d6..566ed2c 100644 --- a/stream/subscription_stream.go +++ b/stream/subscription_stream.go @@ -5,8 +5,8 @@ import ( "log" cp "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" - "github.com/gojektech/consul-envoy-xds/eds" - "github.com/gojektech/consul-envoy-xds/pubsub" + "github.com/gojek/consul-envoy-xds/eds" + "github.com/gojek/consul-envoy-xds/pubsub" ) //SubscriptionStream is stream of stream of x discovery responses diff --git a/stream/subscription_stream_test.go b/stream/subscription_stream_test.go index 9ec0980..4f669a4 100644 --- a/stream/subscription_stream_test.go +++ b/stream/subscription_stream_test.go @@ -4,11 +4,11 @@ import ( "testing" "time" - "github.com/gojektech/consul-envoy-xds/pubsub" - "github.com/gojektech/consul-envoy-xds/stream" + "github.com/gojek/consul-envoy-xds/pubsub" + "github.com/gojek/consul-envoy-xds/stream" cp "github.com/envoyproxy/go-control-plane/envoy/api/v2" - "github.com/gojektech/consul-envoy-xds/eds" + "github.com/gojek/consul-envoy-xds/eds" "github.com/satori/go.uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" diff --git a/utils/utils.go b/utils/utils.go new file mode 100644 index 0000000..67ab4f2 --- /dev/null +++ b/utils/utils.go @@ -0,0 +1,7 @@ +package utils + +import "github.com/gogo/protobuf/types" + +func Uint32Value(value uint32) *types.UInt32Value { + return &types.UInt32Value{Value: value} +}