1- import { observable } from 'mobx' ;
1+ import { action , observable } from 'mobx' ;
22
33import { InputRawPassthrough , InputRawPassthroughData } from '../types' ;
44import { HTKEventBase } from './events/event-base' ;
@@ -8,6 +8,8 @@ export interface RawTunnelMessage extends InputRawPassthroughData {
88 isBinary : true ;
99}
1010
11+ const PACKET_COMBINE_LIMIT = 500_000 ;
12+
1113export class RawTunnel extends HTKEventBase {
1214
1315 constructor (
@@ -35,9 +37,33 @@ export class RawTunnel extends HTKEventBase {
3537 return true ;
3638 }
3739
40+ @observable
3841 readonly packets : StreamMessage [ ] = [ ] ;
3942
43+ @action
4044 addChunk ( dataEvent : InputRawPassthroughData ) {
45+ const lastPacket = this . packets [ this . packets . length - 1 ] as StreamMessage | undefined ;
46+
47+ // Combine together small sequential packets from the same client within 10ms, to
48+ // simplify & clarify busy streams
49+ if (
50+ lastPacket ?. direction === dataEvent . direction &&
51+ ( dataEvent . eventTimestamp - lastPacket . timestamp < 10 ) &&
52+ lastPacket . content . byteLength + dataEvent . content . byteLength < PACKET_COMBINE_LIMIT
53+ ) {
54+ this . packets [ this . packets . length - 1 ] = new StreamMessage (
55+ {
56+ id : this . id ,
57+ direction : lastPacket . direction ,
58+ eventTimestamp : lastPacket . timestamp , // Use the first packet in the chunk's timestamp
59+ content : Buffer . concat ( [ lastPacket . content , dataEvent . content ] ) ,
60+ isBinary : true
61+ } ,
62+ this . packets . length
63+ ) ;
64+ return ;
65+ }
66+
4167 this . packets . push (
4268 new StreamMessage ( Object . assign ( dataEvent , { isBinary : true } as const ) , this . packets . length )
4369 ) ;
@@ -46,6 +72,7 @@ export class RawTunnel extends HTKEventBase {
4672 @observable
4773 private open = true ;
4874
75+ @action
4976 markClosed ( closeEvent : InputRawPassthrough ) {
5077 this . timingEvents . disconnectTimestamp = closeEvent . timingEvents . disconnectTimestamp ;
5178 this . open = false ;
0 commit comments