diff --git a/eventer.go b/eventer.go index b8612c89e..e066dd86d 100644 --- a/eventer.go +++ b/eventer.go @@ -1,6 +1,15 @@ package gobot -import "sync" +import ( + "sync" +) + +const ( + defaultEventChanBufferSize = 10 + maxEventChanBufferSize = 10240 + maxChanWorkerCount = 128 + EventCrash = "crash" +) type eventChannel chan *Event @@ -14,12 +23,16 @@ type eventer struct { // map of out channels used by subscribers outs map[eventChannel]eventChannel + // the in/out channel length + bufferSize int + + // controls the maximum number of concurrent executions when eventer is Published to + workerCount int + // mutex to protect the eventChannel map eventsMutex sync.Mutex } -const eventChanBufferSize = 10 - // Eventer is the interface which describes how a Driver or Adaptor // handles events. type Eventer interface { @@ -52,25 +65,57 @@ type Eventer interface { Once(name string, f func(s interface{})) (err error) } +// EventerOptionFn allows the configurable parameters of an eventer to be modified +type EventerOptionFn func(*eventer) + +// WithBufferSize allows an eventer's buffer size to be configured +func WithBufferSize(bufferSize int) EventerOptionFn { + return func(e *eventer) { + if bufferSize >= maxEventChanBufferSize { + e.bufferSize = maxEventChanBufferSize + } else if bufferSize <= 0 { + e.bufferSize = defaultEventChanBufferSize + } else { + e.bufferSize = bufferSize + } + } +} + +// WithWorkerCount allows an eventer's workerCount to be configured +func WithWorkerCount(workerCount int) EventerOptionFn { + return func(e *eventer) { + if workerCount >= maxChanWorkerCount { + e.workerCount = maxChanWorkerCount + } else if workerCount <= 0 { + e.workerCount = 1 + } else { + e.workerCount = workerCount + } + } +} + // NewEventer returns a new Eventer. -func NewEventer() Eventer { +func NewEventer(fns ...EventerOptionFn) Eventer { evtr := &eventer{ - eventnames: make(map[string]string), - in: make(eventChannel, eventChanBufferSize), - outs: make(map[eventChannel]eventChannel), + eventnames: make(map[string]string), + outs: make(map[eventChannel]eventChannel), + bufferSize: defaultEventChanBufferSize, + workerCount: 1, + } + + for _, fn := range fns { + fn(evtr) } // goroutine to cascade "in" events to all "out" event channels + evtr.in = make(eventChannel, evtr.bufferSize) go func() { - for { - select { - case evt := <-evtr.in: - evtr.eventsMutex.Lock() - for _, out := range evtr.outs { - out <- evt - } - evtr.eventsMutex.Unlock() + for evt := range evtr.in { + evtr.eventsMutex.Lock() + for _, out := range evtr.outs { + out <- evt } + evtr.eventsMutex.Unlock() } }() @@ -108,7 +153,7 @@ func (e *eventer) Publish(name string, data interface{}) { func (e *eventer) Subscribe() eventChannel { e.eventsMutex.Lock() defer e.eventsMutex.Unlock() - out := make(eventChannel, eventChanBufferSize) + out := make(eventChannel, e.bufferSize) e.outs[out] = out return out } @@ -123,16 +168,22 @@ func (e *eventer) Unsubscribe(events eventChannel) { // On executes the event handler f when e is Published to. func (e *eventer) On(n string, f func(s interface{})) (err error) { out := e.Subscribe() - go func() { - for { - select { - case evt := <-out: + for i := 0; i < e.workerCount; i++ { + go func() { + // Add panic handling for goroutines to prevent panics caused by the callback function `f` + defer func() { + if r := recover(); r != nil { + e.Publish(EventCrash, r) + } + }() + + for evt := range out { if evt.Name == n { f(evt.Data) } } - } - }() + }() + } return }