Skip to content

Commit 1390e8e

Browse files
committed
feat(os/gevent): refactor event management and add new features
- Refactored the core logic of event management to improve code readability and maintainability - Added support for event handling priorities, enabling flexible control over the event processing order - Introduced a recovery function mechanism for event handling to manage exceptions during processing - Optimized event publishing and subscription interfaces to simplify usage - Added unit test cases to improve test coverage
1 parent 2f06f5c commit 1390e8e

File tree

2 files changed

+328
-176
lines changed

2 files changed

+328
-176
lines changed

os/gevent/gevent.go

Lines changed: 90 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,31 @@ import (
1212
"sync"
1313

1414
"github.com/gogf/gf/v2/container/gmap"
15+
"github.com/gogf/gf/v2/errors/gerror"
1516
"github.com/gogf/gf/v2/util/gutil"
1617
)
1718

1819
// Handler is a function type for handling events.
1920
// It receives the topic name and message data as parameters.
2021
type Handler func(topic string, message any)
2122

23+
type RecoverFunc func(topic string, message any)
24+
2225
// handlerInfo stores information about an event handler.
2326
type handlerInfo struct {
24-
id int64 // Unique identifier for the handler
25-
topic string // Topic name
26-
handler Handler // Handler function
27-
priority Priority // Handler priority
27+
id int64 // Unique identifier for the handler
28+
topic string // Topic name
29+
handler Handler // Handler function
30+
recoverFunc RecoverFunc // Recover function
31+
priority Priority // Handler priority
2832
}
2933

3034
// Event is the main struct for event management.
3135
type Event struct {
3236
mu sync.RWMutex // Read-write mutex for concurrent safety
3337
handlers *gmap.StrAnyMap // Map to store handlers by topic
3438
counter int64 // Counter for generating unique handler IDs
39+
closed bool // Indicates whether the event manager is closed
3540
}
3641

3742
// Subscriber represents a subscription to an event topic.
@@ -42,6 +47,8 @@ type Subscriber struct {
4247
once sync.Once // Ensures unsubscribe is called only once
4348
}
4449

50+
var EventClosedError = gerror.New("event manager is closed")
51+
4552
// Unsubscribe removes the subscription from the event manager.
4653
// It can be called multiple times safely, but only the first call will take effect.
4754
func (s *Subscriber) Unsubscribe() {
@@ -58,12 +65,12 @@ func New() *Event {
5865
}
5966
}
6067

61-
// Subscribe registers a handler function for a specific topic.
62-
// Handlers can be assigned a priority level, with higher priority handlers executed first.
63-
// Returns a Subscriber that can be used to unsubscribe the handler.
64-
func (e *Event) Subscribe(topic string, handler Handler, priority ...Priority) *Subscriber {
68+
func (e *Event) SubscribeWithRecover(topic string, handler Handler, recoverFunc RecoverFunc, priority ...Priority) (*Subscriber, error) {
6569
e.mu.Lock()
6670
defer e.mu.Unlock()
71+
if e.closed {
72+
return nil, EventClosedError
73+
}
6774
e.counter++
6875
level := PriorityNormal
6976
if len(priority) > 0 {
@@ -77,10 +84,11 @@ func (e *Event) Subscribe(topic string, handler Handler, priority ...Priority) *
7784
return make([]*handlerInfo, 0, 8)
7885
})
7986
h := &handlerInfo{
80-
id: e.counter,
81-
topic: topic,
82-
handler: handler,
83-
priority: level,
87+
id: e.counter,
88+
topic: topic,
89+
handler: handler,
90+
priority: level,
91+
recoverFunc: recoverFunc,
8492
}
8593
handlers := infos.([]*handlerInfo)
8694
handlers = append(handlers, h)
@@ -90,7 +98,14 @@ func (e *Event) Subscribe(topic string, handler Handler, priority ...Priority) *
9098
topic: topic,
9199
event: e,
92100
once: sync.Once{},
93-
}
101+
}, nil
102+
}
103+
104+
// Subscribe registers a handler function for a specific topic.
105+
// Handlers can be assigned a priority level, with higher priority handlers executed first.
106+
// Returns a Subscriber that can be used to unsubscribe the handler.
107+
func (e *Event) Subscribe(topic string, handler Handler, priority ...Priority) (*Subscriber, error) {
108+
return e.SubscribeWithRecover(topic, handler, nil, priority...)
94109
}
95110

