@@ -35,12 +35,10 @@ const (
3535 defaultMaxRetryWait = 60000
3636 defaultMaxRetry = 13
3737 defaultReconnectWaitIncreRate = 1.5
38- // Default sub-second precision value to false since it is only compatible
39- // with fluentd versions v0.14 and above.
40- defaultSubSecondPrecision = false
4138
4239 // Default value whether to skip checking insecure certs on TLS connections.
4340 defaultTlsInsecureSkipVerify = false
41+ defaultReadTimeout = time .Duration (0 ) // Read() will not time out
4442)
4543
4644// randomGenerator is used by getUniqueId to generate ack hashes. Its value is replaced
@@ -82,6 +80,9 @@ type Config struct {
8280
8381 // Flag to skip verifying insecure certs on TLS connections
8482 TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify"`
83+
84+ // ReadTimeout specifies the timeout on reads. Currently only acks are read.
85+ ReadTimeout time.Duration `json:"read_timeout"`
8586}
8687
8788type ErrUnknownNetwork struct {
@@ -153,6 +154,9 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
153154 if config .WriteTimeout == 0 {
154155 config .WriteTimeout = defaultWriteTimeout
155156 }
157+ if config .ReadTimeout == 0 {
158+ config .ReadTimeout = defaultReadTimeout
159+ }
156160 if config .BufferLimit == 0 {
157161 config .BufferLimit = defaultBufferLimit
158162 }
@@ -629,11 +633,25 @@ func (f *Fluent) syncReadAck(ctx context.Context) (*AckResp, error) {
629633 resp := & AckResp {}
630634 var err error
631635
636+ if f .conn == nil {
637+ return resp , fmt .Errorf ("fluent#read: connection has been closed before reading from it" )
638+ }
639+
632640 // Check if context is cancelled. If it is, we can return early here.
633641 if err := ctx .Err (); err != nil {
634642 return resp , errIsClosing
635643 }
636644
645+ t := f .Config .ReadTimeout
646+ if time .Duration (0 ) < t {
647+ err = f .conn .SetReadDeadline (time .Now ().Add (t ))
648+ } else {
649+ err = f .conn .SetReadDeadline (time.Time {})
650+ }
651+ if err != nil {
652+ return resp , fmt .Errorf ("fluent#read: failed to set read deadline: %w" , err )
653+ }
654+
637655 if f .Config .MarshalAsJSON {
638656 dec := json .NewDecoder (f .conn )
639657 err = dec .Decode (resp )
0 commit comments