Skip to content

Commit 14d9a08

Browse files
committed
feat:优化stream停止
1 parent 6f77dc5 commit 14d9a08

File tree

11 files changed

+863
-728
lines changed

11 files changed

+863
-728
lines changed

stream/handler_data.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,16 @@ func (s *Stream) safeGetDataChan() chan map[string]interface{} {
4242

4343
// safeSendToDataChan safely sends data to dataChan
4444
func (s *Stream) safeSendToDataChan(data map[string]interface{}) bool {
45+
// Check if stream is stopped before attempting to send
46+
if atomic.LoadInt32(&s.stopped) == 1 {
47+
return false
48+
}
49+
4550
dataChan := s.safeGetDataChan()
51+
if dataChan == nil {
52+
return false
53+
}
54+
4655
select {
4756
case dataChan <- data:
4857
return true

stream/processor_data.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,8 +305,19 @@ func (dp *DataProcessor) startWindowProcessing() {
305305
}
306306
}()
307307

308-
for batch := range dp.stream.Window.OutputChan() {
309-
dp.processWindowBatch(batch)
308+
outputChan := dp.stream.Window.OutputChan()
309+
for {
310+
select {
311+
case batch, ok := <-outputChan:
312+
if !ok {
313+
// Channel closed, exit
314+
return
315+
}
316+
dp.processWindowBatch(batch)
317+
case <-dp.stream.done:
318+
// Stream stopped, exit
319+
return
320+
}
310321
}
311322
}()
312323
}

stream/strategy.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,34 @@ func NewBlockingStrategy() *BlockingStrategy {
6666

6767
// ProcessData implements blocking mode data processing
6868
func (bs *BlockingStrategy) ProcessData(data map[string]interface{}) {
69+
// Check if stream is stopped
70+
if atomic.LoadInt32(&bs.stream.stopped) == 1 {
71+
return
72+
}
73+
6974
if bs.stream.blockingTimeout <= 0 {
7075
// No timeout limit, block permanently until success
7176
dataChan := bs.stream.safeGetDataChan()
72-
dataChan <- data
73-
return
77+
if dataChan == nil {
78+
return
79+
}
80+
select {
81+
case dataChan <- data:
82+
return
83+
case <-bs.stream.done:
84+
return
85+
}
7486
}
7587

7688
// Blocking with timeout
7789
timer := time.NewTimer(bs.stream.blockingTimeout)
7890
defer timer.Stop()
7991

8092
dataChan := bs.stream.safeGetDataChan()
93+
if dataChan == nil {
94+
return
95+
}
96+
8197
select {
8298
case dataChan <- data:
8399
// Successfully added data
@@ -87,7 +103,17 @@ func (bs *BlockingStrategy) ProcessData(data map[string]interface{}) {
87103
logger.Error("Data addition timeout, but continue waiting to avoid data loss")
88104
// Continue blocking indefinitely, re-get current channel reference
89105
finalDataChan := bs.stream.safeGetDataChan()
90-
finalDataChan <- data
106+
if finalDataChan == nil {
107+
return
108+
}
109+
select {
110+
case finalDataChan <- data:
111+
return
112+
case <-bs.stream.done:
113+
return
114+
}
115+
case <-bs.stream.done:
116+
return
91117
}
92118
}
93119

@@ -135,7 +161,15 @@ func (es *ExpansionStrategy) ProcessData(data map[string]interface{}) {
135161

136162
// If still full after expansion, block and wait
137163
dataChan := es.stream.safeGetDataChan()
138-
dataChan <- data
164+
if dataChan == nil {
165+
return
166+
}
167+
select {
168+
case dataChan <- data:
169+
return
170+
case <-es.stream.done:
171+
return
172+
}
139173
}
140174

141175
// GetStrategyName gets strategy name

stream/stream.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,19 @@ func (s *Stream) Stop() {
220220

221221
close(s.done)
222222

223+
// Stop window operations first to prevent new window triggers
224+
if s.Window != nil {
225+
s.Window.Stop()
226+
}
227+
228+
// Close dataChan to signal DataProcessor to exit
229+
s.dataChanMux.Lock()
230+
if s.dataChan != nil {
231+
close(s.dataChan)
232+
s.dataChan = nil // Set to nil to prevent sending to closed channel
233+
}
234+
s.dataChanMux.Unlock()
235+
223236
// Stop and clean up data processing strategy resources
224237
if s.dataStrategy != nil {
225238
if err := s.dataStrategy.Stop(); err != nil {

0 commit comments

Comments
 (0)