Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions controller/api/destination/destination_fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ func FuzzAdd(data []byte) int {
}
t := &testing.T{}
_, translator := makeEndpointTranslator(t)
translator.Start()
defer translator.Stop()
translator.Add(set)
translator.Remove(set)
return 1
Expand Down Expand Up @@ -98,8 +96,6 @@ func FuzzProfileTranslatorUpdate(data []byte) int {
if err != nil {
return 0
}
translator.Start()
defer translator.Stop()
translator.Update(profile)
return 1
}
178 changes: 54 additions & 124 deletions controller/api/destination/endpoint_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net/netip"
"reflect"
"sync"

pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2-proxy-api/go/net"
Expand Down Expand Up @@ -34,6 +35,8 @@ const (
// into Destination.Get messages.
type (
endpointTranslator struct {
mu sync.Mutex // protects availableEndpoints and filteredSnapshot

controllerNS string
identityTrustDomain string
nodeTopologyZone string
Expand All @@ -51,25 +54,11 @@ type (

availableEndpoints watcher.AddressSet
filteredSnapshot watcher.AddressSet
stream pb.Destination_GetServer
endStream chan struct{}
log *logging.Entry
overflowCounter prometheus.Counter

updates chan interface{}
stop chan struct{}
}

addUpdate struct {
set watcher.AddressSet
}

removeUpdate struct {
set watcher.AddressSet
}

noEndpointsUpdate struct {
exists bool
messages chan<- *pb.Update
resetStream func()
}
)

Expand All @@ -96,10 +85,9 @@ func newEndpointTranslator(
srcNodeName string,
defaultOpaquePorts map[uint32]struct{},
k8sAPI *k8s.MetadataAPI,
stream pb.Destination_GetServer,
endStream chan struct{},
messages chan<- *pb.Update,
resetStream func(),
log *logging.Entry,
queueCapacity int,
) (*endpointTranslator, error) {
log = log.WithFields(logging.Fields{
"component": "endpoint-translator",
Expand All @@ -120,109 +108,30 @@ func newEndpointTranslator(
}

return &endpointTranslator{
controllerNS,
identityTrustDomain,
nodeTopologyZone,
srcNodeName,
defaultOpaquePorts,
forceOpaqueTransport,
enableH2Upgrade,
enableEndpointFiltering,
enableIPv6,
extEndpointZoneWeights,
meshedHTTP2ClientParams,

availableEndpoints,
filteredSnapshot,
stream,
endStream,
log,
counter,
make(chan interface{}, queueCapacity),
make(chan struct{}),
controllerNS: controllerNS,
identityTrustDomain: identityTrustDomain,
nodeTopologyZone: nodeTopologyZone,
nodeName: srcNodeName,
defaultOpaquePorts: defaultOpaquePorts,
forceOpaqueTransport: forceOpaqueTransport,
enableH2Upgrade: enableH2Upgrade,
enableEndpointFiltering: enableEndpointFiltering,
enableIPv6: enableIPv6,
extEndpointZoneWeights: extEndpointZoneWeights,
meshedHTTP2ClientParams: meshedHTTP2ClientParams,
availableEndpoints: availableEndpoints,
filteredSnapshot: filteredSnapshot,
log: log,
overflowCounter: counter,
messages: messages,
resetStream: resetStream,
}, nil
}

func (et *endpointTranslator) Add(set watcher.AddressSet) {
et.enqueueUpdate(&addUpdate{set})
}

func (et *endpointTranslator) Remove(set watcher.AddressSet) {
et.enqueueUpdate(&removeUpdate{set})
}
et.mu.Lock()
defer et.mu.Unlock()

func (et *endpointTranslator) NoEndpoints(exists bool) {
et.enqueueUpdate(&noEndpointsUpdate{exists})
}

// Add, Remove, and NoEndpoints are called from a client-go informer callback
// and therefore must not block. For each of these, we enqueue an update in
// a channel so that it can be processed asyncronously. To ensure that enqueuing
// does not block, we first check to see if there is capacity in the buffered
// channel. If there is not, we drop the update and signal to the stream that
// it has fallen too far behind and should be closed.
func (et *endpointTranslator) enqueueUpdate(update interface{}) {
select {
case et.updates <- update:
// Update has been successfully enqueued.
default:
// We are unable to enqueue because the channel does not have capacity.
// The stream has fallen too far behind and should be closed.
et.overflowCounter.Inc()
select {
case <-et.endStream:
// The endStream channel has already been closed so no action is
// necessary.
default:
et.log.Error("endpoint update queue full; aborting stream")
close(et.endStream)
}
}
}

// Start initiates a goroutine which processes update events off of the
// endpointTranslator's internal queue and sends to the grpc stream as
// appropriate. The goroutine calls several non-thread-safe functions (including
// Send) and therefore, Start must not be called more than once.
func (et *endpointTranslator) Start() {
go func() {
for {
select {
case update, ok := <-et.updates:
if !ok {
return
}
et.processUpdate(update)
case <-et.stop:
return
}
}
}()
}

// Stop terminates the goroutine started by Start.
func (et *endpointTranslator) Stop() {
close(et.stop)
}

// DrainAndStop closes the updates channel, causing the goroutine started by
// Start to terminate after processing all remaining updates.
func (et *endpointTranslator) DrainAndStop() {
close(et.updates)
}

func (et *endpointTranslator) processUpdate(update interface{}) {
switch update := update.(type) {
case *addUpdate:
et.add(update.set)
case *removeUpdate:
et.remove(update.set)
case *noEndpointsUpdate:
et.noEndpoints(update.exists)
}
}

func (et *endpointTranslator) add(set watcher.AddressSet) {
for id, address := range set.Addresses {
et.availableEndpoints.Addresses[id] = address
}
Expand All @@ -233,15 +142,21 @@ func (et *endpointTranslator) add(set watcher.AddressSet) {
et.sendFilteredUpdate()
}

func (et *endpointTranslator) remove(set watcher.AddressSet) {
func (et *endpointTranslator) Remove(set watcher.AddressSet) {
et.mu.Lock()
defer et.mu.Unlock()

for id := range set.Addresses {
delete(et.availableEndpoints.Addresses, id)
}

et.sendFilteredUpdate()
}

func (et *endpointTranslator) noEndpoints(exists bool) {
func (et *endpointTranslator) NoEndpoints(exists bool) {
et.mu.Lock()
defer et.mu.Unlock()

et.log.Debugf("NoEndpoints(%+v)", exists)

et.availableEndpoints.Addresses = map[watcher.ID]watcher.Address{}
Expand Down Expand Up @@ -501,9 +416,7 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
}}

et.log.Debugf("Sending destination add: %+v", add)
if err := et.stream.Send(add); err != nil {
et.log.Debugf("Failed to send address update: %s", err)
}
et.enqueueUpdate(add)
}

func (et *endpointTranslator) sendClientRemove(set watcher.AddressSet) {
Expand All @@ -524,8 +437,25 @@ func (et *endpointTranslator) sendClientRemove(set watcher.AddressSet) {
}}

et.log.Debugf("Sending destination remove: %+v", remove)
if err := et.stream.Send(remove); err != nil {
et.log.Debugf("Failed to send address update: %s", err)
et.enqueueUpdate(remove)
}

// Add, Remove, and NoEndpoints are called from a client-go informer callback
// and therefore must not block. For each of these, we enqueue an update in
// a channel so that it can be processed asyncronously. To ensure that enqueuing
// does not block, we first check to see if there is capacity in the buffered
// channel. If there is not, we drop the update and signal to the stream that
// it has fallen too far behind and should be closed.
func (et *endpointTranslator) enqueueUpdate(update *pb.Update) {
select {
case et.messages <- update:
// Update has been successfully enqueued.
default:
// We are unable to enqueue because the channel does not have capacity.
// The stream has fallen too far behind and should be closed.
et.overflowCounter.Inc()
et.log.Warn("Endpoint update queue overflowed; closing stream")
et.resetStream()
}
}

Expand Down
Loading