Skip to content

Commit 6864609

Browse files
committed
Use maps to speed up reassembly queue lookup
1 parent 558b595 commit 6864609

File tree

1 file changed

+83
-42
lines changed

1 file changed

+83
-42
lines changed

reassembly_queue.go

Lines changed: 83 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,6 @@ func sortChunksByTSN(a []*chunkPayloadData) {
1616
})
1717
}
1818

19-
func sortChunksBySSN(a []*chunkSet) {
20-
sort.Slice(a, func(i, j int) bool {
21-
return sna16LT(a[i].ssn, a[j].ssn)
22-
})
23-
}
24-
2519
// chunkSet is a set of chunks that share the same SSN.
2620
type chunkSet struct {
2721
ssn uint16 // used only with the ordered chunks
@@ -101,12 +95,19 @@ func (set *chunkSet) isComplete() bool {
10195
}
10296

10397
type reassemblyQueue struct {
104-
si uint16
105-
nextSSN uint16 // expected SSN for next ordered message RFC 9260 sec 6.5, 6.6
106-
ordered []*chunkSet
107-
unordered []*chunkSet
98+
si uint16
99+
nextSSN uint16 // expected SSN for next ordered message RFC 9260 sec 6.5, 6.6
100+
101+
ordered []*chunkSet
102+
// Fast lookup for fragmented ordered messages by SSN (cleared once complete).
103+
orderedIndex map[uint16]*chunkSet
104+
105+
unordered []*chunkSet
106+
// Fast duplicate filter for unordered chunks by TSN.
107+
unorderedIndex map[uint32]*chunkPayloadData
108108
unorderedChunks []*chunkPayloadData
109-
nBytes uint64
109+
110+
nBytes uint64
110111
}
111112

112113
var errTryAgain = errors.New("try again")
@@ -120,11 +121,47 @@ func newReassemblyQueue(si uint16) *reassemblyQueue {
120121
// value 65535, the next Stream Sequence Number MUST be set to 0. For unordered
121122
// user messages, the Stream Sequence Number MUST NOT be changed.
122123
return &reassemblyQueue{
123-
si: si,
124-
nextSSN: 0, // From RFC 9260 Sec 6.5:
125-
ordered: make([]*chunkSet, 0),
126-
unordered: make([]*chunkSet, 0),
124+
si: si,
125+
nextSSN: 0, // From RFC 9260 Sec 6.5:
126+
ordered: make([]*chunkSet, 0),
127+
orderedIndex: make(map[uint16]*chunkSet),
128+
unordered: make([]*chunkSet, 0),
129+
unorderedIndex: make(map[uint32]*chunkPayloadData),
130+
}
131+
}
132+
133+
// insertChunkByTSN keeps r.unorderedChunks sorted by TSN with a binary insertion.
134+
func (r *reassemblyQueue) insertChunkByTSN(chunk *chunkPayloadData) {
135+
lo, hi := 0, len(r.unorderedChunks)
136+
for lo < hi {
137+
mid := (lo + hi) >> 1
138+
if sna32LT(r.unorderedChunks[mid].tsn, chunk.tsn) {
139+
lo = mid + 1
140+
} else {
141+
hi = mid
142+
}
143+
}
144+
145+
r.unorderedChunks = append(r.unorderedChunks, nil)
146+
copy(r.unorderedChunks[lo+1:], r.unorderedChunks[lo:])
147+
r.unorderedChunks[lo] = chunk
148+
}
149+
150+
// insertSetBySSN keeps r.ordered sorted by SSN.
151+
func (r *reassemblyQueue) insertSetBySSN(cs *chunkSet) {
152+
lo, hi := 0, len(r.ordered)
153+
for lo < hi {
154+
mid := (lo + hi) >> 1
155+
if sna16LT(r.ordered[mid].ssn, cs.ssn) {
156+
lo = mid + 1
157+
} else {
158+
hi = mid
159+
}
127160
}
161+
162+
r.ordered = append(r.ordered, nil)
163+
copy(r.ordered[lo+1:], r.ordered[lo:])
164+
r.ordered[lo] = cs
128165
}
129166

130167
func (r *reassemblyQueue) push(chunk *chunkPayloadData) bool { //nolint:cyclop
@@ -135,16 +172,15 @@ func (r *reassemblyQueue) push(chunk *chunkPayloadData) bool { //nolint:cyclop
135172
}
136173

137174
if chunk.unordered {
138-
// Keep only one instance per TSN (duplicate DATA should not double count).
139-
for _, c := range r.unorderedChunks {
140-
if c.tsn == chunk.tsn {
141-
return false
142-
}
175+
// O(1) duplicate filter by TSN.
176+
if _, dup := r.unorderedIndex[chunk.tsn]; dup {
177+
return false
143178
}
179+
r.unorderedIndex[chunk.tsn] = chunk
144180

145-
r.unorderedChunks = append(r.unorderedChunks, chunk)
181+
// Maintain TSN order without full re-sort each time
182+
r.insertChunkByTSN(chunk)
146183
atomic.AddUint64(&r.nBytes, uint64(len(chunk.userData)))
147-
sortChunksByTSN(r.unorderedChunks)
148184

149185
// Scan unorderedChunks that are contiguous (in TSN)
150186
cset = r.findCompleteUnorderedChunkSet()
@@ -166,36 +202,31 @@ func (r *reassemblyQueue) push(chunk *chunkPayloadData) bool { //nolint:cyclop
166202

167203
// Check if a fragmented chunkSet with the fragmented SSN already exists
168204
if chunk.isFragmented() {
169-
for _, set := range r.ordered {
170-
// nolint:godox
171-
// TODO: add caution around SSN wrapping here... this helps only a little bit
172-
// by ensuring we don't add to an unfragmented cset (1 chunk). There's
173-
// a case where if the SSN does wrap around, we may see the same SSN
174-
// for a different chunk.
175-
176-
// nolint:godox
177-
// TODO: this slice can get pretty big; it may be worth maintaining a map
178-
// for O(1) lookups at the cost of 2x memory.
179-
180-
// Only add to an existing fragmented set with same SSN.
181-
if set.ssn == chunk.streamSequenceNumber && set.chunks[0].isFragmented() {
182-
cset = set
183-
184-
break
185-
}
205+
// O(1) lookup of an in-progress fragmented set.
206+
if set, ok := r.orderedIndex[chunk.streamSequenceNumber]; ok && len(set.chunks) > 0 && set.chunks[0].isFragmented() {
207+
cset = set
186208
}
187209
}
188210

189211
// If not found, create a new chunkSet
190212
if cset == nil {
191213
cset = newChunkSet(chunk.streamSequenceNumber, chunk.payloadType)
192-
r.ordered = append(r.ordered, cset)
193-
sortChunksBySSN(r.ordered)
214+
// Index only fragmented sequences, single-chunk messages don't need lookup.
215+
if chunk.isFragmented() {
216+
r.orderedIndex[cset.ssn] = cset
217+
}
218+
r.insertSetBySSN(cset)
194219
}
195220

196221
atomic.AddUint64(&r.nBytes, uint64(len(chunk.userData)))
197222

198-
return cset.push(chunk)
223+
complete := cset.push(chunk)
224+
if complete && chunk.isFragmented() {
225+
// No more fragments will be added, drop from index.
226+
delete(r.orderedIndex, cset.ssn)
227+
}
228+
229+
return complete
199230
}
200231

201232
func (r *reassemblyQueue) findCompleteUnorderedChunkSet() *chunkSet {
@@ -255,6 +286,11 @@ func (r *reassemblyQueue) findCompleteUnorderedChunkSet() *chunkSet {
255286
r.unorderedChunks[:startIdx],
256287
r.unorderedChunks[startIdx+nChunks:]...)
257288

289+
// Drop from index (all these TSNs are now consumed into a complete set).
290+
for _, c := range chunks {
291+
delete(r.unorderedIndex, c.tsn)
292+
}
293+
258294
chunkSet := newChunkSet(0, chunks[0].payloadType) // SSN ignored for unordered (RFC 9260 sec 6.6)
259295
chunkSet.chunks = chunks
260296

@@ -345,6 +381,9 @@ func (r *reassemblyQueue) forwardTSNForOrdered(lastSSN uint16) {
345381
r.subtractNumBytes(len(c.userData))
346382
}
347383

384+
// ensure index is clean too
385+
delete(r.orderedIndex, set.ssn)
386+
348387
continue
349388
}
350389
keep = append(keep, set)
@@ -370,9 +409,11 @@ func (r *reassemblyQueue) forwardTSNForUnordered(newCumulativeTSN uint32) {
370409
}
371410
lastIdx = i
372411
}
412+
373413
if lastIdx >= 0 {
374414
for _, c := range r.unorderedChunks[0 : lastIdx+1] {
375415
r.subtractNumBytes(len(c.userData))
416+
delete(r.unorderedIndex, c.tsn)
376417
}
377418
r.unorderedChunks = r.unorderedChunks[lastIdx+1:]
378419
}

0 commit comments

Comments
 (0)