@@ -23,6 +23,12 @@ enum TcpStateEnum {
2323 TIME_WAIT = 10 ,
2424}
2525
26+ enum ProcessingResult {
27+ SUCCESS = 0 ,
28+ DISCARD = 1 ,
29+ POSTPONE = 2 ,
30+ }
31+
2632const MAX_BUFFER_SIZE = 0xffff ;
2733const MAX_SEGMENT_SIZE = 1460 ;
2834const u32_MODULUS = 0x100000000 ; // 2^32
@@ -78,6 +84,8 @@ export class TcpState {
7884 private connectionQueue = new AsyncQueue < undefined > ( ) ;
7985 private retransmissionQueue : RetransmissionQueue ;
8086
87+ private recvQueue = new ReceivedSegmentsQueue ( ) ;
88+
8189 // Buffer of data received
8290 private readBuffer = new BytesBuffer ( MAX_BUFFER_SIZE ) ;
8391 private readChannel = new AsyncQueue < number > ( ) ;
@@ -193,7 +201,7 @@ export class TcpState {
193201 return true ;
194202 }
195203
196- private handleSegment ( segment : TcpSegment ) {
204+ private handleSegment ( segment : TcpSegment ) : ProcessingResult {
197205 // Sanity check: ports match with expected
198206 if (
199207 segment . sourcePort !== this . dstPort ||
@@ -209,16 +217,16 @@ export class TcpState {
209217 if ( ack <= this . initialSendSeqNum || ack > this . sendNext ) {
210218 if ( flags . rst ) {
211219 console . debug ( "Invalid SYN_SENT ACK with RST" ) ;
212- return false ;
220+ return ProcessingResult . DISCARD ;
213221 }
214222 console . debug ( "Invalid SYN_SENT ACK, sending RST" ) ;
215223 this . newSegment ( ack , 0 ) . withFlags ( new Flags ( ) . withRst ( ) ) ;
216- return false ;
224+ return ProcessingResult . DISCARD ;
217225 }
218226 // Try to process ACK
219227 if ( ! this . isAckValid ( segment . acknowledgementNumber ) ) {
220228 console . debug ( "Invalid SYN_SENT ACK" ) ;
221- return false ;
229+ return ProcessingResult . DISCARD ;
222230 }
223231 }
224232 if ( flags . rst ) {
@@ -228,7 +236,7 @@ export class TcpState {
228236 throw new Error ( "error: connection reset" ) ;
229237 } else {
230238 console . debug ( "SYN_SENT RST without ACK, dropping segment" ) ;
231- return false ;
239+ return ProcessingResult . DISCARD ;
232240 }
233241 }
234242 if ( flags . syn ) {
@@ -242,11 +250,11 @@ export class TcpState {
242250 // Process the segment normally
243251 this . state = TcpStateEnum . ESTABLISHED ;
244252 this . connectionQueue . push ( undefined ) ;
245- if ( ! this . handleSegmentData ( segment ) ) {
253+ if ( this . handleSegmentData ( segment ) !== ProcessingResult . SUCCESS ) {
246254 console . debug ( "Segment data processing failed" ) ;
247- return false ;
255+ return ProcessingResult . DISCARD ;
248256 }
249- return true ;
257+ return ProcessingResult . SUCCESS ;
250258 } else {
251259 // It's a SYN
252260 if ( segment . data . length > 0 ) {
@@ -264,16 +272,16 @@ export class TcpState {
264272 }
265273 if ( ! ( flags . rst || flags . syn ) ) {
266274 console . debug ( "SYN_SENT segment without SYN or RST" ) ;
267- return false ;
275+ return ProcessingResult . DISCARD ;
268276 }
269- return true ;
277+ return ProcessingResult . SUCCESS ;
270278 }
271279 // Check the sequence number is valid
272280 const segSeq = segment . sequenceNumber ;
273281 const segLen = segment . data . length ;
274282 if ( ! this . isSeqNumValid ( segSeq , segLen ) ) {
275283 console . debug ( "Sequence number not valid" ) ;
276- return false ;
284+ return ProcessingResult . DISCARD ;
277285 }
278286
279287 // TODO: handle RST or SYN flags
@@ -285,15 +293,15 @@ export class TcpState {
285293 // If the ACK bit is off, drop the segment.
286294 if ( ! flags . ack ) {
287295 console . debug ( "ACK bit is off, dropping segment" ) ;
288- return false ;
296+ return ProcessingResult . DISCARD ;
289297 }
290298 if ( this . state === TcpStateEnum . SYN_RECEIVED ) {
291299 if ( ! this . isAckValid ( segment . acknowledgementNumber ) ) {
292300 console . debug ( "ACK invalid, dropping segment" ) ;
293301 this . newSegment ( segment . acknowledgementNumber , 0 ) . withFlags (
294302 new Flags ( ) . withRst ( ) ,
295303 ) ;
296- return false ;
304+ return ProcessingResult . DISCARD ;
297305 }
298306 this . state = TcpStateEnum . ESTABLISHED ;
299307 this . connectionQueue . push ( undefined ) ;
@@ -324,7 +332,7 @@ export class TcpState {
324332 this . newSegment ( this . sendNext , this . recvNext ) . withFlags (
325333 new Flags ( ) . withAck ( ) ,
326334 ) ;
327- return false ;
335+ return ProcessingResult . DISCARD ;
328336 } else {
329337 this . processAck ( segment ) ;
330338 }
@@ -345,9 +353,13 @@ export class TcpState {
345353 }
346354
347355 // Process the segment data
348- if ( ! this . handleSegmentData ( segment ) ) {
356+ const result = this . handleSegmentData ( segment ) ;
357+ if ( result === ProcessingResult . DISCARD ) {
349358 console . debug ( "Segment data processing failed, dropping segment" ) ;
350- return false ;
359+ return result ;
360+ } else if ( result === ProcessingResult . POSTPONE ) {
361+ console . debug ( "Segment data processing postponed" ) ;
362+ return result ;
351363 }
352364
353365 if ( flags . fin ) {
@@ -357,7 +369,7 @@ export class TcpState {
357369 this . notifySendPackets ( ) ;
358370 }
359371
360- return true ;
372+ return ProcessingResult . SUCCESS ;
361373 }
362374
363375 private dropSegment ( segment : TcpSegment ) {
@@ -366,13 +378,16 @@ export class TcpState {
366378 dropPacket ( this . srcHost . viewgraph , this . srcHost . id , frame ) ;
367379 }
368380
369- private handleSegmentData ( segment : TcpSegment ) {
370- // NOTE: for simplicity, we ignore cases where RCV.NXT != SEG.SEQ
381+ private handleSegmentData ( segment : TcpSegment ) : ProcessingResult {
371382 const seqNum = segment . flags . syn
372383 ? ( segment . sequenceNumber + 1 ) % u32_MODULUS
373384 : segment . sequenceNumber ;
374- if ( seqNum !== this . recvNext ) {
375- return false ;
385+ if ( seqNum > this . recvNext ) {
386+ // Postpone the segment
387+ return ProcessingResult . POSTPONE ;
388+ } else if ( seqNum < this . recvNext ) {
389+ // Drop the segment
390+ return ProcessingResult . DISCARD ;
376391 }
377392 const receivedData = segment . data ;
378393 // NOTE: for simplicity, we ignore cases where the data is only partially
@@ -386,7 +401,7 @@ export class TcpState {
386401 this . recvWindow = MAX_BUFFER_SIZE - this . readBuffer . bytesAvailable ( ) ;
387402 // We should send back an ACK segment
388403 this . notifySendPackets ( ) ;
389- return true ;
404+ return ProcessingResult . SUCCESS ;
390405 }
391406
392407 async read ( output : Uint8Array ) : Promise < number > {
@@ -546,8 +561,36 @@ export class TcpState {
546561 this . notifiedSendPackets = false ;
547562 } else if ( "segment" in result ) {
548563 receivedSegmentPromise = this . tcpQueue . pop ( ) ;
549- if ( ! this . handleSegment ( result . segment ) ) {
550- this . dropSegment ( result . segment ) ;
564+ let segment = result . segment ;
565+
566+ let processingResult = this . handleSegment ( segment ) ;
567+
568+ if ( processingResult === ProcessingResult . DISCARD ) {
569+ this . dropSegment ( segment ) ;
570+ continue ;
571+ }
572+
573+ while (
574+ processingResult !== ProcessingResult . POSTPONE &&
575+ ! this . recvQueue . isEmpty ( )
576+ ) {
577+ segment = this . recvQueue . dequeue ( ) ;
578+ processingResult = this . handleSegment ( segment ) ;
579+ if ( processingResult === ProcessingResult . DISCARD ) {
580+ this . dropSegment ( segment ) ;
581+ }
582+ }
583+
584+ if ( processingResult === ProcessingResult . POSTPONE ) {
585+ // Enqueue the segment for later processing
586+ this . recvQueue . enqueue ( segment ) ;
587+ }
588+
589+ if ( processingResult === ProcessingResult . POSTPONE ) {
590+ // Enqueue the segment for later processing
591+ this . recvQueue . enqueue ( segment ) ;
592+ } else if ( processingResult === ProcessingResult . DISCARD ) {
593+ this . dropSegment ( segment ) ;
551594 }
552595 continue ;
553596 } else if ( "seqNum" in result ) {
@@ -976,3 +1019,20 @@ class RTTEstimator {
9761019 this . currentSample . rtt += ticker . elapsedMS * this . ctx . getCurrentSpeed ( ) ;
9771020 }
9781021}
1022+
1023+ class ReceivedSegmentsQueue {
1024+ private queue : TcpSegment [ ] = [ ] ;
1025+
1026+ enqueue ( segment : TcpSegment ) {
1027+ this . queue . push ( segment ) ;
1028+ this . queue . sort ( ( a , b ) => a . sequenceNumber - b . sequenceNumber ) ;
1029+ }
1030+
1031+ dequeue ( ) : TcpSegment | undefined {
1032+ return this . queue . shift ( ) ;
1033+ }
1034+
1035+ isEmpty ( ) {
1036+ return this . queue . length === 0 ;
1037+ }
1038+ }
0 commit comments