@@ -296,7 +296,7 @@ func (f *Fluent) Close() (err error) {
296296 close (f .pending )
297297 f .wg .Wait ()
298298 }
299- f .close ()
299+ f .close (f . conn )
300300 return
301301}
302302
@@ -311,9 +311,9 @@ func (f *Fluent) appendBuffer(msg *msgToSend) error {
311311}
312312
313313// close closes the connection.
314- func (f * Fluent ) close () {
314+ func (f * Fluent ) close (c net. Conn ) {
315315 f .muconn .Lock ()
316- if f .conn != nil {
316+ if f .conn != nil && f . conn == c {
317317 f .conn .Close ()
318318 f .conn = nil
319319 }
@@ -355,11 +355,11 @@ func e(x, y float64) int {
355355}
356356
357357func (f * Fluent ) write (msg * msgToSend ) error {
358-
358+ var c net. Conn
359359 for i := 0 ; i < f .Config .MaxRetry ; i ++ {
360-
360+ c = f . conn
361361 // Connect if needed
362- if f . conn == nil {
362+ if c == nil {
363363 f .muconn .Lock ()
364364 if f .conn == nil {
365365 err := f .connect ()
@@ -373,32 +373,33 @@ func (f *Fluent) write(msg *msgToSend) error {
373373 continue
374374 }
375375 }
376+ c = f .conn
376377 f .muconn .Unlock ()
377378 }
378379
379380 // We're connected, write msg
380381 t := f .Config .WriteTimeout
381382 if time .Duration (0 ) < t {
382- f . conn .SetWriteDeadline (time .Now ().Add (t ))
383+ c .SetWriteDeadline (time .Now ().Add (t ))
383384 } else {
384- f . conn .SetWriteDeadline (time.Time {})
385+ c .SetWriteDeadline (time.Time {})
385386 }
386- _ , err := f . conn .Write (msg .data )
387+ _ , err := c .Write (msg .data )
387388 if err != nil {
388- f .close ()
389+ f .close (c )
389390 } else {
390391 // Acknowledgment check
391392 if msg .ack != "" {
392393 resp := & AckResp {}
393394 if f .Config .MarshalAsJSON {
394- dec := json .NewDecoder (f . conn )
395+ dec := json .NewDecoder (c )
395396 err = dec .Decode (resp )
396397 } else {
397- r := msgp .NewReader (f . conn )
398+ r := msgp .NewReader (c )
398399 err = resp .DecodeMsg (r )
399400 }
400401 if err != nil || resp .Ack != msg .ack {
401- f .close ()
402+ f .close (c )
402403 continue
403404 }
404405 }
0 commit comments