Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
module github.com/superproj/design-pattern

go 1.22.2
go 1.23

require github.com/stretchr/testify v1.9.0
toolchain go1.24.2

require (
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
google.golang.org/grpc v1.72.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/net v0.35.0 // indirect
golang.org/x/sys v0.30.0 // indirect
golang.org/x/text v0.22.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
40 changes: 40 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,49 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A=
go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU=
go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk=
go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ=
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
92 changes: 44 additions & 48 deletions messaging/fanout/fan_out_complex.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ import (
"sync"
"sync/atomic"

"go.uber.org/zap" //https://github.com/uber-go/zap
//Blazing fast, structured, leveled logging in Go.
"go.uber.org/zap" // https://github.com/uber-go/zap
// Blazing fast, structured, leveled logging in Go.
)

var (
log, _ = zap.NewDevelopment()
)
var log, _ = zap.NewDevelopment()

// Settings of pipeline
const (
Expand All @@ -35,7 +33,7 @@ type worker struct {
chain chan interface{}
debug bool
idle uint32
dispatcher IDispatcher //hold a dispacher,需要自己实现一个dispatcher 工厂
dispatcher IDispatcher // hold a dispacher,需要自己实现一个dispatcher 工厂
}

// Pipeline of workers
Expand All @@ -52,9 +50,6 @@ func (p *Pipeline) Start(ctx context.Context) {
go func(pipe *Pipeline) {
for {
expectationWorkers := len(pipe.chain) % MaxWorkers
if expectationWorkers >= MaxWorkers {
expectationWorkers = 0
}
select {
case <-ctx.Done():
return
Expand All @@ -75,7 +70,6 @@ func (p *Pipeline) Dispatch(msg interface{}) {

// NewPipeline create a Workflow with a dispacher builder and some workers
func NewPipeline(d DispatcherBuilder, idle uint32, debug bool) *Pipeline {

ch := make(chan interface{}, MasterQueueSize)

wk := make(map[int]*worker)
Expand All @@ -86,59 +80,61 @@ func NewPipeline(d DispatcherBuilder, idle uint32, debug bool) *Pipeline {
mutex: new(sync.Mutex),
debug: debug,
idle: idle,
dispatcher: d(), //build real dispatcher
dispatcher: d(), // build real dispatcher
}
}
return &Pipeline{workers: wk, chain: ch}
}

func (c *worker) stream(val interface{}) {
c.chain <- val
if !c.running {
c.mutex.Lock()
c.running = true
ctx, cancel := context.WithCancel(context.Background())
defer func(w *worker, cancel context.CancelFunc) {
if w.debug {
log.Info("Worker leaving", zap.Any("index", w.index), zap.Any("idle", w.idle))
}

if c.dispatcher != nil {
err := c.dispatcher.After()
if err != nil {
log.Error("can not finish track issue", zap.Error(err))
}
}
if c.running {
return
}

cancel()
w.mutex.Unlock()
w.running = false
}(c, cancel)
c.mutex.Lock()
c.running = true
ctx, cancel := context.WithCancel(context.Background())
defer func(w *worker, cancel context.CancelFunc) {
if w.debug {
log.Info("Worker leaving", zap.Any("index", w.index), zap.Any("idle", w.idle))
}

if c.dispatcher != nil {
err := c.dispatcher.Before(ctx)
err := c.dispatcher.After()
if err != nil {
log.Error("can not start worker", zap.Error(err))
log.Error("can not finish track issue", zap.Error(err))
}
}

var idle uint32 = 0
for {
select {
case msg := <-c.chain:
atomic.StoreUint32(&idle, 0)
if msg != nil && c.dispatcher != nil {
err := c.dispatcher.Process(msg)
if err != nil {
log.Error("can not process message", zap.Any("msg", &msg), zap.Error(err))
}
cancel()
w.mutex.Unlock()
w.running = false
}(c, cancel)

if c.dispatcher != nil {
err := c.dispatcher.Before(ctx)
if err != nil {
log.Error("can not start worker", zap.Error(err))
}
}

var idle uint32 = 0
for {
select {
case msg := <-c.chain:
atomic.StoreUint32(&idle, 0)
if msg != nil && c.dispatcher != nil {
err := c.dispatcher.Process(msg)
if err != nil {
log.Error("can not process message", zap.Any("msg", &msg), zap.Error(err))
}
default:
atomic.AddUint32(&idle, 1)
if i := atomic.LoadUint32(&idle); i > 0 {
if i > c.idle {
return
}
}
default:
atomic.AddUint32(&idle, 1)
if i := atomic.LoadUint32(&idle); i > 0 {
if i > c.idle {
return
}
}
}
Expand Down
26 changes: 19 additions & 7 deletions messaging/fanout/fan_out_complex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type messageContent struct {
}

func TestComplexStreamingFanOut(t *testing.T) {

builder := func() IDispatcher {
return &taggingDispatcher{Address: "127.0.0.2"}
}
Expand All @@ -30,11 +29,26 @@ func TestComplexStreamingFanOut(t *testing.T) {
pipeline: NewPipeline(builder, 2, true),
}

tagging.pipeline.Dispatch(messageContent{"all,please stay home", 1000})
exit := make(chan struct{})

// sender
go func() {
for {
select {
case <-exit:
return
default:
time.Sleep(time.Second)
tagging.pipeline.Dispatch(messageContent{"all, please stay home", 1000})
}
}
}()

tagging.pipeline.Start(context.Background())

//模拟处理过程,让工作者线程完成工作
// 模拟处理过程,让工作者线程完成工作
time.Sleep(time.Second * 5)
close(exit)
time.Sleep(time.Second * 2)
t.Log("Done")
}
Expand All @@ -45,7 +59,6 @@ type Tagging struct {
}

func (d *taggingDispatcher) Before(ctx context.Context) error {

fmt.Println("i'm doing somthing before processing")

conn, err := grpc.Dial(d.Address, grpc.WithInsecure())
Expand All @@ -69,14 +82,13 @@ func (d *taggingDispatcher) After() error {
// if e != nil {
// log.Error("close connection error", field.Error(e))
// }
//return err
// return err
fmt.Println("i'm doing somthing After processing")
return nil
}

func (d *taggingDispatcher) Process(msg interface{}) error {

content := msg.(messageContent)
fmt.Println("i'm doing processing,with conten", content)
fmt.Println("i'm doing processing, with content", content)
return nil
}