Skip to content

Commit 3841f13

Browse files
committed
remove locks on series
1 parent bde8a3b commit 3841f13

File tree

5 files changed

+43
-192
lines changed

5 files changed

+43
-192
lines changed

core.go

Lines changed: 13 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,10 @@ func (e *BarEnv) NewSeries(data []float64) *Series {
9494
xlogs := make(map[int]*CrossLog)
9595
res := e.newSeries(data, nil, nil, nil, subs, xlogs)
9696
e.VNum += 1
97-
e.LockItems.Lock()
9897
if e.Items == nil {
9998
e.Items = make(map[int]*Series)
10099
}
101100
e.Items[res.ID] = res
102-
e.LockItems.Unlock()
103101
return res
104102
}
105103

@@ -112,19 +110,15 @@ func (e *BarEnv) newSeries(data []float64, cols []*Series, more interface{}, dup
112110
xlogs = make(map[int]*CrossLog)
113111
}
114112
res := &Series{
115-
ID: e.VNum,
116-
Env: e,
117-
Data: data,
118-
Cols: cols,
119-
Time: e.TimeStart,
120-
More: more,
121-
DupMore: dupMore,
122-
Subs: subs,
123-
XLogs: xlogs,
124-
LockSubMap: make(map[string]*sync.Mutex),
125-
}
126-
for fn := range res.Subs {
127-
res.LockSubMap[fn] = &sync.Mutex{}
113+
ID: e.VNum,
114+
Env: e,
115+
Data: data,
116+
Cols: cols,
117+
Time: e.TimeStart,
118+
More: more,
119+
DupMore: dupMore,
120+
Subs: subs,
121+
XLogs: xlogs,
128122
}
129123
return res
130124
}
@@ -170,9 +164,7 @@ func (e *BarEnv) Clone() *BarEnv {
170164
if e.Info != nil {
171165
res.Info = e.Info.CopyTo(res)
172166
}
173-
e.LockItems.RLock()
174167
itemList := maps.Values(e.Items)
175-
e.LockItems.RUnlock()
176168
for v := range itemList {
177169
v.CopyTo(res)
178170
}
@@ -189,7 +181,6 @@ func (e *BarEnv) ResetTo(env *BarEnv) {
189181
env.Volume.ID: true,
190182
env.Info.ID: true,
191183
}
192-
env.LockItems.Lock()
193184
var items = make([]*Series, 0, len(env.Items))
194185
for id, s := range env.Items {
195186
if _, ok := rootIds[id]; ok {
@@ -198,7 +189,6 @@ func (e *BarEnv) ResetTo(env *BarEnv) {
198189
delete(e.Items, id)
199190
items = append(items, s)
200191
}
201-
env.LockItems.Unlock()
202192
for _, s := range items {
203193
s.CopyTo(e)
204194
}
@@ -214,11 +204,9 @@ func (s *Series) Set(obj interface{}) *Series {
214204
if s.Cached() {
215205
return s
216206
}
217-
s.LockData.Lock()
218207
if !s.Cached() {
219208
s.Append(obj)
220209
}
221-
s.LockData.Unlock()
222210
return s
223211
}
224212

@@ -335,11 +323,9 @@ func (s *Series) Add(obj interface{}) *Series {
335323
if res.Cached() {
336324
return res
337325
}
338-
res.LockData.Lock()
339326
if !res.Cached() {
340327
res.Append(s.Get(0) + val)
341328
}
342-
res.LockData.Unlock()
343329
return res
344330
}
345331

@@ -348,11 +334,9 @@ func (s *Series) Sub(obj interface{}) *Series {
348334
if res.Cached() {
349335
return res
350336
}
351-
res.LockData.Lock()
352337
if !res.Cached() {
353338
res.Append(s.Get(0) - val)
354339
}
355-
res.LockData.Unlock()
356340
return res
357341
}
358342

@@ -361,11 +345,9 @@ func (s *Series) Mul(obj interface{}) *Series {
361345
if res.Cached() {
362346
return res
363347
}
364-
res.LockData.Lock()
365348
if !res.Cached() {
366349
res.Append(s.Get(0) * val)
367350
}
368-
res.LockData.Unlock()
369351
return res
370352
}
371353

@@ -374,11 +356,9 @@ func (s *Series) Div(obj interface{}) *Series {
374356
if res.Cached() {
375357
return res
376358
}
377-
res.LockData.Lock()
378359
if !res.Cached() {
379360
res.Append(s.Get(0) / val)
380361
}
381-
res.LockData.Unlock()
382362
return res
383363
}
384364

@@ -387,11 +367,9 @@ func (s *Series) Min(obj interface{}) *Series {
387367
if res.Cached() {
388368
return res
389369
}
390-
res.LockData.Lock()
391370
if !res.Cached() {
392371
res.Append(math.Min(s.Get(0), val))
393372
}
394-
res.LockData.Unlock()
395373
return res
396374
}
397375

@@ -400,11 +378,9 @@ func (s *Series) Max(obj interface{}) *Series {
400378
if res.Cached() {
401379
return res
402380
}
403-
res.LockData.Lock()
404381
if !res.Cached() {
405382
res.Append(math.Max(s.Get(0), val))
406383
}
407-
res.LockData.Unlock()
408384
return res
409385
}
410386

@@ -413,11 +389,9 @@ func (s *Series) Abs() *Series {
413389
if res.Cached() {
414390
return res
415391
}
416-
res.LockData.Lock()
417392
if !res.Cached() {
418393
res.Append(math.Abs(s.Get(0)))
419394
}
420-
res.LockData.Unlock()
421395
return res
422396
}
423397

@@ -429,23 +403,21 @@ func (s *Series) Len() int {
429403
}
430404

431405
func (s *Series) Cut(keepNum int) {
432-
s.LockSub.Lock()
406+
curLen := len(s.Data)
407+
if curLen <= keepNum {
408+
return
409+
}
433410
for _, dv := range s.Subs {
434411
for _, v := range dv {
435412
v.Cut(keepNum)
436413
}
437414
}
438-
s.LockSub.Unlock()
439415
if len(s.Cols) > 0 {
440416
for _, col := range s.Cols {
441417
col.Cut(keepNum)
442418
}
443419
return
444420
}
445-
curLen := len(s.Data)
446-
if curLen <= keepNum {
447-
return
448-
}
449421
s.Data = s.Data[curLen-keepNum:]
450422
}
451423

@@ -454,7 +426,6 @@ func (s *Series) Back(num int) *Series {
454426
if res.Cached() {
455427
return res
456428
}
457-
res.LockData.Lock()
458429
if !res.Cached() {
459430
endPos := len(s.Data) - num
460431
if endPos > 0 {
@@ -464,7 +435,6 @@ func (s *Series) Back(num int) *Series {
464435
}
465436
res.Time = s.Env.TimeStop
466437
}
467-
res.LockData.Unlock()
468438
return res
469439
}
470440

@@ -488,58 +458,43 @@ func (s *Series) objVal(rel string, obj interface{}) (*Series, float64) {
488458
}
489459

490460
func (s *Series) To(k string, v int) *Series {
491-
s.LockSub.Lock()
492461
sub, _ := s.Subs[k]
493462
if sub == nil {
494463
sub = make(map[int]*Series)
495464
s.Subs[k] = sub
496465
}
497-
lock, ok := s.LockSubMap[k]
498-
if !ok {
499-
lock = &sync.Mutex{}
500-
s.LockSubMap[k] = lock
501-
}
502-
s.LockSub.Unlock()
503-
lock.Lock()
504466
old, _ := sub[v]
505467
if old == nil {
506468
old = s.Env.NewSeries(nil)
507469
sub[v] = old
508470
}
509-
lock.Unlock()
510471
return old
511472
}
512473

513474
func (s *Series) CopyTo(e *BarEnv) *Series {
514475
if e == nil {
515476
e = s.Env
516477
}
517-
e.LockItems.Lock()
518478
if e.Items == nil {
519479
e.Items = make(map[int]*Series)
520480
}
521481
old, ok := e.Items[s.ID]
522-
e.LockItems.Unlock()
523482
if ok {
524483
return old
525484
}
526485
cols := make([]*Series, len(s.Cols))
527486
for i, v := range s.Cols {
528487
cols[i] = v.CopyTo(e)
529488
}
530-
s.LockSub.Lock()
531489
subs := maps.Clone(s.Subs)
532-
s.LockSub.Unlock()
533490
for fn, idMap := range subs {
534491
sub := make(map[int]*Series)
535492
for id, v := range idMap {
536493
sub[id] = v.CopyTo(e)
537494
}
538495
subs[fn] = sub
539496
}
540-
s.LockXLogs.Lock()
541497
xlogs := maps.Clone(s.XLogs)
542-
s.LockXLogs.Unlock()
543498
for id, v := range xlogs {
544499
xlogs[id] = v.Clone()
545500
}
@@ -548,16 +503,12 @@ func (s *Series) CopyTo(e *BarEnv) *Series {
548503
if s.DupMore != nil && s.More != nil {
549504
res.More = s.DupMore(s.More)
550505
}
551-
e.LockItems.Lock()
552506
e.Items[s.ID] = res
553-
e.LockItems.Unlock()
554507
return res
555508
}
556509

557510
func (s *Series) loadEnvSubs() {
558-
s.Env.LockItems.RLock()
559511
envItems := maps.Clone(s.Env.Items)
560-
s.Env.LockItems.RUnlock()
561512
for _, idMap := range s.Subs {
562513
for id := range idMap {
563514
dup, ok := envItems[id]
@@ -594,7 +545,6 @@ func (s *Series) Cross(obj2 interface{}) int {
594545
}
595546
var newData = false
596547
var log *CrossLog
597-
s.LockXLogs.Lock()
598548
if val, ok := s.XLogs[key]; ok {
599549
log = val
600550
if env.TimeStart > log.Time {
@@ -606,7 +556,6 @@ func (s *Series) Cross(obj2 interface{}) int {
606556
log = &CrossLog{env.TimeStart, math.NaN(), []*XState{}}
607557
s.XLogs[key] = log
608558
}
609-
s.LockXLogs.Unlock()
610559
if newData {
611560
diffVal := s.Get(0) - v2
612561
if diffVal != 0 && !math.IsNaN(diffVal) {

core_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,21 +152,23 @@ func TestConcurrent(t *testing.T) {
152152
wg.Add(1)
153153
go func() {
154154
defer wg.Done()
155-
// 测试基本读取操作
155+
// 测试基本读取操作(读取不需要锁)
156156
_ = testEnv.Close.Get(0)
157157
_ = testEnv.High.Get(1)
158158
_ = testEnv.Low.Get(2)
159159

160-
// 测试计算操作
160+
// 并发计算操作需要手动加锁
161+
testEnv.Lock.Lock()
161162
_ = testEnv.Close.Add(100).Get(0)
162163
_ = testEnv.Close.Sub(50).Get(0)
163164
_ = testEnv.Close.Mul(1.1).Get(0)
164165
_ = testEnv.High.Sub(testEnv.Low).Abs().Get(0)
165166

166167
// 测试交叉计算
167168
_ = testEnv.Close.Cross(30000)
169+
testEnv.Lock.Unlock()
168170

169-
// 测试范围操作
171+
// 测试范围操作(只读不需要锁)
170172
_ = testEnv.Close.Range(0, 5)
171173
}()
172174
}

develop.md

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,21 @@
11

2-
### 支持并发操作BarEnv和Series
3-
为避免额外冗余创建BarEnv和Series,多策略和多账户会复用,多策略和多账户需支持并发调用,所以banta也需支持并发。
4-
测试给Series的Subs和XLogs,BarEnv的Items和Data改为sync.Map后,性能相比RWMutex+map大幅下降;故保留RWMutex方案。
2+
### 并发操作BarEnv和Series
3+
为避免额外冗余创建BarEnv和Series,多策略和多账户会复用,多策略和多账户需支持并发调用。
4+
5+
原方案使用RWMutex保护Series各方法,但会带来43%的性能损失。现改为在BarEnv中只保留一个通用锁`Lock`,由用户手动调用。
6+
7+
原因:banta的相关函数调用通常是密集且相邻的,所以一个锁就足矣,需要并发时可加锁保护banta代码块。
8+
9+
使用示例:
10+
```go
11+
env.Lock.Lock()
12+
// 进行banta运算
13+
result := banta.EMA(env.Close, 20).Get(0)
14+
env.Lock.Unlock()
15+
```
516

617
| 方案 | 耗时 | 内存 | 分配次数 | 性能变化 |
7-
|------|------|------|---------|---------|
18+
|------|------|------|---------|---------|
819
| 无锁 | 21ms | 10.8MB | 327K | 基准 |
920
| RWMutex | 30ms | 10.8MB | 327K | +43% |
1021
| sync.Map | 59ms | 23MB | 708K | +181% |

0 commit comments

Comments
 (0)