Skip to content

Commit a2a548e

Browse files
authored
Merge pull request #880 from openziti/xg-capabilities-fix-eof
Handle differences in xgress eof/end-of-circuit handling by adding a capabilities exchange. Fixes #877
2 parents e874bc4 + a968648 commit a2a548e

File tree

10 files changed

+109
-15
lines changed

10 files changed

+109
-15
lines changed

CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
# Release notes 1.5.1
2+
3+
## Issues Fixed and Dependency Updates
4+
5+
* github.com/openziti/sdk-golang: [v1.5.0 -> v1.5.1](https://github.com/openziti/sdk-golang/compare/v1.5.0...v1.5.1)
6+
* [Issue #877](https://github.com/openziti/sdk-golang/issues/877) - Handle differences in xgress eof/end-of-circuit handling by adding a capabilities exchange
7+
8+
* github.com/openziti/edge-api: [v0.26.56 -> v0.27.0](https://github.com/openziti/edge-api/compare/v0.26.56...v0.27.0)
9+
10+
111
# Release notes 1.5.0
212

313
## Notes

example/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
github.com/google/uuid v1.6.0
1111
github.com/gorilla/mux v1.8.1
1212
github.com/michaelquigley/pfxlog v0.6.10
13-
github.com/openziti/edge-api v0.26.56
13+
github.com/openziti/edge-api v0.27.0
1414
github.com/openziti/foundation/v2 v2.0.87
1515
github.com/openziti/runzmd v1.0.83
1616
github.com/openziti/sdk-golang v1.2.6

example/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,8 +372,8 @@ github.com/onsi/gomega v1.13.0 h1:7lLHu94wT9Ij0o6EWWclhu0aOh32VxhkwEJvzuWPeak=
372372
github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY=
373373
github.com/openziti/channel/v4 v4.3.6 h1:WvkDuYkoDOKJM5q4uAUcHNJ6XhOl7kmbZBn5YVWWwMI=
374374
github.com/openziti/channel/v4 v4.3.6/go.mod h1:N0y43HqOpLqMr545Dt3g62tNy07g65UylmnwVs+c4JM=
375-
github.com/openziti/edge-api v0.26.56 h1:8n9o3rgi3Mkyy2D45zIDad2N2ah3KJEUaae3rMhZyoI=
376-
github.com/openziti/edge-api v0.26.56/go.mod h1:Sj8HEql6ol2Oqp0yd3ZbGayCg8t/XTlH7q608UDHrwE=
375+
github.com/openziti/edge-api v0.27.0 h1:fN9Deue5fLDU0HqtXoSf4I4NExUNrv7s09K5hX1t2Kw=
376+
github.com/openziti/edge-api v0.27.0/go.mod h1:Sj8HEql6ol2Oqp0yd3ZbGayCg8t/XTlH7q608UDHrwE=
377377
github.com/openziti/foundation/v2 v2.0.87 h1:tD1Lba3C/MfBmDnLSmlyKB8hxjry4HAaNrdJqN+fqxQ=
378378
github.com/openziti/foundation/v2 v2.0.87/go.mod h1:LrE/z8YXQUbwfyGwg3HgFs9ElGOq/T61EXbkagkDozQ=
379379
github.com/openziti/go-term-markdown v1.0.1 h1:9uzMpK4tav6OtvRxRt99WwPTzAzCh+Pj9zWU2FBp3Qg=

example/influxdb-client-go/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ require (
104104
github.com/muhlemmer/gu v0.3.1 // indirect
105105
github.com/oklog/ulid v1.3.1 // indirect
106106
github.com/openziti/channel/v4 v4.3.6 // indirect
107-
github.com/openziti/edge-api v0.26.56 // indirect
107+
github.com/openziti/edge-api v0.27.0 // indirect
108108
github.com/openziti/foundation/v2 v2.0.87 // indirect
109109
github.com/openziti/identity v1.0.125 // indirect
110110
github.com/openziti/metrics v1.4.3 // indirect

example/influxdb-client-go/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -448,8 +448,8 @@ github.com/onsi/gomega v1.13.0 h1:7lLHu94wT9Ij0o6EWWclhu0aOh32VxhkwEJvzuWPeak=
448448
github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY=
449449
github.com/openziti/channel/v4 v4.3.6 h1:WvkDuYkoDOKJM5q4uAUcHNJ6XhOl7kmbZBn5YVWWwMI=
450450
github.com/openziti/channel/v4 v4.3.6/go.mod h1:N0y43HqOpLqMr545Dt3g62tNy07g65UylmnwVs+c4JM=
451-
github.com/openziti/edge-api v0.26.56 h1:8n9o3rgi3Mkyy2D45zIDad2N2ah3KJEUaae3rMhZyoI=
452-
github.com/openziti/edge-api v0.26.56/go.mod h1:Sj8HEql6ol2Oqp0yd3ZbGayCg8t/XTlH7q608UDHrwE=
451+
github.com/openziti/edge-api v0.27.0 h1:fN9Deue5fLDU0HqtXoSf4I4NExUNrv7s09K5hX1t2Kw=
452+
github.com/openziti/edge-api v0.27.0/go.mod h1:Sj8HEql6ol2Oqp0yd3ZbGayCg8t/XTlH7q608UDHrwE=
453453
github.com/openziti/foundation/v2 v2.0.87 h1:tD1Lba3C/MfBmDnLSmlyKB8hxjry4HAaNrdJqN+fqxQ=
454454
github.com/openziti/foundation/v2 v2.0.87/go.mod h1:LrE/z8YXQUbwfyGwg3HgFs9ElGOq/T61EXbkagkDozQ=
455455
github.com/openziti/identity v1.0.125 h1:vrtasPXwQHpHYDhdtzCEMcMMIIx794LiC3P+Bh2Lm3Y=

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ require (
1717
github.com/mitchellh/go-ps v1.0.0
1818
github.com/mitchellh/mapstructure v1.5.0
1919
github.com/openziti/channel/v4 v4.3.6
20-
github.com/openziti/edge-api v0.26.56
20+
github.com/openziti/edge-api v0.27.0
2121
github.com/openziti/foundation/v2 v2.0.87
2222
github.com/openziti/identity v1.0.125
2323
github.com/openziti/metrics v1.4.3

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,8 +329,8 @@ github.com/onsi/gomega v1.13.0 h1:7lLHu94wT9Ij0o6EWWclhu0aOh32VxhkwEJvzuWPeak=
329329
github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY=
330330
github.com/openziti/channel/v4 v4.3.6 h1:WvkDuYkoDOKJM5q4uAUcHNJ6XhOl7kmbZBn5YVWWwMI=
331331
github.com/openziti/channel/v4 v4.3.6/go.mod h1:N0y43HqOpLqMr545Dt3g62tNy07g65UylmnwVs+c4JM=
332-
github.com/openziti/edge-api v0.26.56 h1:8n9o3rgi3Mkyy2D45zIDad2N2ah3KJEUaae3rMhZyoI=
333-
github.com/openziti/edge-api v0.26.56/go.mod h1:Sj8HEql6ol2Oqp0yd3ZbGayCg8t/XTlH7q608UDHrwE=
332+
github.com/openziti/edge-api v0.27.0 h1:fN9Deue5fLDU0HqtXoSf4I4NExUNrv7s09K5hX1t2Kw=
333+
github.com/openziti/edge-api v0.27.0/go.mod h1:Sj8HEql6ol2Oqp0yd3ZbGayCg8t/XTlH7q608UDHrwE=
334334
github.com/openziti/foundation/v2 v2.0.87 h1:tD1Lba3C/MfBmDnLSmlyKB8hxjry4HAaNrdJqN+fqxQ=
335335
github.com/openziti/foundation/v2 v2.0.87/go.mod h1:LrE/z8YXQUbwfyGwg3HgFs9ElGOq/T61EXbkagkDozQ=
336336
github.com/openziti/identity v1.0.125 h1:vrtasPXwQHpHYDhdtzCEMcMMIIx794LiC3P+Bh2Lm3Y=

xgress/messages.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ package xgress
1919
import (
2020
"encoding/binary"
2121
"fmt"
22+
"math"
23+
2224
"github.com/openziti/channel/v4"
2325
"github.com/openziti/foundation/v2/info"
2426
"github.com/openziti/foundation/v2/uuidz"
2527
"github.com/pkg/errors"
2628
"github.com/sirupsen/logrus"
27-
"math"
2829
)
2930

3031
const (
@@ -82,6 +83,10 @@ const (
8283
PayloadFlagWriteFailed Flag = 64
8384
)
8485

86+
const (
87+
HeaderKeyCapabilities uint8 = 1
88+
)
89+
8590
func NewAcknowledgement(circuitId string, originator Originator) *Acknowledgement {
8691
return &Acknowledgement{
8792
CircuitId: circuitId,

xgress/xgress.go

Lines changed: 83 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ const (
4848
endOfCircuitSentFlag = 3
4949
closedTxer = 4
5050
rxPushModeFlag = 5 // false == pull, use rx(), 1 == push, use WriteAdapter
51+
52+
// bits 8-16 reserved for peer capabilities (same values used in wire format)
53+
CapabilityEOFIndex = 8
54+
CapabilityEOFMask = 1 << CapabilityEOFIndex
55+
capabilitiesMask = 0x1FF00 // bits 8-16
5156
)
5257

5358
var ErrWriteClosed = errors.New("write closed")
@@ -224,11 +229,38 @@ func (self *Xgress) markCircuitEndReceived() {
224229
self.flags.Set(endOfCircuitRecvdFlag, true)
225230
}
226231

232+
func (self *Xgress) capabilities() uint32 {
233+
return CapabilityEOFMask
234+
}
235+
236+
func (self *Xgress) capabilitiesHeader() []byte {
237+
buf := make([]byte, 4)
238+
binary.BigEndian.PutUint32(buf, self.capabilities())
239+
return buf
240+
}
241+
242+
func (self *Xgress) setPeerCapabilities(caps []byte) {
243+
if len(caps) >= 4 {
244+
v := binary.BigEndian.Uint32(caps) & capabilitiesMask
245+
for {
246+
current := self.flags.Load()
247+
next := current | v
248+
if self.flags.CompareAndSetAll(current, next) {
249+
return
250+
}
251+
}
252+
}
253+
}
254+
255+
func (self *Xgress) peerSupportsEOF() bool {
256+
return self.flags.IsSet(CapabilityEOFIndex)
257+
}
258+
227259
func (self *Xgress) IsCircuitStarted() bool {
228260
return !self.IsTerminator() || self.flags.IsSet(rxerStartedFlag)
229261
}
230262

231-
func (self *Xgress) firstCircuitStartReceived() bool {
263+
func (self *Xgress) isRxStarted() bool {
232264
return self.flags.CompareAndSet(rxerStartedFlag, false, true)
233265
}
234266

@@ -244,7 +276,7 @@ func (self *Xgress) Start() {
244276
if self.Options.CircuitStartTimeout > time.Second {
245277
time.AfterFunc(self.Options.CircuitStartTimeout, self.terminateIfNotStarted)
246278
}
247-
} else {
279+
} else if self.isRxStarted() {
248280
log.Debug("initiator: sending circuit start")
249281
go self.payloadBuffer.run()
250282
_ = self.forwardPayload(self.GetStartCircuit(), context.Background())
@@ -273,6 +305,9 @@ func (self *Xgress) GetStartCircuit() *Payload {
273305
Flags: SetOriginatorFlag(uint32(PayloadFlagCircuitStart), self.originator),
274306
Sequence: int32(self.nextReceiveSequence()),
275307
Data: nil,
308+
Headers: map[uint8][]byte{
309+
HeaderKeyCapabilities: self.capabilitiesHeader(),
310+
},
276311
}
277312
return startCircuit
278313
}
@@ -423,9 +458,21 @@ func (self *Xgress) HandleControlReceive(controlType ControlType, headers channe
423458
}
424459

425460
func (self *Xgress) acceptPayload(payload *Payload) {
426-
if payload.IsCircuitStartFlagSet() && self.firstCircuitStartReceived() {
461+
if payload.IsCircuitStartFlagSet() && self.isRxStarted() {
427462
pfxlog.ContextLogger(self.Label()).Debug("start received")
463+
464+
var peerSentCapabilities bool
465+
if caps, ok := payload.Headers[HeaderKeyCapabilities]; ok {
466+
self.setPeerCapabilities(caps)
467+
peerSentCapabilities = true
468+
}
469+
428470
go self.payloadBuffer.run()
471+
472+
if peerSentCapabilities {
473+
self.sendCapabilitiesResponse()
474+
}
475+
429476
if !self.flags.IsSet(rxPushModeFlag) {
430477
go self.rx()
431478
}
@@ -473,6 +520,16 @@ func (self *Xgress) tx() {
473520
return false
474521
}
475522

523+
// Intercept capabilities header from peer
524+
if caps, ok := payload.Headers[HeaderKeyCapabilities]; ok {
525+
self.setPeerCapabilities(caps)
526+
delete(payload.Headers, HeaderKeyCapabilities)
527+
payloadLogger.Debug("peer capabilities received")
528+
if len(payload.Data) == 0 && !payload.IsCircuitStartFlagSet() {
529+
return true // capabilities-only payload, consume it
530+
}
531+
}
532+
476533
payloadLogger.Debug("sending")
477534

478535
for _, peekHandler := range self.peekHandlers {
@@ -566,6 +623,19 @@ func (self *Xgress) tx() {
566623
}
567624
}
568625

626+
func (self *Xgress) sendCapabilitiesResponse() {
627+
payload := &Payload{
628+
CircuitId: self.circuitId,
629+
Flags: SetOriginatorFlag(0, self.originator),
630+
Sequence: int32(self.nextReceiveSequence()),
631+
Data: nil,
632+
Headers: map[uint8][]byte{
633+
HeaderKeyCapabilities: self.capabilitiesHeader(),
634+
},
635+
}
636+
_ = self.forwardPayload(payload, context.Background())
637+
}
638+
569639
func (self *Xgress) sendEOF() {
570640
log := pfxlog.ContextLogger(self.Label())
571641
log.Debug("sendEOF")
@@ -575,9 +645,18 @@ func (self *Xgress) sendEOF() {
575645
return
576646
}
577647

648+
var flag Flag
649+
if self.peerSupportsEOF() {
650+
flag = PayloadFlagEOF
651+
log.Debug("peer supports EOF, sending EOF flag")
652+
} else {
653+
flag = PayloadFlagCircuitEnd
654+
log.Debug("peer does not support EOF, falling back to CircuitEnd")
655+
}
656+
578657
payload := &Payload{
579658
CircuitId: self.circuitId,
580-
Flags: SetOriginatorFlag(uint32(PayloadFlagEOF), self.originator),
659+
Flags: SetOriginatorFlag(uint32(flag), self.originator),
581660
Sequence: int32(self.nextReceiveSequence()),
582661
Data: nil,
583662
}

ziti/sdkinfo/build_info.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)