Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion xgress/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ package xgress
import (
"encoding/binary"
"fmt"
"math"

"github.com/openziti/channel/v4"
"github.com/openziti/foundation/v2/info"
"github.com/openziti/foundation/v2/uuidz"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"math"
)

const (
Expand Down Expand Up @@ -82,6 +83,10 @@ const (
PayloadFlagWriteFailed Flag = 64
)

const (
HeaderKeyCapabilities uint8 = 1
)

func NewAcknowledgement(circuitId string, originator Originator) *Acknowledgement {
return &Acknowledgement{
CircuitId: circuitId,
Expand Down
87 changes: 83 additions & 4 deletions xgress/xgress.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ const (
endOfCircuitSentFlag = 3
closedTxer = 4
rxPushModeFlag = 5 // false == pull, use rx(), 1 == push, use WriteAdapter

// bits 8-16 reserved for peer capabilities (same values used in wire format)
CapabilityEOFIndex = 8
CapabilityEOFMask = 1 << CapabilityEOFIndex
capabilitiesMask = 0x1FF00 // bits 8-16
)

var ErrWriteClosed = errors.New("write closed")
Expand Down Expand Up @@ -224,11 +229,38 @@ func (self *Xgress) markCircuitEndReceived() {
self.flags.Set(endOfCircuitRecvdFlag, true)
}

func (self *Xgress) capabilities() uint32 {
return CapabilityEOFMask
}

func (self *Xgress) capabilitiesHeader() []byte {
buf := make([]byte, 4)
binary.BigEndian.PutUint32(buf, self.capabilities())
return buf
}

func (self *Xgress) setPeerCapabilities(caps []byte) {
if len(caps) >= 4 {
v := binary.BigEndian.Uint32(caps) & capabilitiesMask
for {
current := self.flags.Load()
next := current | v
if self.flags.CompareAndSetAll(current, next) {
return
}
}
}
}

func (self *Xgress) peerSupportsEOF() bool {
return self.flags.IsSet(CapabilityEOFIndex)
}

func (self *Xgress) IsCircuitStarted() bool {
return !self.IsTerminator() || self.flags.IsSet(rxerStartedFlag)
}

func (self *Xgress) firstCircuitStartReceived() bool {
func (self *Xgress) isRxStarted() bool {
return self.flags.CompareAndSet(rxerStartedFlag, false, true)
}

Expand All @@ -244,7 +276,7 @@ func (self *Xgress) Start() {
if self.Options.CircuitStartTimeout > time.Second {
time.AfterFunc(self.Options.CircuitStartTimeout, self.terminateIfNotStarted)
}
} else {
} else if self.isRxStarted() {
log.Debug("initiator: sending circuit start")
go self.payloadBuffer.run()
_ = self.forwardPayload(self.GetStartCircuit(), context.Background())
Expand Down Expand Up @@ -273,6 +305,9 @@ func (self *Xgress) GetStartCircuit() *Payload {
Flags: SetOriginatorFlag(uint32(PayloadFlagCircuitStart), self.originator),
Sequence: int32(self.nextReceiveSequence()),
Data: nil,
Headers: map[uint8][]byte{
HeaderKeyCapabilities: self.capabilitiesHeader(),
},
}
return startCircuit
}
Expand Down Expand Up @@ -423,9 +458,21 @@ func (self *Xgress) HandleControlReceive(controlType ControlType, headers channe
}

func (self *Xgress) acceptPayload(payload *Payload) {
if payload.IsCircuitStartFlagSet() && self.firstCircuitStartReceived() {
if payload.IsCircuitStartFlagSet() && self.isRxStarted() {
pfxlog.ContextLogger(self.Label()).Debug("start received")

var peerSentCapabilities bool
if caps, ok := payload.Headers[HeaderKeyCapabilities]; ok {
self.setPeerCapabilities(caps)
peerSentCapabilities = true
}

go self.payloadBuffer.run()

if peerSentCapabilities {
self.sendCapabilitiesResponse()
}

if !self.flags.IsSet(rxPushModeFlag) {
go self.rx()
}
Expand Down Expand Up @@ -473,6 +520,16 @@ func (self *Xgress) tx() {
return false
}

// Intercept capabilities header from peer
if caps, ok := payload.Headers[HeaderKeyCapabilities]; ok {
self.setPeerCapabilities(caps)
delete(payload.Headers, HeaderKeyCapabilities)
payloadLogger.Debug("peer capabilities received")
if len(payload.Data) == 0 && !payload.IsCircuitStartFlagSet() {
return true // capabilities-only payload, consume it
}
}

payloadLogger.Debug("sending")

for _, peekHandler := range self.peekHandlers {
Expand Down Expand Up @@ -566,6 +623,19 @@ func (self *Xgress) tx() {
}
}

func (self *Xgress) sendCapabilitiesResponse() {
payload := &Payload{
CircuitId: self.circuitId,
Flags: SetOriginatorFlag(0, self.originator),
Sequence: int32(self.nextReceiveSequence()),
Data: nil,
Headers: map[uint8][]byte{
HeaderKeyCapabilities: self.capabilitiesHeader(),
},
}
_ = self.forwardPayload(payload, context.Background())
}

func (self *Xgress) sendEOF() {
log := pfxlog.ContextLogger(self.Label())
log.Debug("sendEOF")
Expand All @@ -575,9 +645,18 @@ func (self *Xgress) sendEOF() {
return
}

var flag Flag
if self.peerSupportsEOF() {
flag = PayloadFlagEOF
log.Debug("peer supports EOF, sending EOF flag")
} else {
flag = PayloadFlagCircuitEnd
log.Debug("peer does not support EOF, falling back to CircuitEnd")
}

payload := &Payload{
CircuitId: self.circuitId,
Flags: SetOriginatorFlag(uint32(PayloadFlagEOF), self.originator),
Flags: SetOriginatorFlag(uint32(flag), self.originator),
Sequence: int32(self.nextReceiveSequence()),
Data: nil,
}
Expand Down
2 changes: 1 addition & 1 deletion ziti/edge/network/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (e MultipleErrors) Error() string {
buf := strings.Builder{}
buf.WriteString("multiple errors occurred")
for idx, err := range e {
buf.WriteString(fmt.Sprintf(" %v: %v", idx, err))
fmt.Fprintf(&buf, " %v: %v", idx, err)
}
return buf.String()
}
2 changes: 1 addition & 1 deletion ziti/sdkinfo/build_info.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading