@@ -104,8 +104,6 @@ type Fluent struct {
104104 Config
105105
106106 dialer dialer
107- // stopRunning is used in async mode to signal to run() it should abort.
108- stopRunning chan struct {}
109107 // cancelDialings is used by Close() to stop any in-progress dialing.
110108 cancelDialings context.CancelFunc
111109 pending chan * msgToSend
@@ -176,7 +174,6 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
176174 f = & Fluent {
177175 Config : config ,
178176 dialer : d ,
179- stopRunning : make (chan struct {}),
180177 cancelDialings : cancel ,
181178 pending : make (chan * msgToSend , config .BufferLimit ),
182179 pendingMutex : sync.RWMutex {},
@@ -200,27 +197,26 @@ func newWithDialer(config Config, d dialer) (f *Fluent, err error) {
200197//
201198// Examples:
202199//
203- // // send map[string]
204- // mapStringData := map[string]string{
205- // "foo": "bar",
206- // }
207- // f.Post("tag_name", mapStringData)
200+ // // send map[string]
201+ // mapStringData := map[string]string{
202+ // "foo": "bar",
203+ // }
204+ // f.Post("tag_name", mapStringData)
208205//
209- // // send message with specified time
210- // mapStringData := map[string]string{
211- // "foo": "bar",
212- // }
213- // tm := time.Now()
214- // f.PostWithTime("tag_name", tm, mapStringData)
215- //
216- // // send struct
217- // structData := struct {
218- // Name string `msg:"name"`
219- // } {
220- // "john smith",
221- // }
222- // f.Post("tag_name", structData)
206+ // // send message with specified time
207+ // mapStringData := map[string]string{
208+ // "foo": "bar",
209+ // }
210+ // tm := time.Now()
211+ // f.PostWithTime("tag_name", tm, mapStringData)
223212//
213+ // // send struct
214+ // structData := struct {
215+ // Name string `msg:"name"`
216+ // } {
217+ // "john smith",
218+ // }
219+ // f.Post("tag_name", structData)
224220func (f * Fluent ) Post (tag string , message interface {}) error {
225221 timeNow := time .Now ()
226222 return f .PostWithTime (tag , timeNow , message )
@@ -380,7 +376,6 @@ func (f *Fluent) Close() (err error) {
380376 f .pendingMutex .Unlock ()
381377
382378 if f .Config .ForceStopAsyncSend {
383- close (f .stopRunning )
384379 f .cancelDialings ()
385380 }
386381
@@ -513,7 +508,7 @@ func (f *Fluent) run(ctx context.Context) {
513508 for {
514509 select {
515510 case entry , ok := <- f .pending :
516- // f.stopRunning is closed before f.pending only when ForceStopAsyncSend
511+ // The context is cancelled before f.pending only when ForceStopAsyncSend
517512 // is enabled. Otherwise, f.pending is closed when Close() is called.
518513 if ! ok {
519514 f .wg .Done ()
@@ -540,9 +535,9 @@ func (f *Fluent) run(ctx context.Context) {
540535 }
541536 f .AsyncResultCallback (data , err )
542537 }
543- case <- f .stopRunning :
538+ case <- ctx .Done ():
539+ // Context was canceled, which means ForceStopAsyncSend was enabled
544540 fmt .Fprintf (os .Stderr , "[%s] Discarding queued events...\n " , time .Now ().Format (time .RFC3339 ))
545-
546541 f .wg .Done ()
547542 return
548543 }
0 commit comments