Skip to content

Commit 1c05506

Browse files
conorevansfujimotos
authored andcommitted
Add periodic reconnection functionality (#111)
This adds a new option "AsyncReconnectInterval" that defines the interval at which an async connection to FluentdHost is refreshed. This option is useful when FluentdHost is backed by a service pool, where a set of healthy Fluentd workers are managed dynamically behind a host name. Signed-off-by: Conor Evans <[email protected]> Reviewed-by: Alex Hamlin <[email protected]> Signed-off-by: Fujimoto Seiji <[email protected]>
1 parent 0b652e8 commit 1c05506

File tree

3 files changed

+27
-1
lines changed

3 files changed

+27
-1
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,11 @@ When Async is enabled, if this is callback is provided, it will be called on eve
132132
takes two arguments - a `[]byte` of the message that was to be sent and an `error`. If the `error` is not nil this means the
133133
delivery of the message was unsuccessful.
134134

135+
### AsyncReconnectInterval
136+
When async is enabled, this option defines the interval (ms) at which the connection
137+
to the fluentd-address is re-established. This option is useful if the address
138+
may resolve to one or more IP addresses, e.g. a Consul service address.
139+
135140
### SubSecondPrecision
136141

137142
Enable time encoding as EventTime, which contains sub-second precision values. The messages encoded with this option can be received only by Fluentd v0.14 or later.

fluent/fluent.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ type Config struct {
6565
AsyncConnect bool `json:"async_connect"`
6666
MarshalAsJSON bool `json:"marshal_as_json"`
6767

68+
// AsyncReconnectInterval defines the interval (ms) at which the connection
69+
// to the fluentd-address is re-established. This option is useful if the address
70+
// may resolve to one or more IP addresses, e.g. a Consul service address.
71+
AsyncReconnectInterval int `json:"async_reconnect_interval"`
72+
6873
// Sub-second precision timestamps are only possible for those using fluentd
6974
// v0.14+ and serializing their messages with msgpack.
7075
SubSecondPrecision bool `json:"sub_second_precision"`
@@ -108,6 +113,9 @@ type Fluent struct {
108113
closed bool
109114
wg sync.WaitGroup
110115

116+
// time at which the most recent connection to fluentd-address was established.
117+
latestReconnectTime time.Time
118+
111119
muconn sync.RWMutex
112120
conn net.Conn
113121
}
@@ -447,6 +455,10 @@ func (f *Fluent) connect(ctx context.Context) (err error) {
447455
err = NewErrUnknownNetwork(f.Config.FluentNetwork)
448456
}
449457

458+
if err == nil {
459+
f.latestReconnectTime = time.Now()
460+
}
461+
450462
return err
451463
}
452464

@@ -508,6 +520,15 @@ func (f *Fluent) run(ctx context.Context) {
508520
return
509521
}
510522

523+
if f.AsyncReconnectInterval > 0 {
524+
if time.Since(f.latestReconnectTime) > time.Duration(f.AsyncReconnectInterval)*time.Millisecond {
525+
f.muconn.Lock()
526+
f.close()
527+
f.connectWithRetry(ctx)
528+
f.muconn.Unlock()
529+
}
530+
}
531+
511532
err := f.writeWithRetry(ctx, entry)
512533
if err != nil && err != errIsClosing {
513534
fmt.Fprintf(os.Stderr, "[%s] Unable to send logs to fluentd, reconnecting...\n", time.Now().Format(time.RFC3339))

fluent/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
package fluent
22

3-
const Version = "1.4.0"
3+
const Version = "1.9.0"

0 commit comments

Comments
 (0)