Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,7 @@
[[constraint]]
name = "google.golang.org/grpc"
version = "1.8.2"

[[constraint]]
name = "github.com/newrelic/go-agent"
version = "~1.11.0"
8 changes: 5 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ setup:
go get github.com/DATA-DOG/godog/cmd/godog
go get -u github.com/go-playground/overalls
go get -u github.com/golang/dep/cmd/dep
dep ensure
dep ensure -v
mkdir -p out/
go build -o $(APP_EXECUTABLE)
cp application.yml.sample application.yml
@echo "consul-envoy-xds is setup!! Run make test to run tests"

copy-config:
cp application.yml.sample application.yml

build-deps:
go install

Expand Down Expand Up @@ -50,7 +52,7 @@ lint:
golint $$p | { grep -vwE "exported (var|function|method|type|const) \S+ should have comment" || true; } \
done

test: compile
test: compile copy-config
ENVIRONMENT=test go test $(UNIT_TEST_PACKAGES) -p=1

test-coverage: compile
Expand Down
1 change: 1 addition & 0 deletions application.yml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ CONSUL_CLIENT_HOST: localhost
CONSUL_DC: dc1
CONSUL_TOKEN: ""
WATCHED_SERVICE: foo-service
NEW_RELIC_LICENSE_KEY: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
12 changes: 12 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,18 @@ func (cfg *Config) WatchedServices() []string {
return strings.Split(cfg.GetValue("WATCHED_SERVICE"), ",")
}

func (cfg *Config) AppName() string {
return cfg.GetOptionalValue("APP_NAME", "consul-envoy-xds")
}

func (cfg *Config) NewRelicLicenseKey() string {
return cfg.GetValue("NEW_RELIC_LICENSE_KEY")
}

func (cfg *Config) NewRelicEnabled() bool {
return cfg.GetFeature("NEW_RELIC_ENABLED")
}

func (cfg *Config) WhitelistedRoutes(svc string) []string {
canonicalName := strings.Replace(svc, "-", "_", -1)
whitelist := cfg.GetOptionalValue(strings.ToUpper(canonicalName)+"_WHITELISTED_ROUTES", "/")
Expand Down
16 changes: 10 additions & 6 deletions eds/endpoint.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package eds

import (
"context"
"fmt"
"time"

"github.com/gojektech/consul-envoy-xds/agent"
"github.com/gojektech/consul-envoy-xds/pubsub"

"log"
"time"

cp "github.com/envoyproxy/go-control-plane/envoy/api/v2"
cpcore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
eds "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
"github.com/gojektech/consul-envoy-xds/agent"
"github.com/gojektech/consul-envoy-xds/eventctx"
"github.com/gojektech/consul-envoy-xds/instrument"
"github.com/gojektech/consul-envoy-xds/pubsub"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/watch"
)
Expand Down Expand Up @@ -126,8 +127,11 @@ func (s *service) WatchPlan(publish func(*pubsub.Event)) (*watch.Plan, error) {
return nil, err
}
plan.Handler = func(idx uint64, data interface{}) {
txn := instrument.NewRelicApp().StartTransaction("watch", nil, nil)
defer txn.End()
log.Println(fmt.Sprintf("consul watch triggerred: %v", data))
publish(&pubsub.Event{CLA: s.CLA(), Clusters: s.Clusters(), Routes: s.Routes()})
ctx := eventctx.SetNewRelicTxn(context.Background(), txn)
publish(&pubsub.Event{Context: ctx, CLA: s.CLA(), Clusters: s.Clusters(), Routes: s.Routes()})
}
return plan, nil
}
Expand Down
49 changes: 49 additions & 0 deletions eventctx/ctx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package eventctx

import (
"context"
"github.com/newrelic/go-agent"
"net/http"
)

const newRelicTxnKey = "CTX_NEW_RELIC_TXN_KEY"

type noOpTransaction struct {
http.ResponseWriter
}

func (t noOpTransaction) End() error {
return nil
}

func (t noOpTransaction) Ignore() error {
return nil
}

func (t noOpTransaction) SetName(name string) error {
return nil
}

func (t noOpTransaction) NoticeError(err error) error {
return nil
}

func (t noOpTransaction) AddAttribute(key string, value interface{}) error {
return nil
}

func (t noOpTransaction) StartSegmentNow() newrelic.SegmentStartTime {
return newrelic.SegmentStartTime{}
}

func NewRelicTxn(ctx context.Context) newrelic.Transaction {
val := ctx.Value(newRelicTxnKey)
if val == nil {
return noOpTransaction{}
}
return val.(newrelic.Transaction)
}

func SetNewRelicTxn(ctx context.Context, txn newrelic.Transaction) context.Context {
return context.WithValue(ctx, newRelicTxnKey, txn)
}
25 changes: 25 additions & 0 deletions instrument/newrelic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package instrument

import (
"log"

"github.com/gojektech/consul-envoy-xds/config"
"github.com/newrelic/go-agent"
)

var newRelicApp newrelic.Application

func init() {
cfg := config.Load()
newRelicCfg := newrelic.NewConfig(cfg.AppName(), cfg.NewRelicLicenseKey())
newRelicCfg.Enabled = cfg.NewRelicEnabled()
var err error
newRelicApp, err = newrelic.NewApplication(newRelicCfg)
if err != nil {
log.Fatal("failed to initialize new relic application", err)
}
}

