@@ -21,22 +21,24 @@ import (
2121 "github.com/emirpasic/gods/trees/btree"
2222 "github.com/emirpasic/gods/utils"
2323 "github.com/michaelquigley/pfxlog"
24+ "sync"
2425 "sync/atomic"
25- "time"
2626)
2727
2828type LinkReceiveBuffer struct {
29- tree * btree.Tree
30- sequence int32
31- maxSequence int32
32- size uint32
33- lastBufferSizeSent uint32
29+ sync.Mutex
30+ tree * btree.Tree
31+ sequence int32
32+ maxSequence int32
33+ size uint32
34+ txQueue chan * Payload
3435}
3536
36- func NewLinkReceiveBuffer () * LinkReceiveBuffer {
37+ func NewLinkReceiveBuffer (txQueueSize int32 ) * LinkReceiveBuffer {
3738 return & LinkReceiveBuffer {
3839 tree : btree .NewWith (10240 , utils .Int32Comparator ),
3940 sequence : - 1 ,
41+ txQueue : make (chan * Payload , txQueueSize ),
4042 }
4143}
4244
@@ -45,6 +47,9 @@ func (buffer *LinkReceiveBuffer) Size() uint32 {
4547}
4648
4749func (buffer * LinkReceiveBuffer ) ReceiveUnordered (x * Xgress , payload * Payload , maxSize uint32 ) bool {
50+ buffer .Lock ()
51+ defer buffer .Unlock ()
52+
4853 if payload .GetSequence () <= buffer .sequence {
4954 x .dataPlane .GetMetrics ().MarkDuplicatePayload ()
5055 return true
@@ -67,47 +72,56 @@ func (buffer *LinkReceiveBuffer) ReceiveUnordered(x *Xgress, payload *Payload, m
6772 } else {
6873 x .dataPlane .GetMetrics ().MarkDuplicatePayload ()
6974 }
75+
76+ buffer .queueNext ()
77+
7078 return true
7179}
7280
73- func (buffer * LinkReceiveBuffer ) PeekHead () * Payload {
81+ func (buffer * LinkReceiveBuffer ) queueNext () {
7482 if val := buffer .tree .LeftValue (); val != nil {
7583 payload := val .(* Payload )
7684 if payload .Sequence == buffer .sequence + 1 {
77- return payload
85+ select {
86+ case buffer .txQueue <- payload :
87+ buffer .tree .Remove (payload .Sequence )
88+ buffer .sequence = payload .Sequence
89+ default :
90+ }
7891 }
7992 }
80- return nil
8193}
8294
83- func (buffer * LinkReceiveBuffer ) Remove (payload * Payload ) {
84- buffer .tree .Remove (payload .Sequence )
85- buffer .sequence = payload .Sequence
86- }
95+ func (buffer * LinkReceiveBuffer ) NextPayload (closeNotify <- chan struct {}) * Payload {
96+ select {
97+ case payload := <- buffer .txQueue :
98+ return payload
99+ default :
100+ }
87101
88- func ( buffer * LinkReceiveBuffer ) getLastBufferSizeSent () uint32 {
89- return atomic . LoadUint32 ( & buffer .lastBufferSizeSent )
90- }
102+ buffer . Lock ()
103+ buffer .queueNext ( )
104+ buffer . Unlock ()
91105
92- func (buffer * LinkReceiveBuffer ) Inspect (x * Xgress ) * RecvBufferDetail {
93- timeout := time .After (100 * time .Millisecond )
94- inspectEvent := & receiveBufferInspectEvent {
95- buffer : buffer ,
96- notifyComplete : make (chan * RecvBufferDetail , 1 ),
106+ select {
107+ case payload := <- buffer .txQueue :
108+ return payload
109+ case <- closeNotify :
97110 }
98111
99- if x . dataPlane . GetPayloadIngester (). inspect ( inspectEvent , timeout ) {
100- select {
101- case result := <- inspectEvent . notifyComplete :
102- return result
103- case <- timeout :
104- }
112+ // closed, check if there's anything pending in the queue
113+ select {
114+ case payload := <- buffer . txQueue :
115+ return payload
116+ default :
117+ return nil
105118 }
106-
107- return buffer .inspectIncomplete ()
108119}
109120
110- func (buffer * LinkReceiveBuffer ) inspectComplete () * RecvBufferDetail {
121+ func (buffer * LinkReceiveBuffer ) Inspect () * RecvBufferDetail {
122+ buffer .Lock ()
123+ defer buffer .Unlock ()
124+
111125 nextPayload := "none"
112126 if head := buffer .tree .LeftValue (); head != nil {
113127 payload := head .(* Payload )
@@ -117,31 +131,9 @@ func (buffer *LinkReceiveBuffer) inspectComplete() *RecvBufferDetail {
117131 return & RecvBufferDetail {
118132 Size : buffer .Size (),
119133 PayloadCount : uint32 (buffer .tree .Size ()),
120- LastSizeSent : buffer .getLastBufferSizeSent (),
121134 Sequence : buffer .sequence ,
122135 MaxSequence : buffer .maxSequence ,
123136 NextPayload : nextPayload ,
124137 AcquiredSafely : true ,
125138 }
126139}
127-
128- func (buffer * LinkReceiveBuffer ) inspectIncomplete () * RecvBufferDetail {
129- return & RecvBufferDetail {
130- Size : buffer .Size (),
131- LastSizeSent : buffer .getLastBufferSizeSent (),
132- Sequence : buffer .sequence ,
133- MaxSequence : buffer .maxSequence ,
134- NextPayload : "unsafe to check" ,
135- AcquiredSafely : false ,
136- }
137- }
138-
139- type receiveBufferInspectEvent struct {
140- buffer * LinkReceiveBuffer
141- notifyComplete chan * RecvBufferDetail
142- }
143-
144- func (self * receiveBufferInspectEvent ) handle () {
145- result := self .buffer .inspectComplete ()
146- self .notifyComplete <- result
147- }
0 commit comments