File tree Expand file tree Collapse file tree 3 files changed +17
-5
lines changed Expand file tree Collapse file tree 3 files changed +17
-5
lines changed Original file line number Diff line number Diff line change @@ -39,6 +39,7 @@ class CoreClient {
39
39
this . reconnectCount = 0 ;
40
40
this . logstats ;
41
41
this . options = { } ;
42
+ this . endState = false ;
42
43
}
43
44
44
45
/**
@@ -48,6 +49,7 @@ class CoreClient {
48
49
* @param {integer } closeSleep - time in ms for scheduling closing, undefined = close now
49
50
*/
50
51
close ( context , closeSleep ) {
52
+ this . endState = true ;
51
53
if ( closeSleep ) {
52
54
this . timeoutClose ( context , closeSleep ) ;
53
55
} else {
@@ -109,8 +111,10 @@ class CoreClient {
109
111
* @param {object } context - event context
110
112
*/
111
113
resetTimeout ( context ) {
112
- this . cancelTimeout ( ) ;
113
- this . timeoutClose ( context , this . timeout ) ;
114
+ if ( ! this . endState ) {
115
+ this . cancelTimeout ( ) ;
116
+ this . timeoutClose ( context , this . timeout ) ;
117
+ }
114
118
}
115
119
116
120
/**
Original file line number Diff line number Diff line change @@ -83,15 +83,15 @@ class Receiver extends CoreClient {
83
83
Utils . printStatistic ( context ) ;
84
84
}
85
85
86
- //reset timer (simulaton of fetch)
87
- self . resetTimeout ( context , self . options . recvListen ) ;
88
-
89
86
//if received all expected messages and timeout is 0 close connection
90
87
if ( ( ( ! self . options . processReplyTo ) || self . options . recvListen ) && self . received === self . expected ) {
91
88
self . cancelTimeout ( ) ;
92
89
self . close ( context , self . options . closeSleep ) ;
93
90
}
94
91
92
+ //reset timer
93
+ self . resetTimeout ( context , self . options . recvListen ) ;
94
+
95
95
//add credit for drain
96
96
if ( ! ( self . options . duration > 0 ) &&
97
97
( self . expected === 0 ) &&
Original file line number Diff line number Diff line change @@ -125,6 +125,10 @@ class Sender extends CoreClient {
125
125
126
126
Utils . printMessage ( message , this . options ) ;
127
127
128
+ if ( this . options . timeout > 0 ) {
129
+ this . resetTimeout ( context , false ) ;
130
+ }
131
+
128
132
if ( this . options . logStats === 'endpoints' ) {
129
133
Utils . printStatistic ( context ) ;
130
134
}
@@ -149,6 +153,10 @@ class Sender extends CoreClient {
149
153
context . sender . send ( message ) ;
150
154
}
151
155
156
+ if ( self . options . timeout > 0 ) {
157
+ self . resetTimeout ( context , false ) ;
158
+ }
159
+
152
160
Utils . printMessage ( message , self . options ) ;
153
161
154
162
if ( self . options . logStats === 'endpoints' ) {
You can’t perform that action at this time.
0 commit comments