@@ -81,7 +81,7 @@ type Config struct {
8181 RequestAck bool `json:"request_ack"`
8282
8383 // Flag to skip verifying insecure certs on TLS connections
84- TlsInsecureSkipVerify bool `json: "tls_insecure_skip_verify"`
84+ TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify"`
8585}
8686
8787type ErrUnknownNetwork struct {
@@ -280,7 +280,7 @@ func (f *Fluent) EncodeAndPostData(tag string, tm time.Time, message interface{}
280280 var msg * msgToSend
281281 var err error
282282 if msg , err = f .EncodeData (tag , tm , message ); err != nil {
283- return fmt .Errorf ("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%v " , message , err )
283+ return fmt .Errorf ("fluent#EncodeAndPostData: can't convert '%#v' to msgpack:%w " , message , err )
284284 }
285285 return f .postRawData (msg )
286286}
@@ -594,31 +594,46 @@ func (f *Fluent) writeWithRetry(ctx context.Context, msg *msgToSend) error {
594594 return fmt .Errorf ("fluent#write: failed to write after %d attempts" , f .Config .MaxRetry )
595595}
596596
597- func (f * Fluent ) syncWriteMessage (msg * msgToSend ) error {
598- f .muconn .RLock ()
599- defer f .muconn .RUnlock ()
597+ func (f * Fluent ) syncWriteMessage (ctx context.Context , msg * msgToSend ) error {
598+ f .muconn .Lock ()
599+ defer f .muconn .Unlock ()
600+
601+ // Check if context is cancelled. If it is, we can return early here.
602+ if err := ctx .Err (); err != nil {
603+ return errIsClosing
604+ }
600605
601606 if f .conn == nil {
602- return fmt .Errorf ("connection has been closed before writing to it" )
607+ return fmt .Errorf ("fluent#write: connection has been closed before writing to it" )
603608 }
604609
605610 t := f .Config .WriteTimeout
611+ var err error
606612 if time .Duration (0 ) < t {
607- f .conn .SetWriteDeadline (time .Now ().Add (t ))
613+ err = f .conn .SetWriteDeadline (time .Now ().Add (t ))
608614 } else {
609- f .conn .SetWriteDeadline (time.Time {})
615+ err = f .conn .SetWriteDeadline (time.Time {})
610616 }
611617
612- _ , err := f .conn .Write (msg .data )
618+ if err != nil {
619+ return fmt .Errorf ("fluent#write: failed to set write deadline: %w" , err )
620+ }
621+ _ , err = f .conn .Write (msg .data )
613622 return err
614623}
615624
616- func (f * Fluent ) syncReadAck () (* AckResp , error ) {
617- f .muconn .RLock ()
618- defer f .muconn .RUnlock ()
625+ func (f * Fluent ) syncReadAck (ctx context. Context ) (* AckResp , error ) {
626+ f .muconn .Lock ()
627+ defer f .muconn .Unlock ()
619628
620629 resp := & AckResp {}
621630 var err error
631+
632+ // Check if context is cancelled. If it is, we can return early here.
633+ if err := ctx .Err (); err != nil {
634+ return resp , errIsClosing
635+ }
636+
622637 if f .Config .MarshalAsJSON {
623638 dec := json .NewDecoder (f .conn )
624639 err = dec .Decode (resp )
@@ -639,19 +654,19 @@ func (f *Fluent) write(ctx context.Context, msg *msgToSend) (bool, error) {
639654 if err := f .syncConnectWithRetry (ctx ); err != nil {
640655 // Here, we don't want to retry the write since connectWithRetry already
641656 // retries Config.MaxRetry times to connect.
642- return false , fmt .Errorf ("fluent#write: %v " , err )
657+ return false , fmt .Errorf ("fluent#write: %w " , err )
643658 }
644659
645- if err := f .syncWriteMessage (msg ); err != nil {
660+ if err := f .syncWriteMessage (ctx , msg ); err != nil {
646661 f .syncClose (false )
647- return true , fmt .Errorf ("fluent#write: %v " , err )
662+ return true , fmt .Errorf ("fluent#write: %w " , err )
648663 }
649664
650665 // Acknowledgment check
651666 if msg .ack != "" {
652- resp , err := f .syncReadAck ()
667+ resp , err := f .syncReadAck (ctx )
653668 if err != nil {
654- fmt .Fprintf (os .Stderr , "fluent#write: error reading message response ack %v" , err )
669+ fmt .Fprintf (os .Stderr , "fluent#write: error reading message response ack %v. Closing connection... " , err )
655670 f .syncClose (false )
656671 return true , err
657672 }
0 commit comments