@@ -73,23 +73,11 @@ class Exchange extends EventEmitter {
7373 */
7474 this . maxConnectionsPerApi = 50
7575
76- /**
77- * Define if the incoming trades should be queued
78- * @type {boolean }
79- */
80- this . shouldQueueTrades = false
81-
8276 /**
8377 * Pending recovery ranges
8478 * @type {{pair: string, from: number, to: number}[] }
8579 */
8680 this . recoveryRanges = [ ]
87-
88- /**
89- * Trades goes theres while we wait for historical response
90- * @type {Trade[] }
91- */
92- this . queuedTrades = [ ]
9381 }
9482
9583 /**
@@ -262,7 +250,7 @@ class Exchange extends EventEmitter {
262250 return
263251 }
264252
265- if ( ! / p i n g | p o n g / . test ( data ) ) {
253+ if ( ! / p i n g | p o n g / i . test ( data ) ) {
266254 console . debug (
267255 `[${ this . id } .createWs] sending ${ data . substr ( 0 , 64 ) } ${
268256 data . length > 64 ? '...' : ''
@@ -547,12 +535,24 @@ class Exchange extends EventEmitter {
547535 this . recoveryRanges . push ( range )
548536
549537 if ( ! recovering [ this . id ] ) {
538+ console . log ( `[${ this . id } .registerRangeForRecovery] exchange isn't recovering yet -> start recovering` )
550539 this . recoverNextRange ( )
551540 }
552541 }
553542
554543 async recoverNextRange ( sequencial ) {
555544 if ( ! this . recoveryRanges . length || ( recovering [ this . id ] && ! sequencial ) ) {
545+ if ( recovering [ this . id ] && ! sequencial ) {
546+ console . log ( `[${ this . id } ] attempted to start manual recovery while already recovering` )
547+ }
548+ if ( ! this . recoveryRanges . length ) {
549+ console . log ( `[${ this . id } ] no more range to recover` )
550+ if ( sequencial ) {
551+ console . log ( `[${ this . id } ] set recovering[this.id] to false` )
552+ delete recovering [ this . id ]
553+
554+ }
555+ }
556556 return
557557 }
558558
@@ -609,7 +609,7 @@ class Exchange extends EventEmitter {
609609 + range . to
610610 ) . toISOString ( ) } )`
611611 }
612- connection . timestamp = range . to
612+ connection . timestamp = Math . max ( connection . timestamp , range . to )
613613 }
614614
615615 // in rare case of slow recovery and fast reconnection happening, propagate to pending ranges for that pair
@@ -637,31 +637,9 @@ class Exchange extends EventEmitter {
637637 }
638638
639639 if ( ! this . recoveryRanges . length ) {
640- console . log ( `[${ this . id } ] no more ranges to recover` )
640+ console . log ( `[${ this . id } ] no more ranges to recover (delete recovering[this.id]) ` )
641641
642642 delete recovering [ this . id ]
643-
644- if ( this . queuedTrades . length ) {
645- const sortedQueuedTrades = this . queuedTrades . sort (
646- ( a , b ) => a . timestamp - b . timestamp
647- )
648-
649- console . log (
650- `[${ this . id } ] release trades queue (${
651- sortedQueuedTrades . length
652- } trades, ${ new Date ( + sortedQueuedTrades [ 0 ] . timestamp )
653- . toISOString ( )
654- . split ( 'T' )
655- . pop ( ) } to ${ new Date (
656- + sortedQueuedTrades [ sortedQueuedTrades . length - 1 ] . timestamp
657- )
658- . toISOString ( )
659- . split ( 'T' )
660- . pop ( ) } )`
661- )
662- this . emit ( 'trades' , sortedQueuedTrades )
663- this . queuedTrades = [ ]
664- }
665643 } else {
666644 return this . waitBeforeContinueRecovery ( ) . then ( ( ) =>
667645 this . recoverNextRange ( true )
@@ -830,8 +808,6 @@ class Exchange extends EventEmitter {
830808 * @param {string[] } pairs pairs attached to ws at opening
831809 */
832810 async onOpen ( event , api ) {
833- this . queueNextTrades ( )
834-
835811 const pairs = [ ...api . _pending , ...api . _connected ]
836812
837813 console . debug (
@@ -956,11 +932,11 @@ class Exchange extends EventEmitter {
956932 * @param {Trade[] } trades
957933 */
958934 emitTrades ( source , trades ) {
959- if ( source && this . promisesOfApiReconnections [ source ] ) {
960- return
935+ if ( ! trades || ! trades . length ) {
936+ return null
961937 }
962938
963- this . queueControl ( source , trades , 'trades' )
939+ this . emit ( 'trades' , trades , source )
964940
965941 return true
966942 }
@@ -971,26 +947,13 @@ class Exchange extends EventEmitter {
971947 * @param {Trade[] } trades
972948 */
973949 emitLiquidations ( source , trades ) {
974- if ( source && this . promisesOfApiReconnections [ source ] ) {
975- return
976- }
977-
978- this . queueControl ( source , trades , 'liquidations' )
979-
980- return true
981- }
982-
983- queueControl ( source , trades , type ) {
984950 if ( ! trades || ! trades . length ) {
985951 return null
986952 }
987953
988- if ( this . shouldQueueTrades || recovering [ this . id ] ) {
989- Array . prototype . push . apply ( this . queuedTrades , trades )
990- return null
991- }
954+ this . emit ( 'liquidations' , trades , source )
992955
993- this . emit ( type , trades , source )
956+ return true
994957 }
995958
996959 startKeepAlive ( api , payload = { event : 'ping' } , every = 30000 ) {
@@ -1110,20 +1073,6 @@ class Exchange extends EventEmitter {
11101073 return true
11111074 }
11121075
1113- queueNextTrades ( duration = 100 ) {
1114- this . shouldQueueTrades = true
1115-
1116- if ( this . _unlockTimeout ) {
1117- clearTimeout ( this . _unlockTimeout )
1118- }
1119-
1120- this . _unlockTimeout = setTimeout ( ( ) => {
1121- this . _unlockTimeout = null
1122-
1123- this . shouldQueueTrades = false
1124- } , duration )
1125- }
1126-
11271076 /**
11281077 * Wait between requests to prevent 429 HTTP errors
11291078 * @returns {Promise<void> }
0 commit comments