diff --git a/Gopkg.lock b/Gopkg.lock index e273c12..37446ff 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -394,6 +394,7 @@ "github.com/hashicorp/consul/lib/freeport", "github.com/hashicorp/consul/testutil", "github.com/hashicorp/consul/watch", + "github.com/newrelic/go-agent", "github.com/satori/go.uuid", "github.com/spf13/viper", "github.com/stretchr/testify/assert", diff --git a/Gopkg.toml b/Gopkg.toml index ab48e72..0c91294 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -52,3 +52,7 @@ [[constraint]] name = "google.golang.org/grpc" version = "1.8.2" + +[[constraint]] + name = "github.com/newrelic/go-agent" + version = "~1.11.0" diff --git a/Makefile b/Makefile index e5058a8..5de3f4d 100644 --- a/Makefile +++ b/Makefile @@ -17,12 +17,14 @@ setup: go get github.com/DATA-DOG/godog/cmd/godog go get -u github.com/go-playground/overalls go get -u github.com/golang/dep/cmd/dep - dep ensure + dep ensure -v mkdir -p out/ go build -o $(APP_EXECUTABLE) - cp application.yml.sample application.yml @echo "consul-envoy-xds is setup!! Run make test to run tests" +copy-config: + cp application.yml.sample application.yml + build-deps: go install @@ -50,7 +52,7 @@ lint: golint $$p | { grep -vwE "exported (var|function|method|type|const) \S+ should have comment" || true; } \ done -test: compile +test: compile copy-config ENVIRONMENT=test go test $(UNIT_TEST_PACKAGES) -p=1 test-coverage: compile diff --git a/application.yml.sample b/application.yml.sample index 06c1db2..2ff9814 100644 --- a/application.yml.sample +++ b/application.yml.sample @@ -5,3 +5,4 @@ CONSUL_CLIENT_HOST: localhost CONSUL_DC: dc1 CONSUL_TOKEN: "" WATCHED_SERVICE: foo-service +NEW_RELIC_LICENSE_KEY: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx diff --git a/config/config.go b/config/config.go index 37eb33c..6bc13f1 100644 --- a/config/config.go +++ b/config/config.go @@ -50,6 +50,18 @@ func (cfg *Config) WatchedServices() []string { return strings.Split(cfg.GetValue("WATCHED_SERVICE"), ",") } +func (cfg *Config) AppName() string { + return cfg.GetOptionalValue("APP_NAME", "consul-envoy-xds") +} + +func (cfg *Config) NewRelicLicenseKey() string { + return cfg.GetValue("NEW_RELIC_LICENSE_KEY") +} + +func (cfg *Config) NewRelicEnabled() bool { + return cfg.GetFeature("NEW_RELIC_ENABLED") +} + func (cfg *Config) WhitelistedRoutes(svc string) []string { canonicalName := strings.Replace(svc, "-", "_", -1) whitelist := cfg.GetOptionalValue(strings.ToUpper(canonicalName)+"_WHITELISTED_ROUTES", "/") diff --git a/eds/endpoint.go b/eds/endpoint.go index e33f0e7..4e9a384 100644 --- a/eds/endpoint.go +++ b/eds/endpoint.go @@ -1,18 +1,19 @@ package eds import ( + "context" "fmt" - "time" - - "github.com/gojektech/consul-envoy-xds/agent" - "github.com/gojektech/consul-envoy-xds/pubsub" - "log" + "time" cp "github.com/envoyproxy/go-control-plane/envoy/api/v2" cpcore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" eds "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" + "github.com/gojektech/consul-envoy-xds/agent" + "github.com/gojektech/consul-envoy-xds/eventctx" + "github.com/gojektech/consul-envoy-xds/instrument" + "github.com/gojektech/consul-envoy-xds/pubsub" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/watch" ) @@ -126,8 +127,11 @@ func (s *service) WatchPlan(publish func(*pubsub.Event)) (*watch.Plan, error) { return nil, err } plan.Handler = func(idx uint64, data interface{}) { + txn := instrument.NewRelicApp().StartTransaction("watch", nil, nil) + defer txn.End() log.Println(fmt.Sprintf("consul watch triggerred: %v", data)) - publish(&pubsub.Event{CLA: s.CLA(), Clusters: s.Clusters(), Routes: s.Routes()}) + ctx := eventctx.SetNewRelicTxn(context.Background(), txn) + publish(&pubsub.Event{Context: ctx, CLA: s.CLA(), Clusters: s.Clusters(), Routes: s.Routes()}) } return plan, nil } diff --git a/eventctx/ctx.go b/eventctx/ctx.go new file mode 100644 index 0000000..ab5b787 --- /dev/null +++ b/eventctx/ctx.go @@ -0,0 +1,49 @@ +package eventctx + +import ( + "context" + "github.com/newrelic/go-agent" + "net/http" +) + +const newRelicTxnKey = "CTX_NEW_RELIC_TXN_KEY" + +type noOpTransaction struct { + http.ResponseWriter +} + +func (t noOpTransaction) End() error { + return nil +} + +func (t noOpTransaction) Ignore() error { + return nil +} + +func (t noOpTransaction) SetName(name string) error { + return nil +} + +func (t noOpTransaction) NoticeError(err error) error { + return nil +} + +func (t noOpTransaction) AddAttribute(key string, value interface{}) error { + return nil +} + +func (t noOpTransaction) StartSegmentNow() newrelic.SegmentStartTime { + return newrelic.SegmentStartTime{} +} + +func NewRelicTxn(ctx context.Context) newrelic.Transaction { + val := ctx.Value(newRelicTxnKey) + if val == nil { + return noOpTransaction{} + } + return val.(newrelic.Transaction) +} + +func SetNewRelicTxn(ctx context.Context, txn newrelic.Transaction) context.Context { + return context.WithValue(ctx, newRelicTxnKey, txn) +} diff --git a/instrument/newrelic.go b/instrument/newrelic.go new file mode 100644 index 0000000..4d0bcf8 --- /dev/null +++ b/instrument/newrelic.go @@ -0,0 +1,25 @@ +package instrument + +import ( + "log" + + "github.com/gojektech/consul-envoy-xds/config" + "github.com/newrelic/go-agent" +) + +var newRelicApp newrelic.Application + +func init() { + cfg := config.Load() + newRelicCfg := newrelic.NewConfig(cfg.AppName(), cfg.NewRelicLicenseKey()) + newRelicCfg.Enabled = cfg.NewRelicEnabled() + var err error + newRelicApp, err = newrelic.NewApplication(newRelicCfg) + if err != nil { + log.Fatal("failed to initialize new relic application", err) + } +} + +func NewRelicApp() newrelic.Application { + return newRelicApp +} diff --git a/pubsub/hub.go b/pubsub/hub.go index dcf211e..0d14465 100644 --- a/pubsub/hub.go +++ b/pubsub/hub.go @@ -4,6 +4,7 @@ import ( "log" "sync" + "context" cp "github.com/envoyproxy/go-control-plane/envoy/api/v2" "github.com/satori/go.uuid" ) @@ -21,6 +22,7 @@ type Event struct { CLA []*cp.ClusterLoadAssignment Clusters []*cp.Cluster Routes []*cp.RouteConfiguration + Context context.Context } type hub struct { diff --git a/pubsub/hub_test.go b/pubsub/hub_test.go index 187b853..656b2e4 100644 --- a/pubsub/hub_test.go +++ b/pubsub/hub_test.go @@ -3,6 +3,7 @@ package pubsub import ( "testing" + "context" cp "github.com/envoyproxy/go-control-plane/envoy/api/v2" "github.com/stretchr/testify/assert" ) @@ -12,7 +13,7 @@ func TestShouldAddSubscriptionToListOfSubscribers(t *testing.T) { subscription := hub.Subscribe() cla := []*cp.ClusterLoadAssignment{} cluster := &cp.Cluster{} - event := &Event{cla, []*cp.Cluster{cluster}, nil} + event := &Event{Context: context.Background(), CLA: cla, Clusters: []*cp.Cluster{cluster}, Routes: nil} hub.Publish(event) a := <-subscription.Events assert.Equal(t, 1, hub.Size()) diff --git a/stream/discovery_response_stream.go b/stream/discovery_response_stream.go index 2b9d0c3..2837e6c 100644 --- a/stream/discovery_response_stream.go +++ b/stream/discovery_response_stream.go @@ -6,22 +6,26 @@ import ( "time" + "context" + cp "github.com/envoyproxy/go-control-plane/envoy/api/v2" dis "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" + "github.com/gojektech/consul-envoy-xds/eventctx" + "github.com/newrelic/go-agent" ) type EndpointDiscoveryResponseStream interface { - SendEDS([]*cp.ClusterLoadAssignment) error + SendEDS(context.Context, []*cp.ClusterLoadAssignment) error } type ClusterDiscoveryResponseStream interface { - SendCDS([]*cp.Cluster) error + SendCDS(context.Context, []*cp.Cluster) error } type RouteDiscoveryResponseStream interface { - SendRDS([]*cp.RouteConfiguration) error + SendRDS(context.Context, []*cp.RouteConfiguration) error } //DiscoveryResponseStream is an xDS Stream wrapper and wraps grpc stream API and pipes DiscoveryResponse events to it. @@ -38,7 +42,7 @@ type responseStream struct { } //Send a CLA on current stream -func (streamer *responseStream) SendEDS(cLAList []*cp.ClusterLoadAssignment) error { +func (streamer *responseStream) SendEDS(ctx context.Context, cLAList []*cp.ClusterLoadAssignment) error { var resources []types.Any for _, cLA := range cLAList { data, err := proto.Marshal(cLA) @@ -57,7 +61,7 @@ func (streamer *responseStream) SendEDS(cLAList []*cp.ClusterLoadAssignment) err TypeUrl: "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment", Nonce: strconv.FormatInt(int64(streamer.nonce), 10), } - streamer.stream.Send(resp) + streamer.send(ctx, resp) log.Printf("sent EDS on stream: %v", resp) streamer.version = time.Now().UnixNano() streamer.nonce = time.Now().UnixNano() @@ -65,7 +69,7 @@ func (streamer *responseStream) SendEDS(cLAList []*cp.ClusterLoadAssignment) err } //Send a Cluster on current stream -func (streamer *responseStream) SendCDS(clusters []*cp.Cluster) error { +func (streamer *responseStream) SendCDS(ctx context.Context, clusters []*cp.Cluster) error { if len(clusters) == 0 { return nil } @@ -87,7 +91,7 @@ func (streamer *responseStream) SendCDS(clusters []*cp.Cluster) error { TypeUrl: "type.googleapis.com/envoy.api.v2.Cluster", Nonce: strconv.FormatInt(int64(streamer.nonce), 10), } - streamer.stream.Send(resp) + streamer.send(ctx, resp) log.Printf("sent CDS on stream: %v", resp) streamer.version = time.Now().UnixNano() streamer.nonce = time.Now().UnixNano() @@ -95,7 +99,7 @@ func (streamer *responseStream) SendCDS(clusters []*cp.Cluster) error { } //Send a Cluster on current stream -func (streamer *responseStream) SendRDS(routeConfig []*cp.RouteConfiguration) error { +func (streamer *responseStream) SendRDS(ctx context.Context, routeConfig []*cp.RouteConfiguration) error { if len(routeConfig) == 0 { return nil } @@ -117,13 +121,20 @@ func (streamer *responseStream) SendRDS(routeConfig []*cp.RouteConfiguration) er TypeUrl: "type.googleapis.com/envoy.api.v2.RouteConfiguration", Nonce: strconv.FormatInt(int64(streamer.nonce), 10), } - streamer.stream.Send(resp) + streamer.send(ctx, resp) log.Printf("sent RDS on stream: %v", resp) streamer.version = time.Now().UnixNano() streamer.nonce = time.Now().UnixNano() return nil } +func (streamer *responseStream) send(ctx context.Context, resp *cp.DiscoveryResponse) error { + txn := eventctx.NewRelicTxn(ctx) + seg := newrelic.StartSegment(txn, resp.GetTypeUrl()) + defer seg.End() + return streamer.stream.Send(resp) +} + //NewDiscoveryResponseStream creates a DiscoveryResponseStream func NewDiscoveryResponseStream(stream dis.AggregatedDiscoveryService_StreamAggregatedResourcesServer) DiscoveryResponseStream { return &responseStream{stream: stream, nonce: time.Now().UnixNano(), version: time.Now().UnixNano()} diff --git a/stream/discovery_response_stream_test.go b/stream/discovery_response_stream_test.go index 7c32408..3b0efe6 100644 --- a/stream/discovery_response_stream_test.go +++ b/stream/discovery_response_stream_test.go @@ -8,6 +8,7 @@ import ( "strconv" "time" + "context" cp "github.com/envoyproxy/go-control-plane/envoy/api/v2" "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" @@ -19,7 +20,8 @@ func TestShouldStreamEndpointDiscoveryResponse(t *testing.T) { mockStream.On("Send", mock.AnythingOfType("*v2.DiscoveryResponse")).Return(nil) drs := stream.NewDiscoveryResponseStream(mockStream) cla := []*cp.ClusterLoadAssignment{{}} - drs.SendEDS(cla) + background := context.Background() + drs.SendEDS(background, cla) assert.Equal(t, "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment", mockStream.Capture().TypeUrl) capturedResponse := mockStream.Capture() assert.Equal(t, 1, len(capturedResponse.Resources)) @@ -38,7 +40,7 @@ func TestShouldStreamEndpointDiscoveryResponseWithMultipleClusterResources(t *te &cp.Cluster{Name: "foo"}, &cp.Cluster{Name: "bar"}, } - drs.SendCDS(clusters) + drs.SendCDS(context.Background(), clusters) assert.Equal(t, "type.googleapis.com/envoy.api.v2.Cluster", mockStream.Capture().TypeUrl) capturedResponse := mockStream.Capture() assert.Equal(t, 2, len(capturedResponse.Resources)) @@ -60,7 +62,7 @@ func TestShouldStreamEndpointDiscoveryResponseWithMultipleRouteResources(t *test &cp.RouteConfiguration{Name: "foo"}, &cp.RouteConfiguration{Name: "bar"}, } - drs.SendRDS(routeConfig) + drs.SendRDS(context.Background(), routeConfig) assert.Equal(t, "type.googleapis.com/envoy.api.v2.RouteConfiguration", mockStream.Capture().TypeUrl) capturedResponse := mockStream.Capture() assert.Equal(t, 2, len(capturedResponse.Resources)) @@ -80,19 +82,19 @@ func TestShouldStreamEndpointDiscoveryResponseWithIncrementingNonceAndVersion(t mockStream.On("Send", mock.AnythingOfType("*v2.DiscoveryResponse")).Times(3).Return(nil) drs := stream.NewDiscoveryResponseStream(mockStream) - drs.SendEDS([]*cp.ClusterLoadAssignment{}) + drs.SendEDS(context.Background(), []*cp.ClusterLoadAssignment{}) nonce1, _ := strconv.Atoi(mockStream.Capture().Nonce) assert.True(t, int64(nonce1) > now) version1, _ := strconv.Atoi(mockStream.Capture().VersionInfo) assert.True(t, int64(version1) > now) - drs.SendEDS([]*cp.ClusterLoadAssignment{}) + drs.SendEDS(context.Background(), []*cp.ClusterLoadAssignment{}) nonce2, _ := strconv.Atoi(mockStream.Capture().Nonce) assert.True(t, int64(nonce2) > now) version2, _ := strconv.Atoi(mockStream.Capture().VersionInfo) assert.True(t, int64(version2) > now) - drs.SendEDS([]*cp.ClusterLoadAssignment{}) + drs.SendEDS(context.Background(), []*cp.ClusterLoadAssignment{}) nonce3, _ := strconv.Atoi(mockStream.Capture().Nonce) assert.True(t, int64(nonce3) > now) version3, _ := strconv.Atoi(mockStream.Capture().VersionInfo) @@ -107,7 +109,7 @@ func TestShouldStreamClusterDiscoveryResponse(t *testing.T) { mockStream.On("Send", mock.AnythingOfType("*v2.DiscoveryResponse")).Return(nil) drs := stream.NewDiscoveryResponseStream(mockStream) cluster := &cp.Cluster{} - drs.SendCDS([]*cp.Cluster{cluster}) + drs.SendCDS(context.Background(), []*cp.Cluster{cluster}) assert.Equal(t, "type.googleapis.com/envoy.api.v2.Cluster", mockStream.Capture().TypeUrl) capturedResponse := mockStream.Capture() assert.Equal(t, 1, len(capturedResponse.Resources)) @@ -124,19 +126,19 @@ func TestShouldStreamClusterDiscoveryResponseWithIncrementingNonceAndVersion(t * mockStream.On("Send", mock.AnythingOfType("*v2.DiscoveryResponse")).Times(3).Return(nil) drs := stream.NewDiscoveryResponseStream(mockStream) - drs.SendCDS([]*cp.Cluster{{}}) + drs.SendCDS(context.Background(), []*cp.Cluster{{}}) nonce1, _ := strconv.Atoi(mockStream.Capture().Nonce) assert.True(t, int64(nonce1) > now) version1, _ := strconv.Atoi(mockStream.Capture().VersionInfo) assert.True(t, int64(version1) > now) - drs.SendCDS([]*cp.Cluster{{}}) + drs.SendCDS(context.Background(), []*cp.Cluster{{}}) nonce2, _ := strconv.Atoi(mockStream.Capture().Nonce) assert.True(t, int64(nonce2) > now) version2, _ := strconv.Atoi(mockStream.Capture().VersionInfo) assert.True(t, int64(version2) > now) - drs.SendCDS([]*cp.Cluster{{}}) + drs.SendCDS(context.Background(), []*cp.Cluster{{}}) nonce3, _ := strconv.Atoi(mockStream.Capture().Nonce) assert.True(t, int64(nonce3) > now) version3, _ := strconv.Atoi(mockStream.Capture().VersionInfo) @@ -151,7 +153,7 @@ func TestShouldStreamRouteConfigurationResponse(t *testing.T) { mockStream.On("Send", mock.AnythingOfType("*v2.DiscoveryResponse")).Return(nil) drs := stream.NewDiscoveryResponseStream(mockStream) route := &cp.RouteConfiguration{} - drs.SendRDS([]*cp.RouteConfiguration{route}) + drs.SendRDS(context.Background(), []*cp.RouteConfiguration{route}) assert.Equal(t, "type.googleapis.com/envoy.api.v2.RouteConfiguration", mockStream.Capture().TypeUrl) capturedResponse := mockStream.Capture() assert.Equal(t, 1, len(capturedResponse.Resources)) @@ -168,19 +170,19 @@ func TestShouldStreamRouteConfigurationResponseWithIncrementingNonceAndVersion(t mockStream.On("Send", mock.AnythingOfType("*v2.DiscoveryResponse")).Times(3).Return(nil) drs := stream.NewDiscoveryResponseStream(mockStream) - drs.SendRDS([]*cp.RouteConfiguration{{}}) + drs.SendRDS(context.Background(), []*cp.RouteConfiguration{{}}) nonce1, _ := strconv.Atoi(mockStream.Capture().Nonce) assert.True(t, int64(nonce1) > now) version1, _ := strconv.Atoi(mockStream.Capture().VersionInfo) assert.True(t, int64(version1) > now) - drs.SendRDS([]*cp.RouteConfiguration{{}}) + drs.SendRDS(context.Background(), []*cp.RouteConfiguration{{}}) nonce2, _ := strconv.Atoi(mockStream.Capture().Nonce) assert.True(t, int64(nonce2) > now) version2, _ := strconv.Atoi(mockStream.Capture().VersionInfo) assert.True(t, int64(version2) > now) - drs.SendRDS([]*cp.RouteConfiguration{{}}) + drs.SendRDS(context.Background(), []*cp.RouteConfiguration{{}}) nonce3, _ := strconv.Atoi(mockStream.Capture().Nonce) assert.True(t, int64(nonce3) > now) version3, _ := strconv.Atoi(mockStream.Capture().VersionInfo) diff --git a/stream/subscription_stream.go b/stream/subscription_stream.go index c38da01..02ec3db 100644 --- a/stream/subscription_stream.go +++ b/stream/subscription_stream.go @@ -4,8 +4,12 @@ import ( "io" "log" + "context" + cp "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" "github.com/gojektech/consul-envoy-xds/eds" + "github.com/gojektech/consul-envoy-xds/eventctx" + "github.com/gojektech/consul-envoy-xds/instrument" "github.com/gojektech/consul-envoy-xds/pubsub" ) @@ -46,11 +50,7 @@ func (es *subscriptionStream) Stream() error { for { select { case e := <-es.subscription.Events: - if e != nil { - responseStream.SendCDS(e.Clusters) - responseStream.SendRDS(e.Routes) - responseStream.SendEDS(e.CLA) - } + es.process(e, responseStream) } } }() @@ -66,6 +66,17 @@ func (es *subscriptionStream) Stream() error { return nil } +func (es *subscriptionStream) process(e *pubsub.Event, responseStream DiscoveryResponseStream) { + txn := instrument.NewRelicApp().StartTransaction("discovery_response_stream", nil, nil) + defer txn.End() + ctx := eventctx.SetNewRelicTxn(context.Background(), txn) + if e != nil { + responseStream.SendCDS(ctx, e.Clusters) + responseStream.SendRDS(ctx, e.Routes) + responseStream.SendEDS(ctx, e.CLA) + } +} + func NewSubscriptionStream(stream cp.AggregatedDiscoveryService_StreamAggregatedResourcesServer, subscription *pubsub.Subscription, service eds.Endpoint, hub pubsub.Hub) SubscriptionStream { return &subscriptionStream{stream: stream, subscription: subscription, service: service, hub: hub} }