@@ -398,10 +398,7 @@ func (f *Fluent) Close() (err error) {
398398 }
399399 }
400400
401- f .muconn .Lock ()
402- f .close ()
403- atomic .StoreInt32 (& f .closed , 1 )
404- f .muconn .Unlock ()
401+ f .syncClose (true )
405402
406403 // If ForceStopAsyncSend is true, we shall close the connection before waiting for
407404 // run() goroutine to exit to be sure we aren't waiting on ack message that might
@@ -436,6 +433,17 @@ func (f *Fluent) appendBuffer(msg *msgToSend) error {
436433 return nil
437434}
438435
436+ func (f * Fluent ) syncClose (setClosed bool ) {
437+ f .muconn .Lock ()
438+ defer f .muconn .Unlock ()
439+
440+ if setClosed {
441+ atomic .StoreInt32 (& f .closed , 1 )
442+ }
443+
444+ f .close ()
445+ }
446+
439447// close closes the connection. Callers should take care of locking muconn first.
440448func (f * Fluent ) close () {
441449 if f .conn != nil {
@@ -476,6 +484,17 @@ func (f *Fluent) connect(ctx context.Context) (err error) {
476484
477485var errIsClosing = errors .New ("fluent logger is closing" )
478486
487+ func (f * Fluent ) syncConnectWithRetry (ctx context.Context ) error {
488+ f .muconn .Lock ()
489+ defer f .muconn .Unlock ()
490+
491+ if f .conn == nil {
492+ return f .connectWithRetry (ctx )
493+ }
494+
495+ return nil
496+ }
497+
479498// Caller should take care of locking muconn first.
480499func (f * Fluent ) connectWithRetry (ctx context.Context ) error {
481500 // A Time channel is used instead of time.Sleep() to avoid blocking this
@@ -575,75 +594,70 @@ func (f *Fluent) writeWithRetry(ctx context.Context, msg *msgToSend) error {
575594 return fmt .Errorf ("fluent#write: failed to write after %d attempts" , f .Config .MaxRetry )
576595}
577596
597+ func (f * Fluent ) syncWriteMessage (msg * msgToSend ) error {
598+ f .muconn .RLock ()
599+ defer f .muconn .RUnlock ()
600+
601+ if f .conn == nil {
602+ return fmt .Errorf ("connection has been closed before writing to it" )
603+ }
604+
605+ t := f .Config .WriteTimeout
606+ if time .Duration (0 ) < t {
607+ f .conn .SetWriteDeadline (time .Now ().Add (t ))
608+ } else {
609+ f .conn .SetWriteDeadline (time.Time {})
610+ }
611+
612+ _ , err := f .conn .Write (msg .data )
613+ return err
614+ }
615+
616+ func (f * Fluent ) syncReadAck () (* AckResp , error ) {
617+ f .muconn .RLock ()
618+ defer f .muconn .RUnlock ()
619+
620+ resp := & AckResp {}
621+ var err error
622+ if f .Config .MarshalAsJSON {
623+ dec := json .NewDecoder (f .conn )
624+ err = dec .Decode (resp )
625+ } else {
626+ r := msgp .NewReader (f .conn )
627+ err = resp .DecodeMsg (r )
628+ }
629+
630+ return resp , err
631+ }
632+
578633// write writes the provided msg to fluentd server. Its first return values is
579634// a bool indicating whether the write should be retried.
580635// This method relies on function literals to execute muconn.Unlock or
581636// muconn.RUnlock in deferred calls to ensure the mutex is unlocked even in
582637// the case of panic recovering.
583638func (f * Fluent ) write (ctx context.Context , msg * msgToSend ) (bool , error ) {
584- closer := func () {
585- f .muconn .Lock ()
586- defer f .muconn .Unlock ()
587-
588- f .close ()
589- }
590-
591- if err := func () (err error ) {
592- f .muconn .Lock ()
593- defer f .muconn .Unlock ()
594-
595- if f .conn == nil {
596- err = f .connectWithRetry (ctx )
597- }
598-
599- return err
600- }(); err != nil {
639+ if err := f .syncConnectWithRetry (ctx ); err != nil {
601640 // Here, we don't want to retry the write since connectWithRetry already
602641 // retries Config.MaxRetry times to connect.
603642 return false , fmt .Errorf ("fluent#write: %v" , err )
604643 }
605644
606- if err := func () (err error ) {
607- f .muconn .RLock ()
608- defer f .muconn .RUnlock ()
609-
610- if f .conn == nil {
611- return fmt .Errorf ("connection has been closed before writing to it" )
612- }
613-
614- t := f .Config .WriteTimeout
615- if time .Duration (0 ) < t {
616- f .conn .SetWriteDeadline (time .Now ().Add (t ))
617- } else {
618- f .conn .SetWriteDeadline (time.Time {})
619- }
620-
621- _ , err = f .conn .Write (msg .data )
622- return err
623- }(); err != nil {
624- closer ()
645+ if err := f .syncWriteMessage (msg ); err != nil {
646+ f .syncClose (false )
625647 return true , fmt .Errorf ("fluent#write: %v" , err )
626648 }
627649
628650 // Acknowledgment check
629651 if msg .ack != "" {
630- f .muconn .Lock ()
631-
632- resp := & AckResp {}
633- var err error
634- if f .Config .MarshalAsJSON {
635- dec := json .NewDecoder (f .conn )
636- err = dec .Decode (resp )
637- } else {
638- r := msgp .NewReader (f .conn )
639- err = resp .DecodeMsg (r )
652+ resp , err := f .syncReadAck ()
653+ if err != nil {
654+ fmt .Fprintf (os .Stderr , "fluent#write: error reading message response ack %v" , err )
655+ f .syncClose (false )
656+ return true , err
640657 }
641- f .muconn .Unlock ()
642-
643- if err != nil || resp .Ack != msg .ack {
658+ if resp .Ack != msg .ack {
644659 fmt .Fprintf (os .Stderr , "fluent#write: message ack (%s) doesn't match expected one (%s). Closing connection..." , resp .Ack , msg .ack )
645-
646- closer ()
660+ f .syncClose (false )
647661 return true , err
648662 }
649663 }
0 commit comments