Skip to content

Commit 558b595

Browse files
committed
Update reassembly_queue to RFC 9260
1 parent 803e488 commit 558b595

File tree

1 file changed

+61
-52
lines changed

1 file changed

+61
-52
lines changed

reassembly_queue.go

Lines changed: 61 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ func newChunkSet(ssn uint16, ppi PayloadProtocolIdentifier) *chunkSet {
3838
}
3939

4040
func (set *chunkSet) push(chunk *chunkPayloadData) bool {
41+
// Enforce same PPI for all fragments of a user message (deliver one PPI). RFC 9260 sec 6.9.
42+
if chunk.payloadType != set.ppi {
43+
return false
44+
}
45+
4146
// check if dup
4247
for _, c := range set.chunks {
4348
if c.tsn == chunk.tsn {
@@ -50,19 +55,15 @@ func (set *chunkSet) push(chunk *chunkPayloadData) bool {
5055
sortChunksByTSN(set.chunks)
5156

5257
// Check if we now have a complete set
53-
complete := set.isComplete()
54-
55-
return complete
58+
return set.isComplete()
5659
}
5760

5861
func (set *chunkSet) isComplete() bool {
59-
// Condition for complete set
60-
// 0. Has at least one chunk.
61-
// 1. Begins with beginningFragment set to true
62-
// 2. Ends with endingFragment set to true
63-
// 3. TSN monotinically increase by 1 from beginning to end
64-
65-
// 0.
62+
// Complete when:
63+
// 0) has at least one fragment
64+
// 1) first has beginningFragment=1, endingFragment=0
65+
// 2) last has beginningFragment=0, endingFragment=1
66+
// 3) all TSNs are strictly sequential (monotonically increasing) RFC 9260 sec 6.9
6667
nChunks := len(set.chunks)
6768
if nChunks == 0 {
6869
return false
@@ -82,12 +83,11 @@ func (set *chunkSet) isComplete() bool {
8283
var lastTSN uint32
8384
for i, chunk := range set.chunks {
8485
if i > 0 {
85-
// Fragments must have contiguous TSN
86-
// From RFC 4960 Section 3.3.1:
87-
// When a user message is fragmented into multiple chunks, the TSNs are
88-
// used by the receiver to reassemble the message. This means that the
89-
// TSNs for each fragment of a fragmented user message MUST be strictly
90-
// sequential.
86+
// RFC 9260 sec 3.3.1:
87+
// When a user message is fragmented into multiple chunks, the TSNs are
88+
// used by the receiver to reassemble the message. This means that the
89+
// TSNs for each fragment of a fragmented user message MUST be strictly
90+
// sequential.
9191
if chunk.tsn != lastTSN+1 {
9292
// mid or end fragment is missing
9393
return false
@@ -102,7 +102,7 @@ func (set *chunkSet) isComplete() bool {
102102

103103
type reassemblyQueue struct {
104104
si uint16
105-
nextSSN uint16 // expected SSN for next ordered chunk
105+
nextSSN uint16 // expected SSN for next ordered message RFC 9260 sec 6.5, 6.6
106106
ordered []*chunkSet
107107
unordered []*chunkSet
108108
unorderedChunks []*chunkPayloadData
@@ -112,14 +112,16 @@ type reassemblyQueue struct {
112112
var errTryAgain = errors.New("try again")
113113

114114
func newReassemblyQueue(si uint16) *reassemblyQueue {
115-
// From RFC 4960 Sec 6.5:
116-
// The Stream Sequence Number in all the streams MUST start from 0 when
117-
// the association is established. Also, when the Stream Sequence
118-
// Number reaches the value 65535 the next Stream Sequence Number MUST
119-
// be set to 0.
115+
// From RFC 9260 sec 6.5:
116+
// The Stream Sequence Number in all the outgoing streams MUST start from 0 when
117+
// the association is established. The Stream Sequence Number of an outgoing
118+
// stream MUST be incremented by 1 for each ordered user message sent on that
119+
// outgoing stream. In particular, when the Stream Sequence Number reaches the
120+
// value 65535, the next Stream Sequence Number MUST be set to 0. For unordered
121+
// user messages, the Stream Sequence Number MUST NOT be changed.
120122
return &reassemblyQueue{
121123
si: si,
122-
nextSSN: 0, // From RFC 4960 Sec 6.5:
124+
nextSSN: 0, // From RFC 9260 Sec 6.5:
123125
ordered: make([]*chunkSet, 0),
124126
unordered: make([]*chunkSet, 0),
125127
}
@@ -133,7 +135,13 @@ func (r *reassemblyQueue) push(chunk *chunkPayloadData) bool { //nolint:cyclop
133135
}
134136

135137
if chunk.unordered {
136-
// First, insert into unorderedChunks array
138+
// Keep only one instance per TSN (duplicate DATA should not double count).
139+
for _, c := range r.unorderedChunks {
140+
if c.tsn == chunk.tsn {
141+
return false
142+
}
143+
}
144+
137145
r.unorderedChunks = append(r.unorderedChunks, chunk)
138146
atomic.AddUint64(&r.nBytes, uint64(len(chunk.userData)))
139147
sortChunksByTSN(r.unorderedChunks)
@@ -152,7 +160,6 @@ func (r *reassemblyQueue) push(chunk *chunkPayloadData) bool { //nolint:cyclop
152160
}
153161

154162
// This is an ordered chunk
155-
156163
if sna16LT(chunk.streamSequenceNumber, r.nextSSN) {
157164
return false
158165
}
@@ -169,6 +176,8 @@ func (r *reassemblyQueue) push(chunk *chunkPayloadData) bool { //nolint:cyclop
169176
// nolint:godox
170177
// TODO: this slice can get pretty big; it may be worth maintaining a map
171178
// for O(1) lookups at the cost of 2x memory.
179+
180+
// Only add to an existing fragmented set with same SSN.
172181
if set.ssn == chunk.streamSequenceNumber && set.chunks[0].isFragmented() {
173182
cset = set
174183

@@ -181,9 +190,7 @@ func (r *reassemblyQueue) push(chunk *chunkPayloadData) bool { //nolint:cyclop
181190
if cset == nil {
182191
cset = newChunkSet(chunk.streamSequenceNumber, chunk.payloadType)
183192
r.ordered = append(r.ordered, cset)
184-
if !chunk.unordered {
185-
sortChunksBySSN(r.ordered)
186-
}
193+
sortChunksBySSN(r.ordered)
187194
}
188195

189196
atomic.AddUint64(&r.nBytes, uint64(len(chunk.userData)))
@@ -195,14 +202,16 @@ func (r *reassemblyQueue) findCompleteUnorderedChunkSet() *chunkSet {
195202
startIdx := -1
196203
nChunks := 0
197204
var lastTSN uint32
205+
var ppi PayloadProtocolIdentifier
198206
var found bool
199207

200208
for i, chunk := range r.unorderedChunks {
201-
// seek beigining
209+
// look for a beginning fragment
202210
if chunk.beginningFragment {
203211
startIdx = i
204212
nChunks = 1
205213
lastTSN = chunk.tsn
214+
ppi = chunk.payloadType
206215

207216
if chunk.endingFragment {
208217
found = true
@@ -217,8 +226,8 @@ func (r *reassemblyQueue) findCompleteUnorderedChunkSet() *chunkSet {
217226
continue
218227
}
219228

220-
// Check if contiguous in TSN
221-
if chunk.tsn != lastTSN+1 {
229+
// same PPI and strictly contiguous TSN required across fragments. RFC 9260 sec 6.9.
230+
if chunk.payloadType != ppi || chunk.tsn != lastTSN+1 {
222231
startIdx = -1
223232

224233
continue
@@ -238,34 +247,32 @@ func (r *reassemblyQueue) findCompleteUnorderedChunkSet() *chunkSet {
238247
return nil
239248
}
240249

241-
// Extract the range of chunks
242-
var chunks []*chunkPayloadData
243-
chunks = append(chunks, r.unorderedChunks[startIdx:startIdx+nChunks]...)
250+
// Extract the contiguous range [startIdx, startIdx+nChunks)
251+
chunks := append([]*chunkPayloadData(nil), r.unorderedChunks[startIdx:startIdx+nChunks]...)
244252

253+
// Remove them from unorderedChunks
245254
r.unorderedChunks = append(
246255
r.unorderedChunks[:startIdx],
247256
r.unorderedChunks[startIdx+nChunks:]...)
248257

249-
chunkSet := newChunkSet(0, chunks[0].payloadType)
258+
chunkSet := newChunkSet(0, chunks[0].payloadType) // SSN ignored for unordered (RFC 9260 sec 6.6)
250259
chunkSet.chunks = chunks
251260

252261
return chunkSet
253262
}
254263

255264
func (r *reassemblyQueue) isReadable() bool {
256-
// Check unordered first
265+
// Unordered messages can be delivered as soon as a complete set exists. RFC 9260 sec 6.6.
257266
if len(r.unordered) > 0 {
258267
// The chunk sets in r.unordered should all be complete.
259268
return true
260269
}
261270

262-
// Check ordered sets
271+
// Ordered delivery: only the complete set whose SSN == nextSSN is eligible. RFC 9260 sec 6.6.
263272
if len(r.ordered) > 0 {
264273
cset := r.ordered[0]
265-
if cset.isComplete() {
266-
if sna16LTE(cset.ssn, r.nextSSN) {
267-
return true
268-
}
274+
if cset.isComplete() && sna16LTE(cset.ssn, r.nextSSN) {
275+
return true
269276
}
270277
}
271278

@@ -286,9 +293,12 @@ func (r *reassemblyQueue) read(buf []byte) (int, PayloadProtocolIdentifier, erro
286293
isUnordered = true
287294
case len(r.ordered) > 0:
288295
cset = r.ordered[0]
296+
289297
if !cset.isComplete() {
290298
return 0, 0, errTryAgain
291299
}
300+
301+
// For ordered, gate on exact SSN == nextSSN (hold higher SSNs). RFC 9260 sec 6.6.
292302
if sna16GT(cset.ssn, r.nextSSN) {
293303
return 0, 0, errTryAgain
294304
}
@@ -313,6 +323,8 @@ func (r *reassemblyQueue) read(buf []byte) (int, PayloadProtocolIdentifier, erro
313323
r.unordered = r.unordered[1:]
314324
default:
315325
r.ordered = r.ordered[1:]
326+
327+
// Advance only when we delivered the exact expected SSN
316328
if cset.ssn == r.nextSSN {
317329
r.nextSSN++
318330
}
@@ -324,19 +336,16 @@ func (r *reassemblyQueue) read(buf []byte) (int, PayloadProtocolIdentifier, erro
324336
}
325337

326338
func (r *reassemblyQueue) forwardTSNForOrdered(lastSSN uint16) {
327-
// Use lastSSN to locate a chunkSet then remove it if the set has
328-
// not been complete
329-
keep := []*chunkSet{}
339+
// Only drop sets that are <= lastSSN and NOT complete (abandoned).
340+
keep := r.ordered[:0]
330341
for _, set := range r.ordered {
331-
if sna16LTE(set.ssn, lastSSN) {
332-
if !set.isComplete() {
333-
// drop the set
334-
for _, c := range set.chunks {
335-
r.subtractNumBytes(len(c.userData))
336-
}
337-
338-
continue
342+
if sna16LTE(set.ssn, lastSSN) && !set.isComplete() {
343+
// drop the set
344+
for _, c := range set.chunks {
345+
r.subtractNumBytes(len(c.userData))
339346
}
347+
348+
continue
340349
}
341350
keep = append(keep, set)
342351
}

0 commit comments

Comments
 (0)