From 5268223fb1a9abf9be5873a274548be0ad6493f8 Mon Sep 17 00:00:00 2001 From: Paul Lorenz Date: Tue, 3 Feb 2026 11:57:28 -0500 Subject: [PATCH 1/4] Allow injecting the underlay type into messages. Fixes #222 --- impl.go | 16 +++++++++------- message.go | 6 ++++-- multi.go | 10 ++++++++++ 3 files changed, 23 insertions(+), 9 deletions(-) diff --git a/impl.go b/impl.go index fbf6dac..50e4119 100644 --- a/impl.go +++ b/impl.go @@ -20,21 +20,23 @@ import ( "container/heap" "crypto/x509" "fmt" - "github.com/michaelquigley/pfxlog" - "github.com/openziti/foundation/v2/concurrenz" - "github.com/openziti/foundation/v2/info" - "github.com/openziti/foundation/v2/sequence" - "github.com/pkg/errors" "io" "net" "sync" "sync/atomic" "time" + + "github.com/michaelquigley/pfxlog" + "github.com/openziti/foundation/v2/concurrenz" + "github.com/openziti/foundation/v2/info" + "github.com/openziti/foundation/v2/sequence" + "github.com/pkg/errors" ) const ( - flagClosed = 0 - flagRxStarted = 1 + flagClosed = 0 + flagRxStarted = 1 + flagInjectUnderlayType = 2 ) var connectionSeq = sequence.NewSequence() diff --git a/message.go b/message.go index ccffafd..71d8105 100644 --- a/message.go +++ b/message.go @@ -21,10 +21,11 @@ import ( "context" "encoding/binary" "fmt" - "github.com/michaelquigley/pfxlog" - "github.com/pkg/errors" "io" "time" + + "github.com/michaelquigley/pfxlog" + "github.com/pkg/errors" ) /** @@ -50,6 +51,7 @@ const ( IsGroupedHeader = 9 GroupSecretHeader = 10 IsFirstGroupConnection = 11 + UnderlayTypeHeader = 12 // Headers in the range 128-255 inclusive will be reflected when creating replies ReflectedHeaderBitMask = 1 << 7 diff --git a/multi.go b/multi.go index d622713..e72fd75 100644 --- a/multi.go +++ b/multi.go @@ -40,6 +40,8 @@ type MultiChannelConfig struct { UnderlayHandler UnderlayHandler BindHandler BindHandler Underlay Underlay + + InjectUnderlayTypeIntoMessages bool } type senderContextImpl struct { @@ -110,6 +112,8 @@ func NewMultiChannel(config *MultiChannelConfig) (MultiChannel, error) { underlayHandler: config.UnderlayHandler, } + impl.flags.Set(flagInjectUnderlayType, config.InjectUnderlayTypeIntoMessages) + impl.ownerId = config.Underlay.Id() impl.certs.Store(config.Underlay.Certificates()) impl.headers.Store(config.Underlay.Headers()) @@ -495,6 +499,9 @@ func (self *multiChannelImpl) Rxer(underlay Underlay, notifier *CloseNotifier) { log.Debug("started") defer log.Debug("exited") + underlayType := GetUnderlayType(underlay) + injectType := self.flags.IsSet(flagInjectUnderlayType) + for { m, err := underlay.Rx() if err != nil { @@ -508,6 +515,9 @@ func (self *multiChannelImpl) Rxer(underlay Underlay, notifier *CloseNotifier) { return } + if injectType { + m.Headers.PutStringHeader(UnderlayTypeHeader, underlayType) + } self.Rx(m) } } From 199e022456dd44d5f0c30d1e74c28d4ee3c23626 Mon Sep 17 00:00:00 2001 From: Paul Lorenz Date: Thu, 5 Feb 2026 10:07:23 -0500 Subject: [PATCH 2/4] reduce the underlay constraints logging --- multi.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/multi.go b/multi.go index e72fd75..397b843 100644 --- a/multi.go +++ b/multi.go @@ -602,7 +602,7 @@ func (self *UnderlayConstraints) countsShowValidState(ch MultiChannel, counts ma func (self *UnderlayConstraints) Apply(ch MultiChannel, factory GroupedUnderlayFactory) { log := pfxlog.Logger().WithField("conn", ch.Label()) - log.Info("starting constraint check") + log.Debug("starting constraint check") if ch.IsClosed() { return @@ -629,7 +629,7 @@ func (self *UnderlayConstraints) Apply(ch MultiChannel, factory GroupedUnderlayF log.WithField("underlayType", underlayType). WithField("numDesired", constraint.numDesired). WithField("current", counts[underlayType]). - Info("checking constraint") + Debug("checking constraint") if constraint.numDesired > counts[underlayType] { dialElapsed := time.Since(self.lastDial.Load()) @@ -646,7 +646,7 @@ func (self *UnderlayConstraints) Apply(ch MultiChannel, factory GroupedUnderlayF } if allSatisfied { - pfxlog.Logger().WithField("conn", ch.Label()).Info("constraints satisfied") + pfxlog.Logger().WithField("conn", ch.Label()).Debug("constraints satisfied") return } } From 6693ebd70ae342f38e9723ad92e9905a13284cf4 Mon Sep 17 00:00:00 2001 From: Paul Lorenz Date: Thu, 5 Feb 2026 21:25:17 -0500 Subject: [PATCH 3/4] Update the underlay dispatcher to allow unknown underlay types to fall through to the default. Fixes #224 --- accept_dispatcher.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/accept_dispatcher.go b/accept_dispatcher.go index 42c532d..68e87ff 100644 --- a/accept_dispatcher.go +++ b/accept_dispatcher.go @@ -17,8 +17,9 @@ package channel import ( - "github.com/michaelquigley/pfxlog" "time" + + "github.com/michaelquigley/pfxlog" ) // An UnderlayAcceptor take an Underlay and generally turns it into a channel for a specific use. @@ -72,12 +73,14 @@ func (self *UnderlayDispatcher) Run() { if !found { acceptor = self.defaultAcceptor } else { - acceptor = self.acceptors[string(chanType)] + if acceptor, found = self.acceptors[string(chanType)]; !found { + acceptor = self.defaultAcceptor + } } closeUnderlay := false if acceptor == nil { - log.Warn("incoming request didn't have type header, and no default acceptor defined. closing connection") + log.Warn("incoming request didn't have a recognized type header, and no default acceptor defined. closing connection") closeUnderlay = true } else if err = acceptor.AcceptUnderlay(underlay); err != nil { log.WithError(err).Error("error handling incoming connection, closing connection") From 46b7ca20091806040fd14678270c110592244c08 Mon Sep 17 00:00:00 2001 From: Paul Lorenz Date: Thu, 5 Feb 2026 21:25:40 -0500 Subject: [PATCH 4/4] Add ChannelCreated to the UnderlayHandler API to allow handlers to be initialized with the channel before binding. Fixes #225 --- handler.go | 6 +++++- multi.go | 8 +++++--- multi_test.go | 27 ++++++++++++++------------- version | 2 +- 4 files changed, 25 insertions(+), 18 deletions(-) diff --git a/handler.go b/handler.go index 6de62f6..f2d5649 100644 --- a/handler.go +++ b/handler.go @@ -135,7 +135,11 @@ func (self CloseHandlerF) HandleClose(ch Channel) { type MessageSourceF func(notifer *CloseNotifier) (Sendable, error) type UnderlayHandler interface { - // Start is called after the MultiChannel has been created with the first underlay + // ChannelCreated is called after the MultiChannel has been created but before binding happens. + // This allows the underlay handler to set the channel, if desired, before binding happens + ChannelCreated(channel MultiChannel) + + // Start is called after the MultiChannel has been created with the first underlay and binding is complete. // If this is a dial side, Start may be used to add additional underlays Start(channel MultiChannel) diff --git a/multi.go b/multi.go index 397b843..a1384cd 100644 --- a/multi.go +++ b/multi.go @@ -119,11 +119,13 @@ func NewMultiChannel(config *MultiChannelConfig) (MultiChannel, error) { impl.headers.Store(config.Underlay.Headers()) impl.underlays.Append(config.Underlay) - if groupSecret := config.Underlay.Headers()[GroupSecretHeader]; len(groupSecret) == 0 { + groupSecret := config.Underlay.Headers()[GroupSecretHeader] + if len(groupSecret) == 0 { return nil, errors.New("no group secret header found for multi channel") - } else { - impl.groupSecret = groupSecret } + impl.groupSecret = groupSecret + + config.UnderlayHandler.ChannelCreated(impl) if err := bind(config.BindHandler, impl); err != nil { for _, u := range impl.underlays.Value() { diff --git a/multi_test.go b/multi_test.go index b436172..2997890 100644 --- a/multi_test.go +++ b/multi_test.go @@ -19,17 +19,18 @@ package channel import ( "errors" "fmt" + "io" + "math/rand" + "sync/atomic" + "testing" + "time" + "github.com/michaelquigley/pfxlog" "github.com/openziti/foundation/v2/goroutines" "github.com/openziti/identity" "github.com/openziti/transport/v2" "github.com/openziti/transport/v2/tcp" "github.com/stretchr/testify/require" - "io" - "math/rand" - "sync/atomic" - "testing" - "time" ) func Test_MultiUnderlayChannels(t *testing.T) { @@ -111,7 +112,7 @@ func Test_MultiUnderlayChannels(t *testing.T) { wrapper := &TypeLoggingUnderlay{ wrapped: underlay, } - underlayHandler := NewListenerPriorityChannel(wrapper) + underlayHandler := NewListenerPriorityChannel() return newMultiChannel("listener", underlayHandler, wrapper, closeCallback) }, func(underlay Underlay) error { return errors.New("this implementation only accepts grouped channel") @@ -185,7 +186,7 @@ func Test_MultiUnderlayChannels(t *testing.T) { req.NoError(asyncErr, "no async errors should have occurred") } -func newPriorityChannelBase(underlay Underlay) *priorityChannelBase { +func newPriorityChannelBase() *priorityChannelBase { senderContext := NewSenderContext() defaultMsgChan := make(chan Sendable, 4) @@ -194,7 +195,6 @@ func newPriorityChannelBase(underlay Underlay) *priorityChannelBase { result := &priorityChannelBase{ SenderContext: senderContext, - id: underlay.ConnectionId(), prioritySender: NewSingleChSender(senderContext, priorityMsgChan), defaultSender: NewSingleChSender(senderContext, defaultMsgChan), priorityMsgChan: priorityMsgChan, @@ -205,7 +205,6 @@ func newPriorityChannelBase(underlay Underlay) *priorityChannelBase { } type priorityChannelBase struct { - id string SenderContext prioritySender Sender defaultSender Sender @@ -215,6 +214,8 @@ type priorityChannelBase struct { retryMsgChan chan Sendable } +func (self *priorityChannelBase) ChannelCreated(MultiChannel) {} + func (self *priorityChannelBase) GetDefaultSender() Sender { return self.defaultSender } @@ -289,9 +290,9 @@ func (self *priorityChannelBase) CloseRandom(ch MultiChannel) { } } -func NewDialPriorityChannel(dialer *classicDialer, underlay Underlay) PriorityChannel { +func NewDialPriorityChannel(dialer *classicDialer, _ Underlay) PriorityChannel { result := &dialPriorityChannel{ - priorityChannelBase: *newPriorityChannelBase(underlay), + priorityChannelBase: *newPriorityChannelBase(), dialer: dialer, } @@ -351,9 +352,9 @@ func (self *dialPriorityChannel) CreateGroupedUnderlay(groupId string, groupSecr }, nil } -func NewListenerPriorityChannel(underlay Underlay) PriorityChannel { +func NewListenerPriorityChannel() PriorityChannel { result := &listenerPriorityChannel{ - priorityChannelBase: *newPriorityChannelBase(underlay), + priorityChannelBase: *newPriorityChannelBase(), } result.constraints.AddConstraint("default", 2, 1) diff --git a/version b/version index bf77d54..69df05f 100644 --- a/version +++ b/version @@ -1 +1 @@ -4.2 +4.3