@@ -48,6 +48,11 @@ const (
4848 endOfCircuitSentFlag = 3
4949 closedTxer = 4
5050 rxPushModeFlag = 5 // false == pull, use rx(), 1 == push, use WriteAdapter
51+
52+ // bits 8-16 reserved for peer capabilities (same values used in wire format)
53+ CapabilityEOFIndex = 8
54+ CapabilityEOFMask = 1 << CapabilityEOFIndex
55+ capabilitiesMask = 0x1FF00 // bits 8-16
5156)
5257
5358var ErrWriteClosed = errors .New ("write closed" )
@@ -224,11 +229,38 @@ func (self *Xgress) markCircuitEndReceived() {
224229 self .flags .Set (endOfCircuitRecvdFlag , true )
225230}
226231
232+ func (self * Xgress ) capabilities () uint32 {
233+ return CapabilityEOFMask
234+ }
235+
236+ func (self * Xgress ) capabilitiesHeader () []byte {
237+ buf := make ([]byte , 4 )
238+ binary .BigEndian .PutUint32 (buf , self .capabilities ())
239+ return buf
240+ }
241+
242+ func (self * Xgress ) setPeerCapabilities (caps []byte ) {
243+ if len (caps ) >= 4 {
244+ v := binary .BigEndian .Uint32 (caps ) & capabilitiesMask
245+ for {
246+ current := self .flags .Load ()
247+ next := current | v
248+ if self .flags .CompareAndSetAll (current , next ) {
249+ return
250+ }
251+ }
252+ }
253+ }
254+
255+ func (self * Xgress ) peerSupportsEOF () bool {
256+ return self .flags .IsSet (CapabilityEOFIndex )
257+ }
258+
227259func (self * Xgress ) IsCircuitStarted () bool {
228260 return ! self .IsTerminator () || self .flags .IsSet (rxerStartedFlag )
229261}
230262
231- func (self * Xgress ) firstCircuitStartReceived () bool {
263+ func (self * Xgress ) isRxStarted () bool {
232264 return self .flags .CompareAndSet (rxerStartedFlag , false , true )
233265}
234266
@@ -244,7 +276,7 @@ func (self *Xgress) Start() {
244276 if self .Options .CircuitStartTimeout > time .Second {
245277 time .AfterFunc (self .Options .CircuitStartTimeout , self .terminateIfNotStarted )
246278 }
247- } else {
279+ } else if self . isRxStarted () {
248280 log .Debug ("initiator: sending circuit start" )
249281 go self .payloadBuffer .run ()
250282 _ = self .forwardPayload (self .GetStartCircuit (), context .Background ())
@@ -273,6 +305,9 @@ func (self *Xgress) GetStartCircuit() *Payload {
273305 Flags : SetOriginatorFlag (uint32 (PayloadFlagCircuitStart ), self .originator ),
274306 Sequence : int32 (self .nextReceiveSequence ()),
275307 Data : nil ,
308+ Headers : map [uint8 ][]byte {
309+ HeaderKeyCapabilities : self .capabilitiesHeader (),
310+ },
276311 }
277312 return startCircuit
278313}
@@ -423,9 +458,21 @@ func (self *Xgress) HandleControlReceive(controlType ControlType, headers channe
423458}
424459
425460func (self * Xgress ) acceptPayload (payload * Payload ) {
426- if payload .IsCircuitStartFlagSet () && self .firstCircuitStartReceived () {
461+ if payload .IsCircuitStartFlagSet () && self .isRxStarted () {
427462 pfxlog .ContextLogger (self .Label ()).Debug ("start received" )
463+
464+ var peerSentCapabilities bool
465+ if caps , ok := payload .Headers [HeaderKeyCapabilities ]; ok {
466+ self .setPeerCapabilities (caps )
467+ peerSentCapabilities = true
468+ }
469+
428470 go self .payloadBuffer .run ()
471+
472+ if peerSentCapabilities {
473+ self .sendCapabilitiesResponse ()
474+ }
475+
429476 if ! self .flags .IsSet (rxPushModeFlag ) {
430477 go self .rx ()
431478 }
@@ -473,6 +520,16 @@ func (self *Xgress) tx() {
473520 return false
474521 }
475522
523+ // Intercept capabilities header from peer
524+ if caps , ok := payload .Headers [HeaderKeyCapabilities ]; ok {
525+ self .setPeerCapabilities (caps )
526+ delete (payload .Headers , HeaderKeyCapabilities )
527+ payloadLogger .Debug ("peer capabilities received" )
528+ if len (payload .Data ) == 0 && ! payload .IsCircuitStartFlagSet () {
529+ return true // capabilities-only payload, consume it
530+ }
531+ }
532+
476533 payloadLogger .Debug ("sending" )
477534
478535 for _ , peekHandler := range self .peekHandlers {
@@ -566,6 +623,19 @@ func (self *Xgress) tx() {
566623 }
567624}
568625
626+ func (self * Xgress ) sendCapabilitiesResponse () {
627+ payload := & Payload {
628+ CircuitId : self .circuitId ,
629+ Flags : SetOriginatorFlag (0 , self .originator ),
630+ Sequence : int32 (self .nextReceiveSequence ()),
631+ Data : nil ,
632+ Headers : map [uint8 ][]byte {
633+ HeaderKeyCapabilities : self .capabilitiesHeader (),
634+ },
635+ }
636+ _ = self .forwardPayload (payload , context .Background ())
637+ }
638+
569639func (self * Xgress ) sendEOF () {
570640 log := pfxlog .ContextLogger (self .Label ())
571641 log .Debug ("sendEOF" )
@@ -575,9 +645,18 @@ func (self *Xgress) sendEOF() {
575645 return
576646 }
577647
648+ var flag Flag
649+ if self .peerSupportsEOF () {
650+ flag = PayloadFlagEOF
651+ log .Debug ("peer supports EOF, sending EOF flag" )
652+ } else {
653+ flag = PayloadFlagCircuitEnd
654+ log .Debug ("peer does not support EOF, falling back to CircuitEnd" )
655+ }
656+
578657 payload := & Payload {
579658 CircuitId : self .circuitId ,
580- Flags : SetOriginatorFlag (uint32 (PayloadFlagEOF ), self .originator ),
659+ Flags : SetOriginatorFlag (uint32 (flag ), self .originator ),
581660 Sequence : int32 (self .nextReceiveSequence ()),
582661 Data : nil ,
583662 }
0 commit comments