96111
// UnSubscribe removes a handler from a topic by its ID.
@@ -120,48 +135,74 @@ func (e *Event) UnSubscribe(topic string, id int64) {
120135
e.handlers.Remove(topic)
121136
}
122137
}
138+
}
139+
140+
// executeHandlerWithRecover executes a handler function with a recover function for a topic.
141+
func (e *Event) executeHandlerWithRecover(info *handlerInfo, topic string, message any, async bool) {
142+
handler := info.handler
143+
recoverFunc := info.recoverFunc
144+
145+
wrapper := func() {
146+
defer func() {
147+
if err := recover(); err != nil {
148+
recoverFunc(topic, message)
149+
}
150+
}()
151+
handler(topic, message)
152+
}
123153

154+
if async {
155+
go wrapper()
156+
} else {
157+
wrapper()
158+
}
159+
}
160+
161+
// executeHandler executes a handler function for a topic.
162+
func (e *Event) executeHandler(handler Handler, topic string, message any, async bool) {
163+
if async {
164+
go handler(topic, message)
165+
} else {
166+
handler(topic, message)
167+
}
124168
}
125169

126170
// forEachHandler iterates through all handlers for a topic and executes them.
127171
// If async is true, handlers are executed in goroutines.
128-
func (e *Event) forEachHandler(topic string, message any, async bool) {
172+
func (e *Event) forEachHandler(topic string, message any, async bool) error {
173+
if e.closed {
174+
return EventClosedError
175+
}
129176
if m := e.handlers.Get(topic); m != nil {
130177
m.(*gmap.TreeMap).IteratorDesc(func(key, value any) bool {
131178
infos := value.([]*handlerInfo)
132179
for _, info := range infos {
133-
handler := info.handler
134-
if async {
135-
go handler(topic, message)
180+
if info.recoverFunc != nil {
181+
e.executeHandlerWithRecover(info, topic, message, async)
136182
} else {
137-
handler(topic, message)
183+
e.executeHandler(info.handler, topic, message, async)
138184
}
139185
}
140186
return true
141187
})
142188
}
189+
return nil
143190
}
144191

145192
// Publish sends a message to all handlers subscribed to a topic.
146193
// Handlers are executed asynchronously in separate goroutines.
147-
func (e *Event) Publish(topic string, message any) {
194+
func (e *Event) Publish(topic string, message any) error {
148195
e.mu.RLock()
149196
defer e.mu.RUnlock()
150-
e.forEachHandler(topic, message, true)
197+
return e.forEachHandler(topic, message, true)
151198
}
152199

153200
// PublishSync sends a message to all handlers subscribed to a topic.
154201
// Handlers are executed synchronously in the current goroutine.
155-
func (e *Event) PublishSync(topic string, message any) {
202+
func (e *Event) PublishSync(topic string, message any) error {
156203
e.mu.RLock()
157204
defer e.mu.RUnlock()
158-
e.forEachHandler(topic, message, false)
159-
}
160-
161-
// SubscribeFunc is a convenience method that allows subscribing with a plain function.
162-
// It converts the function to a Handler type and subscribes it.
163-
func (e *Event) SubscribeFunc(topic string, f func(topic string, message any), priority ...Priority) *Subscriber {
164-
return e.Subscribe(topic, f, priority...)
205+
return e.forEachHandler(topic, message, false)
165206
}
166207

167208
// Topics returns a list of all topics that currently have subscribers.
@@ -182,3 +223,24 @@ func (e *Event) SubscribersCount(topic string) int {
182223
}
183224
return count
184225
}
226+
227+
// Clear removes all handlers and subscribers from the event manager.
228+
func (e *Event) Clear() {
229+
e.mu.Lock()
230+
defer e.mu.Unlock()
231+
e.clear()
232+
}
233+
234+
// clear removes all handlers and subscribers from the event manager.
235+
func (e *Event) clear() {
236+
e.handlers.Clear()
237+
e.counter = 0
238+
}
239+
240+
// Close closes the event manager, preventing new subscribers from being added.
241+
func (e *Event) Close() {
242+
e.mu.Lock()
243+
defer e.mu.Unlock()
244+
e.closed = true
245+
e.clear()
246+
}

0 commit comments

Comments
 (0)