diff --git a/go.mod b/go.mod index 95100e2..e21b1a3 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,23 @@ module github.com/superproj/design-pattern -go 1.22.2 +go 1.23 -require github.com/stretchr/testify v1.9.0 +toolchain go1.24.2 + +require ( + github.com/stretchr/testify v1.9.0 + go.uber.org/zap v1.27.0 + google.golang.org/grpc v1.72.0 +) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + go.uber.org/multierr v1.10.0 // indirect + golang.org/x/net v0.35.0 // indirect + golang.org/x/sys v0.30.0 // indirect + golang.org/x/text v0.22.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect + google.golang.org/protobuf v1.36.5 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 60ce688..277312a 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,49 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= +go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= +go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= +go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= +go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= +go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= +go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= +go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= +go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= +golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= +golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ= +google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM= +google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/messaging/fanout/fan_out_complex.go b/messaging/fanout/fan_out_complex.go index aeab437..1d6b839 100644 --- a/messaging/fanout/fan_out_complex.go +++ b/messaging/fanout/fan_out_complex.go @@ -5,13 +5,11 @@ import ( "sync" "sync/atomic" - "go.uber.org/zap" //https://github.com/uber-go/zap - //Blazing fast, structured, leveled logging in Go. + "go.uber.org/zap" // https://github.com/uber-go/zap + // Blazing fast, structured, leveled logging in Go. ) -var ( - log, _ = zap.NewDevelopment() -) +var log, _ = zap.NewDevelopment() // Settings of pipeline const ( @@ -35,7 +33,7 @@ type worker struct { chain chan interface{} debug bool idle uint32 - dispatcher IDispatcher //hold a dispacher,需要自己实现一个dispatcher 工厂 + dispatcher IDispatcher // hold a dispacher,需要自己实现一个dispatcher 工厂 } // Pipeline of workers @@ -52,9 +50,6 @@ func (p *Pipeline) Start(ctx context.Context) { go func(pipe *Pipeline) { for { expectationWorkers := len(pipe.chain) % MaxWorkers - if expectationWorkers >= MaxWorkers { - expectationWorkers = 0 - } select { case <-ctx.Done(): return @@ -75,7 +70,6 @@ func (p *Pipeline) Dispatch(msg interface{}) { // NewPipeline create a Workflow with a dispacher builder and some workers func NewPipeline(d DispatcherBuilder, idle uint32, debug bool) *Pipeline { - ch := make(chan interface{}, MasterQueueSize) wk := make(map[int]*worker) @@ -86,7 +80,7 @@ func NewPipeline(d DispatcherBuilder, idle uint32, debug bool) *Pipeline { mutex: new(sync.Mutex), debug: debug, idle: idle, - dispatcher: d(), //build real dispatcher + dispatcher: d(), // build real dispatcher } } return &Pipeline{workers: wk, chain: ch} @@ -94,51 +88,53 @@ func NewPipeline(d DispatcherBuilder, idle uint32, debug bool) *Pipeline { func (c *worker) stream(val interface{}) { c.chain <- val - if !c.running { - c.mutex.Lock() - c.running = true - ctx, cancel := context.WithCancel(context.Background()) - defer func(w *worker, cancel context.CancelFunc) { - if w.debug { - log.Info("Worker leaving", zap.Any("index", w.index), zap.Any("idle", w.idle)) - } - - if c.dispatcher != nil { - err := c.dispatcher.After() - if err != nil { - log.Error("can not finish track issue", zap.Error(err)) - } - } + if c.running { + return + } - cancel() - w.mutex.Unlock() - w.running = false - }(c, cancel) + c.mutex.Lock() + c.running = true + ctx, cancel := context.WithCancel(context.Background()) + defer func(w *worker, cancel context.CancelFunc) { + if w.debug { + log.Info("Worker leaving", zap.Any("index", w.index), zap.Any("idle", w.idle)) + } if c.dispatcher != nil { - err := c.dispatcher.Before(ctx) + err := c.dispatcher.After() if err != nil { - log.Error("can not start worker", zap.Error(err)) + log.Error("can not finish track issue", zap.Error(err)) } } - var idle uint32 = 0 - for { - select { - case msg := <-c.chain: - atomic.StoreUint32(&idle, 0) - if msg != nil && c.dispatcher != nil { - err := c.dispatcher.Process(msg) - if err != nil { - log.Error("can not process message", zap.Any("msg", &msg), zap.Error(err)) - } + cancel() + w.mutex.Unlock() + w.running = false + }(c, cancel) + + if c.dispatcher != nil { + err := c.dispatcher.Before(ctx) + if err != nil { + log.Error("can not start worker", zap.Error(err)) + } + } + + var idle uint32 = 0 + for { + select { + case msg := <-c.chain: + atomic.StoreUint32(&idle, 0) + if msg != nil && c.dispatcher != nil { + err := c.dispatcher.Process(msg) + if err != nil { + log.Error("can not process message", zap.Any("msg", &msg), zap.Error(err)) } - default: - atomic.AddUint32(&idle, 1) - if i := atomic.LoadUint32(&idle); i > 0 { - if i > c.idle { - return - } + } + default: + atomic.AddUint32(&idle, 1) + if i := atomic.LoadUint32(&idle); i > 0 { + if i > c.idle { + return } } } diff --git a/messaging/fanout/fan_out_complex_test.go b/messaging/fanout/fan_out_complex_test.go index 98488d1..3ed68dc 100644 --- a/messaging/fanout/fan_out_complex_test.go +++ b/messaging/fanout/fan_out_complex_test.go @@ -21,7 +21,6 @@ type messageContent struct { } func TestComplexStreamingFanOut(t *testing.T) { - builder := func() IDispatcher { return &taggingDispatcher{Address: "127.0.0.2"} } @@ -30,11 +29,26 @@ func TestComplexStreamingFanOut(t *testing.T) { pipeline: NewPipeline(builder, 2, true), } - tagging.pipeline.Dispatch(messageContent{"all,please stay home", 1000}) + exit := make(chan struct{}) + + // sender + go func() { + for { + select { + case <-exit: + return + default: + time.Sleep(time.Second) + tagging.pipeline.Dispatch(messageContent{"all, please stay home", 1000}) + } + } + }() tagging.pipeline.Start(context.Background()) - //模拟处理过程,让工作者线程完成工作 + // 模拟处理过程,让工作者线程完成工作 + time.Sleep(time.Second * 5) + close(exit) time.Sleep(time.Second * 2) t.Log("Done") } @@ -45,7 +59,6 @@ type Tagging struct { } func (d *taggingDispatcher) Before(ctx context.Context) error { - fmt.Println("i'm doing somthing before processing") conn, err := grpc.Dial(d.Address, grpc.WithInsecure()) @@ -69,14 +82,13 @@ func (d *taggingDispatcher) After() error { // if e != nil { // log.Error("close connection error", field.Error(e)) // } - //return err + // return err fmt.Println("i'm doing somthing After processing") return nil } func (d *taggingDispatcher) Process(msg interface{}) error { - content := msg.(messageContent) - fmt.Println("i'm doing processing,with conten", content) + fmt.Println("i'm doing processing, with content", content) return nil }