@@ -10,6 +10,9 @@ import (
1010 "net"
1111 "reflect"
1212 "runtime"
13+ "strconv"
14+ "strings"
15+ "sync"
1316 "testing"
1417 "time"
1518
@@ -45,18 +48,18 @@ func newTestDialer() *testDialer {
4548// For instance, to test an async logger that have to dial 4 times before succeeding,
4649// the test should look like this:
4750//
48- // d := newTestDialer() // Create a new stubbed dialer
49- // cfg := Config{
50- // Async: true,
51- // // ...
52- // }
53- // f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer
54- // f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
51+ // d := newTestDialer() // Create a new stubbed dialer
52+ // cfg := Config{
53+ // Async: true,
54+ // // ...
55+ // }
56+ // f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer
57+ // f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
5558//
56- // d.waitForNextDialing(false, false) // 1st dialing attempt fails
57- // d.waitForNextDialing(false, false) // 2nd attempt fails too
58- // d.waitForNextDialing(false, false) // 3rd attempt fails too
59- // d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds
59+ // d.waitForNextDialing(false, false) // 1st dialing attempt fails
60+ // d.waitForNextDialing(false, false) // 2nd attempt fails too
61+ // d.waitForNextDialing(false, false) // 3rd attempt fails too
62+ // d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds
6063//
6164// Note that in the above example, the logger operates in async mode. As such,
6265// a call to Post, PostWithTime or EncodeAndPostData is needed *before* calling
@@ -67,20 +70,20 @@ func newTestDialer() *testDialer {
6770// case, you have to put the calls to newWithDialer() and to EncodeAndPostData()
6871// into their own goroutine. An example:
6972//
70- // d := newTestDialer() // Create a new stubbed dialer
71- // cfg := Config{
72- // Async: false,
73- // // ...
74- // }
75- // go func() {
76- // f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer
77- // f.Close()
78- // }()
73+ // d := newTestDialer() // Create a new stubbed dialer
74+ // cfg := Config{
75+ // Async: false,
76+ // // ...
77+ // }
78+ // go func() {
79+ // f := newWithDialer(cfg, d) // Create a fluent logger using the stubbed dialer
80+ // f.Close()
81+ // }()
7982//
80- // d.waitForNextDialing(false, false) // 1st dialing attempt fails
81- // d.waitForNextDialing(false, false) // 2nd attempt fails too
82- // d.waitForNextDialing(false, false) // 3rd attempt fails too
83- // d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds
83+ // d.waitForNextDialing(false, false) // 1st dialing attempt fails
84+ // d.waitForNextDialing(false, false) // 2nd attempt fails too
85+ // d.waitForNextDialing(false, false) // 3rd attempt fails too
86+ // d.waitForNextDialing(true, false) // Finally the 4th attempt succeeds
8487//
8588// Moreover, waitForNextDialing() returns a *Conn which extends net.Conn to provide testing
8689// facilities. For instance, you can call waitForNextWrite() on these connections, to
@@ -91,24 +94,24 @@ func newTestDialer() *testDialer {
9194//
9295// Here's a full example:
9396//
94- // d := newTestDialer()
95- // cfg := Config{Async: true}
97+ // d := newTestDialer()
98+ // cfg := Config{Async: true}
9699//
97- // f := newWithDialer(cfg, d)
98- // f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
100+ // f := newWithDialer(cfg, d)
101+ // f.EncodeAndPostData("tag_name", time.Unix(1482493046, 0), map[string]string{"foo": "bar"})
99102//
100- // conn := d.waitForNextDialing(true, false) // Accept the dialing
101- // conn.waitForNextWrite(false, "") // Discard the 1st attempt to write the message
103+ // conn := d.waitForNextDialing(true, false) // Accept the dialing
104+ // conn.waitForNextWrite(false, "") // Discard the 1st attempt to write the message
102105//
103- // conn := d.waitForNextDialing(true, false)
104- // assertReceived(t, // t is *testing.T
105- // conn.waitForNextWrite(true, ""),
106- // "[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]")
106+ // conn := d.waitForNextDialing(true, false)
107+ // assertReceived(t, // t is *testing.T
108+ // conn.waitForNextWrite(true, ""),
109+ // "[\"tag_name\",1482493046,{\"foo\":\"bar\"},{}]")
107110//
108- // f.EncodeAndPostData("something_else", time.Unix(1482493050, 0), map[string]string{"bar": "baz"})
109- // assertReceived(t, // t is *testing.T
110- // conn.waitForNextWrite(true, ""),
111- // "[\"something_else\",1482493050,{\"bar\":\"baz\"},{}]")
111+ // f.EncodeAndPostData("something_else", time.Unix(1482493050, 0), map[string]string{"bar": "baz"})
112+ // assertReceived(t, // t is *testing.T
113+ // conn.waitForNextWrite(true, ""),
114+ // "[\"something_else\",1482493050,{\"bar\":\"baz\"},{}]")
112115//
113116// In this example, the 1st connection dialing succeeds but the 1st attempt to write the
114117// message is discarded. As the logger discards the connection whenever a message
@@ -472,7 +475,10 @@ func TestPostWithTime(t *testing.T) {
472475 _ = f .PostWithTime ("tag_name" , time .Unix (1482493046 , 0 ), map [string ]string {"foo" : "bar" })
473476 _ = f .PostWithTime ("tag_name" , time .Unix (1482493050 , 0 ), map [string ]string {"fluentd" : "is awesome" })
474477 _ = f .PostWithTime ("tag_name" , time .Unix (1634263200 , 0 ),
475- struct {Welcome string `msg:"welcome"` ; cannot string }{"to use" , "see me" })
478+ struct {
479+ Welcome string `msg:"welcome"`
480+ cannot string
481+ }{"to use" , "see me" })
476482 }()
477483
478484 conn := d .waitForNextDialing (true , false )
@@ -755,16 +761,62 @@ func TestSyncWriteAfterCloseFails(t *testing.T) {
755761 err = f .PostWithTime ("tag_name" , time .Unix (1482493050 , 0 ), map [string ]string {"foo" : "buzz" })
756762
757763 // The event submission must fail,
758- assert .NotEqual (t , err , nil );
764+ assert .NotEqual (t , err , nil )
759765
760766 // and also must keep Fluentd closed.
761- assert .NotEqual (t , f .closed , false );
767+ assert .NotEqual (t , f .closed , false )
762768 }()
763769
764770 conn := d .waitForNextDialing (true , false )
765771 conn .waitForNextWrite (true , "" )
766772}
767773
774+ func TestPendingChannelThreadSafety (t * testing.T ) {
775+ f , err := New (Config {
776+ Async : true ,
777+ ForceStopAsyncSend : true ,
778+ })
779+ if err != nil {
780+ t .Fatalf ("Failed to create logger: %v" , err )
781+ }
782+
783+ // Start multiple goroutines posting messages
784+ const numGoroutines = 10
785+ const messagesPerGoroutine = 100
786+ var wg sync.WaitGroup
787+ wg .Add (numGoroutines )
788+
789+ for i := 0 ; i < numGoroutines ; i ++ {
790+ go func (id int ) {
791+ defer wg .Done ()
792+ for j := 0 ; j < messagesPerGoroutine ; j ++ {
793+ // Post a message
794+ err := f .Post ("tag" , map [string ]string {
795+ "goroutine" : strconv .Itoa (id ),
796+ "message" : strconv .Itoa (j ),
797+ })
798+
799+ // If the logger is closed, we expect an error
800+ if err != nil && ! strings .Contains (err .Error (), "already closed" ) {
801+ t .Errorf ("Unexpected error: %v" , err )
802+ }
803+
804+ // Add a small delay to increase the chance of race conditions
805+ time .Sleep (time .Millisecond )
806+ }
807+ }(i )
808+ }
809+
810+ // Wait a bit to let some messages be posted
811+ time .Sleep (10 * time .Millisecond )
812+
813+ // Close the logger while goroutines are still posting
814+ f .Close ()
815+
816+ // Wait for all goroutines to finish
817+ wg .Wait ()
818+ }
819+
768820func Benchmark_PostWithShortMessage (b * testing.B ) {
769821 b .StopTimer ()
770822 d := newTestDialer ()
0 commit comments