func NewRelicApp() newrelic.Application {
return newRelicApp
}
2 changes: 2 additions & 0 deletions pubsub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log"
"sync"

"context"
cp "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/satori/go.uuid"
)
Expand All @@ -21,6 +22,7 @@ type Event struct {
CLA []*cp.ClusterLoadAssignment
Clusters []*cp.Cluster
Routes []*cp.RouteConfiguration
Context context.Context
}

type hub struct {
Expand Down
3 changes: 2 additions & 1 deletion pubsub/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pubsub
import (
"testing"

"context"
cp "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/stretchr/testify/assert"
)
Expand All @@ -12,7 +13,7 @@ func TestShouldAddSubscriptionToListOfSubscribers(t *testing.T) {
subscription := hub.Subscribe()
cla := []*cp.ClusterLoadAssignment{}
cluster := &cp.Cluster{}
event := &Event{cla, []*cp.Cluster{cluster}, nil}
event := &Event{Context: context.Background(), CLA: cla, Clusters: []*cp.Cluster{cluster}, Routes: nil}
hub.Publish(event)
a := <-subscription.Events
assert.Equal(t, 1, hub.Size())
Expand Down
29 changes: 20 additions & 9 deletions stream/discovery_response_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,26 @@ import (

"time"

"context"

cp "github.com/envoyproxy/go-control-plane/envoy/api/v2"
dis "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/gojektech/consul-envoy-xds/eventctx"
"github.com/newrelic/go-agent"
)

type EndpointDiscoveryResponseStream interface {
SendEDS([]*cp.ClusterLoadAssignment) error
SendEDS(context.Context, []*cp.ClusterLoadAssignment) error
}

type ClusterDiscoveryResponseStream interface {
SendCDS([]*cp.Cluster) error
SendCDS(context.Context, []*cp.Cluster) error
}

type RouteDiscoveryResponseStream interface {
SendRDS([]*cp.RouteConfiguration) error
SendRDS(context.Context, []*cp.RouteConfiguration) error
}

//DiscoveryResponseStream is an xDS Stream wrapper and wraps grpc stream API and pipes DiscoveryResponse events to it.
Expand All @@ -38,7 +42,7 @@ type responseStream struct {
}

//Send a CLA on current stream
func (streamer *responseStream) SendEDS(cLAList []*cp.ClusterLoadAssignment) error {
func (streamer *responseStream) SendEDS(ctx context.Context, cLAList []*cp.ClusterLoadAssignment) error {
var resources []types.Any
for _, cLA := range cLAList {
data, err := proto.Marshal(cLA)
Expand All @@ -57,15 +61,15 @@ func (streamer *responseStream) SendEDS(cLAList []*cp.ClusterLoadAssignment) err
TypeUrl: "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment",
Nonce: strconv.FormatInt(int64(streamer.nonce), 10),
}
streamer.stream.Send(resp)
streamer.send(ctx, resp)
log.Printf("sent EDS on stream: %v", resp)
streamer.version = time.Now().UnixNano()
streamer.nonce = time.Now().UnixNano()
return nil
}

//Send a Cluster on current stream
func (streamer *responseStream) SendCDS(clusters []*cp.Cluster) error {
func (streamer *responseStream) SendCDS(ctx context.Context, clusters []*cp.Cluster) error {
if len(clusters) == 0 {
return nil
}
Expand All @@ -87,15 +91,15 @@ func (streamer *responseStream) SendCDS(clusters []*cp.Cluster) error {
TypeUrl: "type.googleapis.com/envoy.api.v2.Cluster",
Nonce: strconv.FormatInt(int64(streamer.nonce), 10),
}
streamer.stream.Send(resp)
streamer.send(ctx, resp)
log.Printf("sent CDS on stream: %v", resp)
streamer.version = time.Now().UnixNano()
streamer.nonce = time.Now().UnixNano()
return nil
}

//Send a Cluster on current stream
func (streamer *responseStream) SendRDS(routeConfig []*cp.RouteConfiguration) error {
func (streamer *responseStream) SendRDS(ctx context.Context, routeConfig []*cp.RouteConfiguration) error {
if len(routeConfig) == 0 {
return nil
}
Expand All @@ -117,13 +121,20 @@ func (streamer *responseStream) SendRDS(routeConfig []*cp.RouteConfiguration) er
TypeUrl: "type.googleapis.com/envoy.api.v2.RouteConfiguration",
Nonce: strconv.FormatInt(int64(streamer.nonce), 10),
}
streamer.stream.Send(resp)
streamer.send(ctx, resp)
log.Printf("sent RDS on stream: %v", resp)
streamer.version = time.Now().UnixNano()
streamer.nonce = time.Now().UnixNano()
return nil
}

func (streamer *responseStream) send(ctx context.Context, resp *cp.DiscoveryResponse) error {
txn := eventctx.NewRelicTxn(ctx)
seg := newrelic.StartSegment(txn, resp.GetTypeUrl())
defer seg.End()
return streamer.stream.Send(resp)
}

//NewDiscoveryResponseStream creates a DiscoveryResponseStream
func NewDiscoveryResponseStream(stream dis.AggregatedDiscoveryService_StreamAggregatedResourcesServer) DiscoveryResponseStream {
return &responseStream{stream: stream, nonce: time.Now().UnixNano(), version: time.Now().UnixNano()}
Expand Down
Loading