Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions accept_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
package channel

import (
"github.com/michaelquigley/pfxlog"
"time"

"github.com/michaelquigley/pfxlog"
)

// An UnderlayAcceptor take an Underlay and generally turns it into a channel for a specific use.
Expand Down Expand Up @@ -72,12 +73,14 @@ func (self *UnderlayDispatcher) Run() {
if !found {
acceptor = self.defaultAcceptor
} else {
acceptor = self.acceptors[string(chanType)]
if acceptor, found = self.acceptors[string(chanType)]; !found {
acceptor = self.defaultAcceptor
}
}

closeUnderlay := false
if acceptor == nil {
log.Warn("incoming request didn't have type header, and no default acceptor defined. closing connection")
log.Warn("incoming request didn't have a recognized type header, and no default acceptor defined. closing connection")
closeUnderlay = true
} else if err = acceptor.AcceptUnderlay(underlay); err != nil {
log.WithError(err).Error("error handling incoming connection, closing connection")
Expand Down
6 changes: 5 additions & 1 deletion handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ func (self CloseHandlerF) HandleClose(ch Channel) {
type MessageSourceF func(notifer *CloseNotifier) (Sendable, error)

type UnderlayHandler interface {
// Start is called after the MultiChannel has been created with the first underlay
// ChannelCreated is called after the MultiChannel has been created but before binding happens.
// This allows the underlay handler to set the channel, if desired, before binding happens
ChannelCreated(channel MultiChannel)

// Start is called after the MultiChannel has been created with the first underlay and binding is complete.
// If this is a dial side, Start may be used to add additional underlays
Start(channel MultiChannel)

Expand Down
16 changes: 9 additions & 7 deletions impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,23 @@ import (
"container/heap"
"crypto/x509"
"fmt"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/foundation/v2/concurrenz"
"github.com/openziti/foundation/v2/info"
"github.com/openziti/foundation/v2/sequence"
"github.com/pkg/errors"
"io"
"net"
"sync"
"sync/atomic"
"time"

"github.com/michaelquigley/pfxlog"
"github.com/openziti/foundation/v2/concurrenz"
"github.com/openziti/foundation/v2/info"
"github.com/openziti/foundation/v2/sequence"
"github.com/pkg/errors"
)

const (
flagClosed = 0
flagRxStarted = 1
flagClosed = 0
flagRxStarted = 1
flagInjectUnderlayType = 2
)

var connectionSeq = sequence.NewSequence()
Expand Down
6 changes: 4 additions & 2 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"context"
"encoding/binary"
"fmt"
"github.com/michaelquigley/pfxlog"
"github.com/pkg/errors"
"io"
"time"

"github.com/michaelquigley/pfxlog"
"github.com/pkg/errors"
)

/**
Expand All @@ -50,6 +51,7 @@ const (
IsGroupedHeader = 9
GroupSecretHeader = 10
IsFirstGroupConnection = 11
UnderlayTypeHeader = 12

// Headers in the range 128-255 inclusive will be reflected when creating replies
ReflectedHeaderBitMask = 1 << 7
Expand Down
24 changes: 18 additions & 6 deletions multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type MultiChannelConfig struct {
UnderlayHandler UnderlayHandler
BindHandler BindHandler
Underlay Underlay

InjectUnderlayTypeIntoMessages bool
}

type senderContextImpl struct {
Expand Down Expand Up @@ -110,16 +112,20 @@ func NewMultiChannel(config *MultiChannelConfig) (MultiChannel, error) {
underlayHandler: config.UnderlayHandler,
}

impl.flags.Set(flagInjectUnderlayType, config.InjectUnderlayTypeIntoMessages)

impl.ownerId = config.Underlay.Id()
impl.certs.Store(config.Underlay.Certificates())
impl.headers.Store(config.Underlay.Headers())
impl.underlays.Append(config.Underlay)

if groupSecret := config.Underlay.Headers()[GroupSecretHeader]; len(groupSecret) == 0 {
groupSecret := config.Underlay.Headers()[GroupSecretHeader]
if len(groupSecret) == 0 {
return nil, errors.New("no group secret header found for multi channel")
} else {
impl.groupSecret = groupSecret
}
impl.groupSecret = groupSecret

config.UnderlayHandler.ChannelCreated(impl)

if err := bind(config.BindHandler, impl); err != nil {
for _, u := range impl.underlays.Value() {
Expand Down Expand Up @@ -495,6 +501,9 @@ func (self *multiChannelImpl) Rxer(underlay Underlay, notifier *CloseNotifier) {
log.Debug("started")
defer log.Debug("exited")

underlayType := GetUnderlayType(underlay)
injectType := self.flags.IsSet(flagInjectUnderlayType)

for {
m, err := underlay.Rx()
if err != nil {
Expand All @@ -508,6 +517,9 @@ func (self *multiChannelImpl) Rxer(underlay Underlay, notifier *CloseNotifier) {
return
}

if injectType {
m.Headers.PutStringHeader(UnderlayTypeHeader, underlayType)
}
self.Rx(m)
}
}
Expand Down Expand Up @@ -592,7 +604,7 @@ func (self *UnderlayConstraints) countsShowValidState(ch MultiChannel, counts ma

func (self *UnderlayConstraints) Apply(ch MultiChannel, factory GroupedUnderlayFactory) {
log := pfxlog.Logger().WithField("conn", ch.Label())
log.Info("starting constraint check")
log.Debug("starting constraint check")

if ch.IsClosed() {
return
Expand All @@ -619,7 +631,7 @@ func (self *UnderlayConstraints) Apply(ch MultiChannel, factory GroupedUnderlayF
log.WithField("underlayType", underlayType).
WithField("numDesired", constraint.numDesired).
WithField("current", counts[underlayType]).
Info("checking constraint")
Debug("checking constraint")
if constraint.numDesired > counts[underlayType] {
dialElapsed := time.Since(self.lastDial.Load())

Expand All @@ -636,7 +648,7 @@ func (self *UnderlayConstraints) Apply(ch MultiChannel, factory GroupedUnderlayF
}

if allSatisfied {
pfxlog.Logger().WithField("conn", ch.Label()).Info("constraints satisfied")
pfxlog.Logger().WithField("conn", ch.Label()).Debug("constraints satisfied")
return
}
}
Expand Down
27 changes: 14 additions & 13 deletions multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@ package channel
import (
"errors"
"fmt"
"io"
"math/rand"
"sync/atomic"
"testing"
"time"

"github.com/michaelquigley/pfxlog"
"github.com/openziti/foundation/v2/goroutines"
"github.com/openziti/identity"
"github.com/openziti/transport/v2"
"github.com/openziti/transport/v2/tcp"
"github.com/stretchr/testify/require"
"io"
"math/rand"
"sync/atomic"
"testing"
"time"
)

func Test_MultiUnderlayChannels(t *testing.T) {
Expand Down Expand Up @@ -111,7 +112,7 @@ func Test_MultiUnderlayChannels(t *testing.T) {
wrapper := &TypeLoggingUnderlay{
wrapped: underlay,
}
underlayHandler := NewListenerPriorityChannel(wrapper)
underlayHandler := NewListenerPriorityChannel()
return newMultiChannel("listener", underlayHandler, wrapper, closeCallback)
}, func(underlay Underlay) error {
return errors.New("this implementation only accepts grouped channel")
Expand Down Expand Up @@ -185,7 +186,7 @@ func Test_MultiUnderlayChannels(t *testing.T) {
req.NoError(asyncErr, "no async errors should have occurred")
}

func newPriorityChannelBase(underlay Underlay) *priorityChannelBase {
func newPriorityChannelBase() *priorityChannelBase {
senderContext := NewSenderContext()

defaultMsgChan := make(chan Sendable, 4)
Expand All @@ -194,7 +195,6 @@ func newPriorityChannelBase(underlay Underlay) *priorityChannelBase {

result := &priorityChannelBase{
SenderContext: senderContext,
id: underlay.ConnectionId(),
prioritySender: NewSingleChSender(senderContext, priorityMsgChan),
defaultSender: NewSingleChSender(senderContext, defaultMsgChan),
priorityMsgChan: priorityMsgChan,
Expand All @@ -205,7 +205,6 @@ func newPriorityChannelBase(underlay Underlay) *priorityChannelBase {
}

type priorityChannelBase struct {
id string
SenderContext
prioritySender Sender
defaultSender Sender
Expand All @@ -215,6 +214,8 @@ type priorityChannelBase struct {
retryMsgChan chan Sendable
}

func (self *priorityChannelBase) ChannelCreated(MultiChannel) {}

func (self *priorityChannelBase) GetDefaultSender() Sender {
return self.defaultSender
}
Expand Down Expand Up @@ -289,9 +290,9 @@ func (self *priorityChannelBase) CloseRandom(ch MultiChannel) {
}
}

func NewDialPriorityChannel(dialer *classicDialer, underlay Underlay) PriorityChannel {
func NewDialPriorityChannel(dialer *classicDialer, _ Underlay) PriorityChannel {
result := &dialPriorityChannel{
priorityChannelBase: *newPriorityChannelBase(underlay),
priorityChannelBase: *newPriorityChannelBase(),
dialer: dialer,
}

Expand Down Expand Up @@ -351,9 +352,9 @@ func (self *dialPriorityChannel) CreateGroupedUnderlay(groupId string, groupSecr
}, nil
}

func NewListenerPriorityChannel(underlay Underlay) PriorityChannel {
func NewListenerPriorityChannel() PriorityChannel {
result := &listenerPriorityChannel{
priorityChannelBase: *newPriorityChannelBase(underlay),
priorityChannelBase: *newPriorityChannelBase(),
}

result.constraints.AddConstraint("default", 2, 1)
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
4.2
4.3