Skip to content

Commit e07e0a9

Browse files
authored
Merge pull request #18 from rulego/dev
Dev
2 parents fd0ad5c + e8b78fc commit e07e0a9

File tree

4 files changed

+38
-20
lines changed

4 files changed

+38
-20
lines changed

aggregator/group_aggregator.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,12 @@ func NewGroupAggregator(groupFields []string, aggregationFields []AggregationFie
5050
aggregators := make(map[string]AggregatorFunction)
5151

5252
// 为每个聚合字段创建聚合器
53-
for _, field := range aggregationFields {
54-
if field.OutputAlias == "" {
53+
for i := range aggregationFields {
54+
if aggregationFields[i].OutputAlias == "" {
5555
// 如果没有指定别名,使用输入字段名
56-
field.OutputAlias = field.InputField
56+
aggregationFields[i].OutputAlias = aggregationFields[i].InputField
5757
}
58-
aggregators[field.OutputAlias] = CreateBuiltinAggregator(field.AggregateType)
58+
aggregators[aggregationFields[i].OutputAlias] = CreateBuiltinAggregator(aggregationFields[i].AggregateType)
5959
}
6060

6161
return &GroupAggregator{

functions/expr_bridge.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"strconv"
66
"strings"
7+
"sync"
78

89
"github.com/expr-lang/expr"
910
"github.com/expr-lang/expr/vm"
@@ -15,6 +16,7 @@ type ExprBridge struct {
1516
streamSQLFunctions map[string]Function
1617
exprProgram *vm.Program
1718
exprEnv map[string]interface{}
19+
mutex sync.RWMutex // 添加读写锁保护并发访问
1820
}
1921

2022
// NewExprBridge 创建新的表达式桥接器
@@ -27,6 +29,9 @@ func NewExprBridge() *ExprBridge {
2729

2830
// RegisterStreamSQLFunctionsToExpr 将StreamSQL函数注册到expr环境中
2931
func (bridge *ExprBridge) RegisterStreamSQLFunctionsToExpr() []expr.Option {
32+
bridge.mutex.Lock()
33+
defer bridge.mutex.Unlock()
34+
3035
options := make([]expr.Option, 0)
3136

3237
// 将所有StreamSQL函数注册到expr环境
@@ -58,6 +63,9 @@ func (bridge *ExprBridge) RegisterStreamSQLFunctionsToExpr() []expr.Option {
5863

5964
// CreateEnhancedExprEnvironment 创建增强的expr执行环境
6065
func (bridge *ExprBridge) CreateEnhancedExprEnvironment(data map[string]interface{}) map[string]interface{} {
66+
bridge.mutex.RLock()
67+
defer bridge.mutex.RUnlock()
68+
6169
// 合并数据和函数环境
6270
env := make(map[string]interface{})
6371

@@ -323,6 +331,9 @@ func (bridge *ExprBridge) toFloat64(val interface{}) (float64, error) {
323331

324332
// GetFunctionInfo 获取函数信息,统一两个系统的函数
325333
func (bridge *ExprBridge) GetFunctionInfo() map[string]interface{} {
334+
bridge.mutex.RLock()
335+
defer bridge.mutex.RUnlock()
336+
326337
info := make(map[string]interface{})
327338

328339
// StreamSQL函数信息
@@ -394,6 +405,9 @@ func (bridge *ExprBridge) GetFunctionInfo() map[string]interface{} {
394405

395406
// ResolveFunction 解析函数调用,优先使用StreamSQL函数
396407
func (bridge *ExprBridge) ResolveFunction(name string) (interface{}, bool, string) {
408+
bridge.mutex.RLock()
409+
defer bridge.mutex.RUnlock()
410+
397411
// 进行大小写不敏感的查找
398412
lowerName := strings.ToLower(name)
399413

@@ -445,9 +459,23 @@ func (bridge *ExprBridge) IsExprLangFunction(name string) bool {
445459

446460
// 全局桥接器实例
447461
var globalBridge *ExprBridge
462+
var globalBridgeMutex sync.RWMutex
448463

449464
// GetExprBridge 获取全局桥接器实例
450465
func GetExprBridge() *ExprBridge {
466+
// 首先使用读锁检查是否已初始化
467+
globalBridgeMutex.RLock()
468+
if globalBridge != nil {
469+
defer globalBridgeMutex.RUnlock()
470+
return globalBridge
471+
}
472+
globalBridgeMutex.RUnlock()
473+
474+
// 使用写锁进行初始化
475+
globalBridgeMutex.Lock()
476+
defer globalBridgeMutex.Unlock()
477+
478+
// 双重检查模式,防止并发初始化
451479
if globalBridge == nil {
452480
globalBridge = NewExprBridge()
453481
}

window/sliding_window.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -212,18 +212,13 @@ func (sw *SlidingWindow) Trigger() {
212212
sw.data = newData
213213
sw.currentSlot = next
214214

215-
// 非阻塞发送到输出通道
216-
sw.sendResultNonBlocking(resultData)
217-
}
218-
219-
// sendResultNonBlocking 非阻塞地发送结果到输出通道
220-
func (sw *SlidingWindow) sendResultNonBlocking(resultData []types.Row) {
215+
// 非阻塞发送到输出通道并直接更新统计信息(已在锁内)
221216
select {
222217
case sw.outputChan <- resultData:
223-
// 成功发送
218+
// 成功发送,更新统计信息(已在锁内)
224219
sw.sentCount++
225220
default:
226-
// 通道已满,丢弃结果
221+
// 通道已满,丢弃结果并更新统计信息(已在锁内)
227222
sw.droppedCount++
228223
}
229224
}

window/tumbling_window.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -209,18 +209,13 @@ func (tw *TumblingWindow) Trigger() {
209209
tw.data = newData
210210
tw.currentSlot = next
211211

212-
// 非阻塞发送到输出通道
213-
tw.sendResultNonBlocking(resultData)
214-
}
215-
216-
// sendResultNonBlocking 非阻塞地发送结果到输出通道
217-
func (tw *TumblingWindow) sendResultNonBlocking(resultData []types.Row) {
212+
// 非阻塞发送到输出通道并更新统计信息
218213
select {
219214
case tw.outputChan <- resultData:
220-
// 成功发送
215+
// 成功发送,更新统计信息(已在锁内)
221216
tw.sentCount++
222217
default:
223-
// 通道已满,丢弃结果(可选:记录日志或触发告警
218+
// 通道已满,丢弃结果并更新统计信息(已在锁内
224219
tw.droppedCount++
225220
// 可选:在这里添加日志记录
226221
// log.Printf("Window output channel full, dropped result with %d rows", len(resultData))

0 commit comments

Comments
 (0)