@@ -299,8 +299,20 @@ class FluentSender {
299299 if ( this . _status === 'established' ) {
300300 return ;
301301 }
302+ if ( ! this . _socket ) {
303+ const error = new FluentLoggerError . HandshakeError ( 'socket went away before handshake' ) ;
304+ this . _handleEvent ( 'error' , error ) ;
305+ this . _disconnect ( ) ;
306+ return ;
307+ }
302308 this . _status = 'helo' ;
303309 this . _socket . once ( 'data' , ( data ) => {
310+ if ( ! this . _socket ) {
311+ const error = new FluentLoggerError . HandshakeError ( 'socket went away during handshake' ) ;
312+ this . _handleEvent ( 'error' , error ) ;
313+ this . _disconnect ( ) ;
314+ return ;
315+ }
304316 this . _socket . pause ( ) ;
305317 const heloStatus = this . _checkHelo ( data ) ;
306318 if ( ! heloStatus . succeeded ) {
@@ -310,6 +322,12 @@ class FluentSender {
310322 }
311323 this . _status = 'pingpong' ;
312324 this . _socket . write ( this . _generatePing ( ) , ( ) => {
325+ if ( ! this . _socket ) {
326+ const error = new FluentLoggerError . HandshakeError ( 'socket went away during ping' ) ;
327+ this . _handleEvent ( 'error' , error ) ;
328+ this . _disconnect ( ) ;
329+ return ;
330+ }
313331 this . _socket . resume ( ) ;
314332 this . _socket . once ( 'data' , ( data ) => {
315333 const pongStatus = this . _checkPong ( data ) ;
@@ -406,33 +424,40 @@ class FluentSender {
406424 const sendPacketSize = ( options && options . eventEntryDataSize ) || this . _sendQueueSize ;
407425 this . _socket . write ( packet , ( ) => {
408426 if ( this . requireAckResponse ) {
409- this . _socket . once ( 'data' , ( data ) => {
410- timeoutId && clearTimeout ( timeoutId ) ;
411- const response = msgpack . decode ( data , { codec : codec } ) ;
412- if ( response . ack !== options . chunk ) {
413- const error = new FluentLoggerError . ResponseError (
414- 'ack in response and chunk id in sent data are different' ,
415- { ack : response . ack , chunk : options . chunk }
416- ) ;
417- callbacks . forEach ( ( callback ) => {
418- this . _handleEvent ( 'error' , error , callback ) ;
419- } ) ;
420- } else { // no error on ack
421- callbacks . forEach ( ( callback ) => {
422- callback && callback ( ) ;
423- } ) ;
424- }
425- this . _sendQueueSize -= sendPacketSize ;
426- process . nextTick ( ( ) => {
427- this . _waitToWrite ( ) ;
428- } ) ;
429- } ) ;
430- timeoutId = setTimeout ( ( ) => {
431- const error = new FluentLoggerError . ResponseTimeout ( 'ack response timeout' ) ;
427+ if ( ! this . _socket ) {
428+ const error = new FluentLoggerError . ResponseError ( 'server went away' ) ;
432429 callbacks . forEach ( ( callback ) => {
433430 this . _handleEvent ( 'error' , error , callback ) ;
434431 } ) ;
435- } , this . ackResponseTimeout ) ;
432+ } else {
433+ this . _socket . once ( 'data' , ( data ) => {
434+ timeoutId && clearTimeout ( timeoutId ) ;
435+ const response = msgpack . decode ( data , { codec : codec } ) ;
436+ if ( response . ack !== options . chunk ) {
437+ const error = new FluentLoggerError . ResponseError (
438+ 'ack in response and chunk id in sent data are different' ,
439+ { ack : response . ack , chunk : options . chunk }
440+ ) ;
441+ callbacks . forEach ( ( callback ) => {
442+ this . _handleEvent ( 'error' , error , callback ) ;
443+ } ) ;
444+ } else { // no error on ack
445+ callbacks . forEach ( ( callback ) => {
446+ callback && callback ( ) ;
447+ } ) ;
448+ }
449+ this . _sendQueueSize -= sendPacketSize ;
450+ process . nextTick ( ( ) => {
451+ this . _waitToWrite ( ) ;
452+ } ) ;
453+ } ) ;
454+ timeoutId = setTimeout ( ( ) => {
455+ const error = new FluentLoggerError . ResponseTimeout ( 'ack response timeout' ) ;
456+ callbacks . forEach ( ( callback ) => {
457+ this . _handleEvent ( 'error' , error , callback ) ;
458+ } ) ;
459+ } , this . ackResponseTimeout ) ;
460+ }
436461 } else {
437462 this . _sendQueueSize -= sendPacketSize ;
438463 callbacks . forEach ( ( callback ) => {
0 commit comments