Skip to content

Commit f580cf9

Browse files
committed
Support multi-underlay channels for edge connections. Fixes #701
1 parent f174869 commit f580cf9

File tree

13 files changed

+304
-51
lines changed

13 files changed

+304
-51
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ require (
1717
github.com/michaelquigley/pfxlog v0.6.10
1818
github.com/mitchellh/go-ps v1.0.0
1919
github.com/mitchellh/mapstructure v1.5.0
20-
github.com/openziti/channel/v3 v3.0.39
20+
github.com/openziti/channel/v4 v4.0.0-20250326223108-f50f2013e5b9
2121
github.com/openziti/edge-api v0.26.42
2222
github.com/openziti/foundation/v2 v2.0.59
2323
github.com/openziti/identity v1.0.100

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -299,8 +299,8 @@ github.com/onsi/gomega v1.13.0 h1:7lLHu94wT9Ij0o6EWWclhu0aOh32VxhkwEJvzuWPeak=
299299
github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY=
300300
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
301301
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
302-
github.com/openziti/channel/v3 v3.0.39 h1:UM0iY0tbz4EbOVT3tX4mfN1wSAXxkkWIrKmQ7RhE/Hg=
303-
github.com/openziti/channel/v3 v3.0.39/go.mod h1:7k3mQhtWlgX0HaQBkllDTOH5WAf7DcyyMLqJXrL+/fI=
302+
github.com/openziti/channel/v4 v4.0.0-20250326223108-f50f2013e5b9 h1:hLI/eXdRfzrcnOfkNj7/sO3FtVdadYFRbNW5otXaqpU=
303+
github.com/openziti/channel/v4 v4.0.0-20250326223108-f50f2013e5b9/go.mod h1:0bJQDf0Cw+WuTTdEqax/5PzhrbD5okbLFrtIz/7exEU=
304304
github.com/openziti/edge-api v0.26.42 h1:Wi/BUttSUvedT9XGht7vi/zI/TNGc3ApvjkAviWhauA=
305305
github.com/openziti/edge-api v0.26.42/go.mod h1:sYHVpm26Jr1u7VooNJzTb2b2nGSlmCHMnbGC8XfWSng=
306306
github.com/openziti/foundation/v2 v2.0.59 h1:PJwrcTq62x+cONBeKMlnsuphsTlOvTz8j8prYnehm8o=

ziti/edge/addr.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,5 @@ func (e *Addr) Network() string {
2929
}
3030

3131
func (e *Addr) String() string {
32-
return fmt.Sprintf("ziti-edge-router connId=%v, logical=%v", e.MsgCh.Id(), e.MsgCh.LogicalName())
32+
return fmt.Sprintf("ziti-edge-router connId=%v, logical=%v", e.MsgCh.Id(), e.MsgCh.GetChannel().LogicalName())
3333
}

ziti/edge/channel.go

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
package edge
2+
3+
import (
4+
"github.com/michaelquigley/pfxlog"
5+
"github.com/openziti/channel/v4"
6+
"io"
7+
"sync/atomic"
8+
"time"
9+
)
10+
11+
const (
12+
ChannelTypeControl string = "edge.control"
13+
ChannelTypeDefault string = "edge.default"
14+
)
15+
16+
func NewBaseSdkChannel(underlay channel.Underlay) *BaseSdkChannel {
17+
senderContext := channel.NewSenderContext()
18+
19+
defaultMsgChan := make(chan channel.Sendable, 4)
20+
controlMsgChan := make(chan channel.Sendable, 4)
21+
retryMsgChan := make(chan channel.Sendable, 4)
22+
23+
result := &BaseSdkChannel{
24+
SenderContext: senderContext,
25+
id: underlay.ConnectionId(),
26+
defaultSender: channel.NewSingleChSender(senderContext, defaultMsgChan),
27+
controlSender: channel.NewSingleChSender(senderContext, controlMsgChan),
28+
controlMsgChan: controlMsgChan,
29+
defaultMsgChan: defaultMsgChan,
30+
retryMsgChan: retryMsgChan,
31+
}
32+
return result
33+
}
34+
35+
type BaseSdkChannel struct {
36+
id string
37+
ch channel.MultiChannel
38+
channel.SenderContext
39+
controlSender channel.Sender
40+
defaultSender channel.Sender
41+
42+
controlChannelAvailable atomic.Bool
43+
controlMsgChan chan channel.Sendable
44+
defaultMsgChan chan channel.Sendable
45+
retryMsgChan chan channel.Sendable
46+
}
47+
48+
func (self *BaseSdkChannel) InitChannel(ch channel.MultiChannel) {
49+
self.ch = ch
50+
}
51+
52+
func (self *BaseSdkChannel) GetChannel() channel.Channel {
53+
return self.ch
54+
}
55+
56+
func (self *BaseSdkChannel) GetDefaultSender() channel.Sender {
57+
return self.defaultSender
58+
}
59+
60+
func (self *BaseSdkChannel) GetControlSender() channel.Sender {
61+
return self.controlSender
62+
}
63+
64+
func (self *BaseSdkChannel) GetNextMsgDefault() (channel.Sendable, error) {
65+
if self.controlChannelAvailable.Load() {
66+
select {
67+
case msg := <-self.defaultMsgChan:
68+
return msg, nil
69+
case msg := <-self.retryMsgChan:
70+
return msg, nil
71+
case <-self.GetCloseNotify():
72+
return nil, io.EOF
73+
}
74+
} else {
75+
select {
76+
case msg := <-self.defaultMsgChan:
77+
return msg, nil
78+
case msg := <-self.controlMsgChan:
79+
return msg, nil
80+
case msg := <-self.retryMsgChan:
81+
return msg, nil
82+
case <-self.GetCloseNotify():
83+
return nil, io.EOF
84+
}
85+
}
86+
}
87+
88+
func (self *BaseSdkChannel) GetNextControlMsg() (channel.Sendable, error) {
89+
select {
90+
case msg := <-self.controlMsgChan:
91+
return msg, nil
92+
case msg := <-self.retryMsgChan:
93+
return msg, nil
94+
case <-self.GetCloseNotify():
95+
return nil, io.EOF
96+
}
97+
}
98+
99+
func (self *BaseSdkChannel) GetMessageSource(underlay channel.Underlay) channel.MessageSourceF {
100+
if channel.GetUnderlayType(underlay) == ChannelTypeControl {
101+
return self.GetNextControlMsg
102+
}
103+
return self.GetNextMsgDefault
104+
}
105+
106+
func (self *BaseSdkChannel) HandleTxFailed(_ channel.Underlay, sendable channel.Sendable) bool {
107+
select {
108+
case self.retryMsgChan <- sendable:
109+
return true
110+
case self.defaultMsgChan <- sendable:
111+
return true
112+
default:
113+
return false
114+
}
115+
}
116+
117+
func (self *BaseSdkChannel) HandleUnderlayAccepted(ch channel.MultiChannel, underlay channel.Underlay) {
118+
self.UpdateCtrlChannelAvailable(ch)
119+
pfxlog.Logger().
120+
WithField("id", ch.Label()).
121+
WithField("underlays", ch.GetUnderlayCountsByType()).
122+
WithField("underlayType", channel.GetUnderlayType(underlay)).
123+
WithField("controlAvailable", self.controlChannelAvailable.Load()).
124+
Info("underlay added")
125+
}
126+
127+
func (self *BaseSdkChannel) UpdateCtrlChannelAvailable(ch channel.MultiChannel) {
128+
self.controlChannelAvailable.Store(ch.GetUnderlayCountsByType()[ChannelTypeControl] > 0)
129+
}
130+
131+
func NewDialSdkChannel(dialer channel.DialUnderlayFactory, underlay channel.Underlay) UnderlayHandlerSdkChannel {
132+
result := &DialSdkChannel{
133+
BaseSdkChannel: *NewBaseSdkChannel(underlay),
134+
dialer: dialer,
135+
}
136+
137+
result.constraints.AddConstraint(ChannelTypeDefault, 1, 1)
138+
result.constraints.AddConstraint(ChannelTypeControl, 1, 0)
139+
140+
return result
141+
}
142+
143+
type UnderlayHandlerSdkChannel interface {
144+
SdkChannel
145+
channel.UnderlayHandler
146+
}
147+
148+
type SdkChannel interface {
149+
InitChannel(channel.MultiChannel)
150+
GetChannel() channel.Channel
151+
GetDefaultSender() channel.Sender
152+
GetControlSender() channel.Sender
153+
}
154+
155+
type DialSdkChannel struct {
156+
BaseSdkChannel
157+
dialer channel.DialUnderlayFactory
158+
constraints channel.UnderlayConstraints
159+
}
160+
161+
func (self *DialSdkChannel) Start(channel channel.MultiChannel) {
162+
self.constraints.Apply(channel, self)
163+
}
164+
165+
func (self *DialSdkChannel) HandleUnderlayClose(ch channel.MultiChannel, underlay channel.Underlay) {
166+
pfxlog.Logger().
167+
WithField("id", ch.Label()).
168+
WithField("underlays", ch.GetUnderlayCountsByType()).
169+
WithField("underlayType", channel.GetUnderlayType(underlay)).
170+
Info("underlay closed")
171+
self.UpdateCtrlChannelAvailable(ch)
172+
self.constraints.Apply(ch, self)
173+
}
174+
175+
func (self *DialSdkChannel) DialFailed(_ channel.MultiChannel, _ string, attempt int) {
176+
delay := 2 * time.Duration(attempt) * time.Second
177+
if delay > time.Minute {
178+
delay = time.Minute
179+
}
180+
time.Sleep(delay)
181+
}
182+
183+
func (self *DialSdkChannel) CreateGroupedUnderlay(groupId string, underlayType string, timeout time.Duration) (channel.Underlay, error) {
184+
return self.dialer.CreateWithHeaders(timeout, map[int32][]byte{
185+
channel.TypeHeader: []byte(underlayType),
186+
channel.ConnectionIdHeader: []byte(groupId),
187+
channel.IsGroupedHeader: {1},
188+
})
189+
}
190+
191+
func NewSingleSdkChannel(ch channel.Channel) SdkChannel {
192+
return &SingleSdkChannel{
193+
ch: ch,
194+
}
195+
}
196+
197+
type SingleSdkChannel struct {
198+
ch channel.Channel
199+
}
200+
201+
func (self *SingleSdkChannel) InitChannel(channel.MultiChannel) {
202+
}
203+
204+
func (self *SingleSdkChannel) GetChannel() channel.Channel {
205+
return self.ch
206+
}
207+
208+
func (self *SingleSdkChannel) GetDefaultSender() channel.Sender {
209+
return self.ch
210+
}
211+
212+
func (self *SingleSdkChannel) GetControlSender() channel.Sender {
213+
return self.ch
214+
}

ziti/edge/conn.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import (
2828

2929
"github.com/google/uuid"
3030
"github.com/michaelquigley/pfxlog"
31-
"github.com/openziti/channel/v3"
31+
"github.com/openziti/channel/v4"
3232
"github.com/openziti/foundation/v2/sequence"
3333
)
3434

@@ -103,7 +103,7 @@ type Conn interface {
103103
const forever = time.Hour * 24 * 365 * 100
104104

105105
type MsgChannel struct {
106-
channel.Channel
106+
SdkChannel
107107
id uint32
108108
msgIdSeq *sequence.Sequence
109109
writeDeadline time.Time
@@ -118,17 +118,17 @@ type TraceRouteResult struct {
118118
Error string
119119
}
120120

121-
func NewEdgeMsgChannel(ch channel.Channel, connId uint32) *MsgChannel {
121+
func NewEdgeMsgChannel(ch SdkChannel, connId uint32) *MsgChannel {
122122
traceEnabled := strings.EqualFold("true", os.Getenv("ZITI_TRACE_ENABLED"))
123123
if traceEnabled {
124124
pfxlog.Logger().Info("Ziti message tracing ENABLED")
125125
}
126126

127127
return &MsgChannel{
128-
Channel: ch,
129-
id: connId,
130-
msgIdSeq: sequence.NewSequence(),
131-
trace: traceEnabled,
128+
SdkChannel: ch,
129+
id: connId,
130+
msgIdSeq: sequence.NewSequence(),
131+
trace: traceEnabled,
132132
}
133133
}
134134

@@ -178,9 +178,9 @@ func (ec *MsgChannel) WriteTraced(data []byte, msgUUID []byte, hdrs map[int32][]
178178
// it is retained, and we can cause data corruption
179179
var err error
180180
if ec.writeDeadline.IsZero() {
181-
err = msg.WithTimeout(forever).SendAndWaitForWire(ec.Channel)
181+
err = msg.WithTimeout(forever).SendAndWaitForWire(ec.GetDefaultSender())
182182
} else {
183-
err = msg.WithTimeout(time.Until(ec.writeDeadline)).SendAndWaitForWire(ec.Channel)
183+
err = msg.WithTimeout(time.Until(ec.writeDeadline)).SendAndWaitForWire(ec.GetDefaultSender())
184184
}
185185

186186
if err != nil {
@@ -193,7 +193,7 @@ func (ec *MsgChannel) WriteTraced(data []byte, msgUUID []byte, hdrs map[int32][]
193193
func (ec *MsgChannel) SendState(msg *channel.Message) error {
194194
msg.PutUint32Header(SeqHeader, ec.msgIdSeq.Next())
195195
ec.TraceMsg("SendState", msg)
196-
return msg.WithTimeout(5 * time.Second).SendAndWaitForWire(ec.Channel)
196+
return msg.WithTimeout(5 * time.Second).SendAndWaitForWire(ec.GetDefaultSender())
197197
}
198198

199199
func (ec *MsgChannel) TraceMsg(source string, msg *channel.Message) {

ziti/edge/messages.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package edge
1818

1919
import (
2020
"encoding/binary"
21-
"github.com/openziti/channel/v3"
21+
"github.com/openziti/channel/v4"
2222
"github.com/openziti/foundation/v2/uuidz"
2323
"github.com/openziti/sdk-golang/pb/edge_client_pb"
2424
"github.com/pkg/errors"

ziti/edge/msg_mux.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package edge
1919
import (
2020
"fmt"
2121
"github.com/michaelquigley/pfxlog"
22-
"github.com/openziti/channel/v3"
22+
"github.com/openziti/channel/v4"
2323
"github.com/pkg/errors"
2424
"github.com/sirupsen/logrus"
2525
"math"

0 commit comments

Comments
 (0)