diff --git a/xgress/messages.go b/xgress/messages.go index dcd997d9..71e5eb2d 100644 --- a/xgress/messages.go +++ b/xgress/messages.go @@ -19,12 +19,13 @@ package xgress import ( "encoding/binary" "fmt" + "math" + "github.com/openziti/channel/v4" "github.com/openziti/foundation/v2/info" "github.com/openziti/foundation/v2/uuidz" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "math" ) const ( @@ -82,6 +83,10 @@ const ( PayloadFlagWriteFailed Flag = 64 ) +const ( + HeaderKeyCapabilities uint8 = 1 +) + func NewAcknowledgement(circuitId string, originator Originator) *Acknowledgement { return &Acknowledgement{ CircuitId: circuitId, diff --git a/xgress/xgress.go b/xgress/xgress.go index 4d673729..706d6e19 100644 --- a/xgress/xgress.go +++ b/xgress/xgress.go @@ -48,6 +48,11 @@ const ( endOfCircuitSentFlag = 3 closedTxer = 4 rxPushModeFlag = 5 // false == pull, use rx(), 1 == push, use WriteAdapter + + // bits 8-16 reserved for peer capabilities (same values used in wire format) + CapabilityEOFIndex = 8 + CapabilityEOFMask = 1 << CapabilityEOFIndex + capabilitiesMask = 0x1FF00 // bits 8-16 ) var ErrWriteClosed = errors.New("write closed") @@ -224,11 +229,38 @@ func (self *Xgress) markCircuitEndReceived() { self.flags.Set(endOfCircuitRecvdFlag, true) } +func (self *Xgress) capabilities() uint32 { + return CapabilityEOFMask +} + +func (self *Xgress) capabilitiesHeader() []byte { + buf := make([]byte, 4) + binary.BigEndian.PutUint32(buf, self.capabilities()) + return buf +} + +func (self *Xgress) setPeerCapabilities(caps []byte) { + if len(caps) >= 4 { + v := binary.BigEndian.Uint32(caps) & capabilitiesMask + for { + current := self.flags.Load() + next := current | v + if self.flags.CompareAndSetAll(current, next) { + return + } + } + } +} + +func (self *Xgress) peerSupportsEOF() bool { + return self.flags.IsSet(CapabilityEOFIndex) +} + func (self *Xgress) IsCircuitStarted() bool { return !self.IsTerminator() || self.flags.IsSet(rxerStartedFlag) } -func (self *Xgress) firstCircuitStartReceived() bool { +func (self *Xgress) isRxStarted() bool { return self.flags.CompareAndSet(rxerStartedFlag, false, true) } @@ -244,7 +276,7 @@ func (self *Xgress) Start() { if self.Options.CircuitStartTimeout > time.Second { time.AfterFunc(self.Options.CircuitStartTimeout, self.terminateIfNotStarted) } - } else { + } else if self.isRxStarted() { log.Debug("initiator: sending circuit start") go self.payloadBuffer.run() _ = self.forwardPayload(self.GetStartCircuit(), context.Background()) @@ -273,6 +305,9 @@ func (self *Xgress) GetStartCircuit() *Payload { Flags: SetOriginatorFlag(uint32(PayloadFlagCircuitStart), self.originator), Sequence: int32(self.nextReceiveSequence()), Data: nil, + Headers: map[uint8][]byte{ + HeaderKeyCapabilities: self.capabilitiesHeader(), + }, } return startCircuit } @@ -423,9 +458,21 @@ func (self *Xgress) HandleControlReceive(controlType ControlType, headers channe } func (self *Xgress) acceptPayload(payload *Payload) { - if payload.IsCircuitStartFlagSet() && self.firstCircuitStartReceived() { + if payload.IsCircuitStartFlagSet() && self.isRxStarted() { pfxlog.ContextLogger(self.Label()).Debug("start received") + + var peerSentCapabilities bool + if caps, ok := payload.Headers[HeaderKeyCapabilities]; ok { + self.setPeerCapabilities(caps) + peerSentCapabilities = true + } + go self.payloadBuffer.run() + + if peerSentCapabilities { + self.sendCapabilitiesResponse() + } + if !self.flags.IsSet(rxPushModeFlag) { go self.rx() } @@ -473,6 +520,16 @@ func (self *Xgress) tx() { return false } + // Intercept capabilities header from peer + if caps, ok := payload.Headers[HeaderKeyCapabilities]; ok { + self.setPeerCapabilities(caps) + delete(payload.Headers, HeaderKeyCapabilities) + payloadLogger.Debug("peer capabilities received") + if len(payload.Data) == 0 && !payload.IsCircuitStartFlagSet() { + return true // capabilities-only payload, consume it + } + } + payloadLogger.Debug("sending") for _, peekHandler := range self.peekHandlers { @@ -566,6 +623,19 @@ func (self *Xgress) tx() { } } +func (self *Xgress) sendCapabilitiesResponse() { + payload := &Payload{ + CircuitId: self.circuitId, + Flags: SetOriginatorFlag(0, self.originator), + Sequence: int32(self.nextReceiveSequence()), + Data: nil, + Headers: map[uint8][]byte{ + HeaderKeyCapabilities: self.capabilitiesHeader(), + }, + } + _ = self.forwardPayload(payload, context.Background()) +} + func (self *Xgress) sendEOF() { log := pfxlog.ContextLogger(self.Label()) log.Debug("sendEOF") @@ -575,9 +645,18 @@ func (self *Xgress) sendEOF() { return } + var flag Flag + if self.peerSupportsEOF() { + flag = PayloadFlagEOF + log.Debug("peer supports EOF, sending EOF flag") + } else { + flag = PayloadFlagCircuitEnd + log.Debug("peer does not support EOF, falling back to CircuitEnd") + } + payload := &Payload{ CircuitId: self.circuitId, - Flags: SetOriginatorFlag(uint32(PayloadFlagEOF), self.originator), + Flags: SetOriginatorFlag(uint32(flag), self.originator), Sequence: int32(self.nextReceiveSequence()), Data: nil, } diff --git a/ziti/edge/network/listener.go b/ziti/edge/network/listener.go index 224b1caf..15dad168 100644 --- a/ziti/edge/network/listener.go +++ b/ziti/edge/network/listener.go @@ -450,7 +450,7 @@ func (e MultipleErrors) Error() string { buf := strings.Builder{} buf.WriteString("multiple errors occurred") for idx, err := range e { - buf.WriteString(fmt.Sprintf(" %v: %v", idx, err)) + fmt.Fprintf(&buf, " %v: %v", idx, err) } return buf.String() } diff --git a/ziti/sdkinfo/build_info.go b/ziti/sdkinfo/build_info.go index 3f8c2c85..2d63995f 100644 --- a/ziti/sdkinfo/build_info.go +++ b/ziti/sdkinfo/build_info.go @@ -20,5 +20,5 @@ package sdkinfo const ( - Version = "v1.2.4" + Version = "v1.2.4-patch1" )