Skip to content

Commit 6e0342b

Browse files
committed
refactor(os/gevent): 优化 SeqEventBus 中的信号处理逻辑
- 调整了 handlerProcessor 中信号处理的顺序,确保 wg.Done() 在所有处理完成后执行 - 优化了 topicProcessor 的初始化逻辑,将 asyncProcess 的启动移至初始化之后
1 parent f7055c4 commit 6e0342b

File tree

2 files changed

+22
-10
lines changed

2 files changed

+22
-10
lines changed

os/gevent/gevent.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ package gevent
99

1010
// DefaultEventBus is the default event bus instance.
1111
var DefaultEventBus = NewSeqEventBus(SeqEventBusOption{
12-
QueueSize: 1000,
12+
QueueSize: 100,
1313
WorkerSize: 10,
1414
})

os/gevent/gevent_seq_event_bus.go

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@ type handlerProcessor struct {
4040

4141
// SeqEventBusOption defines configuration options for SeqEventBus
4242
type SeqEventBusOption struct {
43-
QueueSize int // Size of the event queue channel
44-
WorkerSize int // Number of workers for parallel execution
43+
QueueSize int // Size of the event queue channel
44+
WorkerSize int // Number of workers for parallel execution
45+
CloneEvent bool // Whether to clone event for each handler execution
4546
}
4647

4748
// SeqEventBus is a sequential event bus implementation
@@ -137,6 +138,15 @@ func (tp *topicProcessor) close() {
137138
})
138139
}
139140

141+
// cloneEvent clones the event if necessary
142+
func (tp *topicProcessor) cloneEvent(event Event) Event {
143+
cloneEvent := event
144+
if tp.eventBus.option.CloneEvent {
145+
cloneEvent = event.Clone()
146+
}
147+
return cloneEvent
148+
}
149+
140150
// asyncProcess processes events asynchronously from the channel
141151
func (tp *topicProcessor) asyncProcess() {
142152
tp.eventBus.wg.Add(1)
@@ -151,9 +161,9 @@ func (tp *topicProcessor) asyncProcess() {
151161
}
152162
handlerProcessors := tp.filterHandlerProcessors()
153163
if event.GetExecModel() == Seq {
154-
// Sequential execution
155164
for _, processor := range handlerProcessors {
156-
err := tp.execute(event, processor)
165+
cloneEvent := tp.cloneEvent(event)
166+
err := tp.execute(cloneEvent, processor)
157167
if err != nil {
158168
if event.GetErrorModel() == Stop {
159169
return
@@ -171,17 +181,18 @@ func (tp *topicProcessor) asyncProcess() {
171181

172182
for _, processor := range handlerProcessors {
173183
wg.Add(1)
174-
go func(e Event, processor *handlerProcessor) {
184+
cloneEvent := tp.cloneEvent(event)
185+
go func(e Event, p *handlerProcessor) {
175186
semaphore <- struct{}{}
176187
defer func() { <-semaphore }()
177188
defer wg.Done()
178-
err := tp.execute(e, processor)
189+
err := tp.execute(e, p)
179190
if err != nil {
180191
if event.GetErrorModel() == Stop {
181192
return
182193
}
183194
}
184-
}(event, processor)
195+
}(cloneEvent, processor)
185196
}
186197
wg.Wait()
187198
}
@@ -192,8 +203,9 @@ func (tp *topicProcessor) asyncProcess() {
192203
// NewSeqEventBus creates a new sequential event bus with optional configuration
193204
func NewSeqEventBus(options ...SeqEventBusOption) *SeqEventBus {
194205
option := SeqEventBusOption{
195-
QueueSize: 100, // Default queue size
196-
WorkerSize: 10, // Default worker size for parallel execution
206+
QueueSize: 100, // Default queue size
207+
WorkerSize: 10, // Default worker size for parallel execution
208+
CloneEvent: false, // Default behavior is not to clone events
197209
}
198210
if len(options) > 0 {
199211
option = options[0]

0 commit comments

Comments
 (0)