@@ -7,13 +7,7 @@ import (
77 "github.com/btcsuite/btclog"
88)
99
10- // queue is a fixed size queue with a sliding window that has a base and a top
11- // modulo s.
12- type queue struct {
13- // content is the current content of the queue. This is always a slice
14- // of length s but can contain nil elements if the queue isn't full.
15- content []* PacketData
16-
10+ type queueCfg struct {
1711 // s is the maximum sequence number used to label packets. Packets
1812 // are labelled with incrementing sequence numbers modulo s.
1913 // s must be strictly larger than the window size, n. This
@@ -23,6 +17,22 @@ type queue struct {
2317 // no way to tell.
2418 s uint8
2519
20+ timeout time.Duration
21+
22+ log btclog.Logger
23+
24+ sendPkt func (packet * PacketData ) error
25+ }
26+
27+ // queue is a fixed size queue with a sliding window that has a base and a top
28+ // modulo s.
29+ type queue struct {
30+ cfg * queueCfg
31+
32+ // content is the current content of the queue. This is always a slice
33+ // of length s but can contain nil elements if the queue isn't full.
34+ content []* PacketData
35+
2636 // sequenceBase keeps track of the base of the send window and so
2737 // represents the next ack that we expect from the receiver. The
2838 // maximum value of sequenceBase is s.
@@ -41,26 +51,18 @@ type queue struct {
4151 // topMtx is used to guard sequenceTop.
4252 topMtx sync.RWMutex
4353
44- lastResend time.Time
45- handshakeTimeout time.Duration
46-
47- log btclog.Logger
54+ lastResend time.Time
4855}
4956
5057// newQueue creates a new queue.
51- func newQueue (s uint8 , handshakeTimeout time.Duration ,
52- logger btclog.Logger ) * queue {
53-
54- log := log
55- if logger != nil {
56- log = logger
58+ func newQueue (cfg * queueCfg ) * queue {
59+ if cfg .log == nil {
60+ cfg .log = log
5761 }
5862
5963 return & queue {
60- content : make ([]* PacketData , s ),
61- s : s ,
62- handshakeTimeout : handshakeTimeout ,
63- log : log ,
64+ cfg : cfg ,
65+ content : make ([]* PacketData , cfg .s ),
6466 }
6567}
6668
@@ -76,7 +78,7 @@ func (q *queue) size() uint8 {
7678 return q .sequenceTop - q .sequenceBase
7779 }
7880
79- return q .sequenceTop + (q .s - q .sequenceBase )
81+ return q .sequenceTop + (q .cfg . s - q .sequenceBase )
8082}
8183
8284// addPacket adds a new packet to the queue.
@@ -86,13 +88,13 @@ func (q *queue) addPacket(packet *PacketData) {
8688
8789 packet .Seq = q .sequenceTop
8890 q .content [q .sequenceTop ] = packet
89- q .sequenceTop = (q .sequenceTop + 1 ) % q .s
91+ q .sequenceTop = (q .sequenceTop + 1 ) % q .cfg . s
9092}
9193
9294// resend invokes the callback for each packet that needs to be re-sent.
93- func (q * queue ) resend (cb func ( packet * PacketData ) error ) error {
94- if time .Since (q .lastResend ) < q .handshakeTimeout {
95- q .log .Tracef ("Resent the queue recently." )
95+ func (q * queue ) resend () error {
96+ if time .Since (q .lastResend ) < q .cfg . timeout {
97+ q .cfg . log .Tracef ("Resent the queue recently." )
9698
9799 return nil
98100 }
@@ -115,17 +117,17 @@ func (q *queue) resend(cb func(packet *PacketData) error) error {
115117 return nil
116118 }
117119
118- q .log .Tracef ("Resending the queue" )
120+ q .cfg . log .Tracef ("Resending the queue" )
119121
120122 for base != top {
121123 packet := q .content [base ]
122124
123- if err := cb (packet ); err != nil {
125+ if err := q . cfg . sendPkt (packet ); err != nil {
124126 return err
125127 }
126- base = (base + 1 ) % q .s
128+ base = (base + 1 ) % q .cfg . s
127129
128- q .log .Tracef ("Resent %d" , packet .Seq )
130+ q .cfg . log .Tracef ("Resent %d" , packet .Seq )
129131 }
130132
131133 return nil
@@ -136,8 +138,8 @@ func (q *queue) processACK(seq uint8) bool {
136138
137139 // If our queue is empty, an ACK should not have any effect.
138140 if q .size () == 0 {
139- q .log .Tracef ("Received ack %d, but queue is empty. Ignoring." ,
140- seq )
141+ q .cfg . log .Tracef ("Received ack %d, but queue is empty. " +
142+ "Ignoring." , seq )
141143
142144 return false
143145 }
@@ -150,9 +152,9 @@ func (q *queue) processACK(seq uint8) bool {
150152 // equal to the one we were expecting. So we increase our base
151153 // accordingly and send a signal to indicate that the queue size
152154 // has decreased.
153- q .log .Tracef ("Received correct ack %d" , seq )
155+ q .cfg . log .Tracef ("Received correct ack %d" , seq )
154156
155- q .sequenceBase = (q .sequenceBase + 1 ) % q .s
157+ q .sequenceBase = (q .sequenceBase + 1 ) % q .cfg . s
156158
157159 // We did receive an ACK.
158160 return true
@@ -162,7 +164,8 @@ func (q *queue) processACK(seq uint8) bool {
162164 // This could be a duplicate ACK before or it could be that we just
163165 // missed the ACK for the current base and this is actually an ACK for
164166 // another packet in the queue.
165- q .log .Tracef ("Received wrong ack %d, expected %d" , seq , q .sequenceBase )
167+ q .cfg .log .Tracef ("Received wrong ack %d, expected %d" , seq ,
168+ q .sequenceBase )
166169
167170 q .topMtx .RLock ()
168171 defer q .topMtx .RUnlock ()
@@ -171,9 +174,10 @@ func (q *queue) processACK(seq uint8) bool {
171174 // just missed a previous ACK. We can bump the base to be equal to this
172175 // sequence number.
173176 if containsSequence (q .sequenceBase , q .sequenceTop , seq ) {
174- q .log .Tracef ("Sequence %d is in the queue. Bump the base." , seq )
177+ q .cfg .log .Tracef ("Sequence %d is in the queue. Bump the base." ,
178+ seq )
175179
176- q .sequenceBase = (seq + 1 ) % q .s
180+ q .sequenceBase = (seq + 1 ) % q .cfg . s
177181
178182 // We did receive an ACK.
179183 return true
@@ -191,7 +195,7 @@ func (q *queue) processNACK(seq uint8) (bool, bool) {
191195 q .topMtx .RLock ()
192196 defer q .topMtx .RUnlock ()
193197
194- q .log .Tracef ("Received NACK %d" , seq )
198+ q .cfg . log .Tracef ("Received NACK %d" , seq )
195199
196200 // If the NACK is the same as sequenceTop, it probably means that queue
197201 // was sent successfully, but we just missed the necessary ACKs. So we
0 commit comments