Skip to content

Commit 1a78e87

Browse files
authored
Merge pull request #223 from openziti/allow-receiving-underlay-type
Allow injecting the underlay type into messages. Fixes #222
2 parents 7351179 + 46b7ca2 commit 1a78e87

File tree

7 files changed

+57
-33
lines changed

7 files changed

+57
-33
lines changed

accept_dispatcher.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
package channel
1818

1919
import (
20-
"github.com/michaelquigley/pfxlog"
2120
"time"
21+
22+
"github.com/michaelquigley/pfxlog"
2223
)
2324

2425
// An UnderlayAcceptor take an Underlay and generally turns it into a channel for a specific use.
@@ -72,12 +73,14 @@ func (self *UnderlayDispatcher) Run() {
7273
if !found {
7374
acceptor = self.defaultAcceptor
7475
} else {
75-
acceptor = self.acceptors[string(chanType)]
76+
if acceptor, found = self.acceptors[string(chanType)]; !found {
77+
acceptor = self.defaultAcceptor
78+
}
7679
}
7780

7881
closeUnderlay := false
7982
if acceptor == nil {
80-
log.Warn("incoming request didn't have type header, and no default acceptor defined. closing connection")
83+
log.Warn("incoming request didn't have a recognized type header, and no default acceptor defined. closing connection")
8184
closeUnderlay = true
8285
} else if err = acceptor.AcceptUnderlay(underlay); err != nil {
8386
log.WithError(err).Error("error handling incoming connection, closing connection")

handler.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,11 @@ func (self CloseHandlerF) HandleClose(ch Channel) {
135135
type MessageSourceF func(notifer *CloseNotifier) (Sendable, error)
136136

137137
type UnderlayHandler interface {
138-
// Start is called after the MultiChannel has been created with the first underlay
138+
// ChannelCreated is called after the MultiChannel has been created but before binding happens.
139+
// This allows the underlay handler to set the channel, if desired, before binding happens
140+
ChannelCreated(channel MultiChannel)
141+
142+
// Start is called after the MultiChannel has been created with the first underlay and binding is complete.
139143
// If this is a dial side, Start may be used to add additional underlays
140144
Start(channel MultiChannel)
141145

impl.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,23 @@ import (
2020
"container/heap"
2121
"crypto/x509"
2222
"fmt"
23-
"github.com/michaelquigley/pfxlog"
24-
"github.com/openziti/foundation/v2/concurrenz"
25-
"github.com/openziti/foundation/v2/info"
26-
"github.com/openziti/foundation/v2/sequence"
27-
"github.com/pkg/errors"
2823
"io"
2924
"net"
3025
"sync"
3126
"sync/atomic"
3227
"time"
28+
29+
"github.com/michaelquigley/pfxlog"
30+
"github.com/openziti/foundation/v2/concurrenz"
31+
"github.com/openziti/foundation/v2/info"
32+
"github.com/openziti/foundation/v2/sequence"
33+
"github.com/pkg/errors"
3334
)
3435

3536
const (
36-
flagClosed = 0
37-
flagRxStarted = 1
37+
flagClosed = 0
38+
flagRxStarted = 1
39+
flagInjectUnderlayType = 2
3840
)
3941

4042
var connectionSeq = sequence.NewSequence()

message.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@ import (
2121
"context"
2222
"encoding/binary"
2323
"fmt"
24-
"github.com/michaelquigley/pfxlog"
25-
"github.com/pkg/errors"
2624
"io"
2725
"time"
26+
27+
"github.com/michaelquigley/pfxlog"
28+
"github.com/pkg/errors"
2829
)
2930

3031
/**
@@ -50,6 +51,7 @@ const (
5051
IsGroupedHeader = 9
5152
GroupSecretHeader = 10
5253
IsFirstGroupConnection = 11
54+
UnderlayTypeHeader = 12
5355

5456
// Headers in the range 128-255 inclusive will be reflected when creating replies
5557
ReflectedHeaderBitMask = 1 << 7

multi.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ type MultiChannelConfig struct {
4040
UnderlayHandler UnderlayHandler
4141
BindHandler BindHandler
4242
Underlay Underlay
43+
44+
InjectUnderlayTypeIntoMessages bool
4345
}
4446

4547
type senderContextImpl struct {
@@ -110,16 +112,20 @@ func NewMultiChannel(config *MultiChannelConfig) (MultiChannel, error) {
110112
underlayHandler: config.UnderlayHandler,
111113
}
112114

115+
impl.flags.Set(flagInjectUnderlayType, config.InjectUnderlayTypeIntoMessages)
116+
113117
impl.ownerId = config.Underlay.Id()
114118
impl.certs.Store(config.Underlay.Certificates())
115119
impl.headers.Store(config.Underlay.Headers())
116120
impl.underlays.Append(config.Underlay)
117121

118-
if groupSecret := config.Underlay.Headers()[GroupSecretHeader]; len(groupSecret) == 0 {
122+
groupSecret := config.Underlay.Headers()[GroupSecretHeader]
123+
if len(groupSecret) == 0 {
119124
return nil, errors.New("no group secret header found for multi channel")
120-
} else {
121-
impl.groupSecret = groupSecret
122125
}
126+
impl.groupSecret = groupSecret
127+
128+
config.UnderlayHandler.ChannelCreated(impl)
123129

124130
if err := bind(config.BindHandler, impl); err != nil {
125131
for _, u := range impl.underlays.Value() {
@@ -495,6 +501,9 @@ func (self *multiChannelImpl) Rxer(underlay Underlay, notifier *CloseNotifier) {
495501
log.Debug("started")
496502
defer log.Debug("exited")
497503

504+
underlayType := GetUnderlayType(underlay)
505+
injectType := self.flags.IsSet(flagInjectUnderlayType)
506+
498507
for {
499508
m, err := underlay.Rx()
500509
if err != nil {
@@ -508,6 +517,9 @@ func (self *multiChannelImpl) Rxer(underlay Underlay, notifier *CloseNotifier) {
508517
return
509518
}
510519

520+
if injectType {
521+
m.Headers.PutStringHeader(UnderlayTypeHeader, underlayType)
522+
}
511523
self.Rx(m)
512524
}
513525
}
@@ -592,7 +604,7 @@ func (self *UnderlayConstraints) countsShowValidState(ch MultiChannel, counts ma
592604

593605
func (self *UnderlayConstraints) Apply(ch MultiChannel, factory GroupedUnderlayFactory) {
594606
log := pfxlog.Logger().WithField("conn", ch.Label())
595-
log.Info("starting constraint check")
607+
log.Debug("starting constraint check")
596608

597609
if ch.IsClosed() {
598610
return
@@ -619,7 +631,7 @@ func (self *UnderlayConstraints) Apply(ch MultiChannel, factory GroupedUnderlayF
619631
log.WithField("underlayType", underlayType).
620632
WithField("numDesired", constraint.numDesired).
621633
WithField("current", counts[underlayType]).
622-
Info("checking constraint")
634+
Debug("checking constraint")
623635
if constraint.numDesired > counts[underlayType] {
624636
dialElapsed := time.Since(self.lastDial.Load())
625637

@@ -636,7 +648,7 @@ func (self *UnderlayConstraints) Apply(ch MultiChannel, factory GroupedUnderlayF
636648
}
637649

638650
if allSatisfied {
639-
pfxlog.Logger().WithField("conn", ch.Label()).Info("constraints satisfied")
651+
pfxlog.Logger().WithField("conn", ch.Label()).Debug("constraints satisfied")
640652
return
641653
}
642654
}

multi_test.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,18 @@ package channel
1919
import (
2020
"errors"
2121
"fmt"
22+
"io"
23+
"math/rand"
24+
"sync/atomic"
25+
"testing"
26+
"time"
27+
2228
"github.com/michaelquigley/pfxlog"
2329
"github.com/openziti/foundation/v2/goroutines"
2430
"github.com/openziti/identity"
2531
"github.com/openziti/transport/v2"
2632
"github.com/openziti/transport/v2/tcp"
2733
"github.com/stretchr/testify/require"
28-
"io"
29-
"math/rand"
30-
"sync/atomic"
31-
"testing"
32-
"time"
3334
)
3435

3536
func Test_MultiUnderlayChannels(t *testing.T) {
@@ -111,7 +112,7 @@ func Test_MultiUnderlayChannels(t *testing.T) {
111112
wrapper := &TypeLoggingUnderlay{
112113
wrapped: underlay,
113114
}
114-
underlayHandler := NewListenerPriorityChannel(wrapper)
115+
underlayHandler := NewListenerPriorityChannel()
115116
return newMultiChannel("listener", underlayHandler, wrapper, closeCallback)
116117
}, func(underlay Underlay) error {
117118
return errors.New("this implementation only accepts grouped channel")
@@ -185,7 +186,7 @@ func Test_MultiUnderlayChannels(t *testing.T) {
185186
req.NoError(asyncErr, "no async errors should have occurred")
186187
}
187188

188-
func newPriorityChannelBase(underlay Underlay) *priorityChannelBase {
189+
func newPriorityChannelBase() *priorityChannelBase {
189190
senderContext := NewSenderContext()
190191

191192
defaultMsgChan := make(chan Sendable, 4)
@@ -194,7 +195,6 @@ func newPriorityChannelBase(underlay Underlay) *priorityChannelBase {
194195

195196
result := &priorityChannelBase{
196197
SenderContext: senderContext,
197-
id: underlay.ConnectionId(),
198198
prioritySender: NewSingleChSender(senderContext, priorityMsgChan),
199199
defaultSender: NewSingleChSender(senderContext, defaultMsgChan),
200200
priorityMsgChan: priorityMsgChan,
@@ -205,7 +205,6 @@ func newPriorityChannelBase(underlay Underlay) *priorityChannelBase {
205205
}
206206

207207
type priorityChannelBase struct {
208-
id string
209208
SenderContext
210209
prioritySender Sender
211210
defaultSender Sender
@@ -215,6 +214,8 @@ type priorityChannelBase struct {
215214
retryMsgChan chan Sendable
216215
}
217216

217+
func (self *priorityChannelBase) ChannelCreated(MultiChannel) {}
218+
218219
func (self *priorityChannelBase) GetDefaultSender() Sender {
219220
return self.defaultSender
220221
}
@@ -289,9 +290,9 @@ func (self *priorityChannelBase) CloseRandom(ch MultiChannel) {
289290
}
290291
}
291292

292-
func NewDialPriorityChannel(dialer *classicDialer, underlay Underlay) PriorityChannel {
293+
func NewDialPriorityChannel(dialer *classicDialer, _ Underlay) PriorityChannel {
293294
result := &dialPriorityChannel{
294-
priorityChannelBase: *newPriorityChannelBase(underlay),
295+
priorityChannelBase: *newPriorityChannelBase(),
295296
dialer: dialer,
296297
}
297298

@@ -351,9 +352,9 @@ func (self *dialPriorityChannel) CreateGroupedUnderlay(groupId string, groupSecr
351352
}, nil
352353
}
353354

354-
func NewListenerPriorityChannel(underlay Underlay) PriorityChannel {
355+
func NewListenerPriorityChannel() PriorityChannel {
355356
result := &listenerPriorityChannel{
356-
priorityChannelBase: *newPriorityChannelBase(underlay),
357+
priorityChannelBase: *newPriorityChannelBase(),
357358
}
358359

359360
result.constraints.AddConstraint("default", 2, 1)

version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
4.2
1+
4.3

0 commit comments

Comments
 (0)