-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathgroup_delegate.go
More file actions
77 lines (63 loc) · 1.78 KB
/
group_delegate.go
File metadata and controls
77 lines (63 loc) · 1.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package hub
import (
"sync/atomic"
"github.com/rs/zerolog/log"
)
// 工作组,有委托能力
//
// 可以将 producer 委托给其他 Group 处理
type groupDelegate struct {
*Group
// 是否建立委托关系
delegated atomic.Value
}
// 构建委托工作组
func newGroupDelegate(options ...GroupOption) *groupDelegate {
gd := &groupDelegate{
Group: NewGroup(options...),
}
gd.delegated.Store(false)
return gd
}
// 委托其他工作组处理生产数据
func (gd *groupDelegate) DelegateChan(producer chan interface{}, g *Group) {
if gd.IsDelegated() {
// 已经建立其他委托关系,必须停止才能再次委托
panic("already delegate to some group, must stop it first")
}
gd.DetachCB(producer, func() {
gd.delegated.Store(true)
log.Trace().Bool("delegated", gd.IsDelegated()).Msg("委托生效")
g.Attach(producer)
})
}
// 中止委托关系,并自己处理工作
func (gd *groupDelegate) SelfSupport(producer chan interface{}, g *Group) {
if !gd.IsDelegated() {
log.Trace().Bool("delegated", gd.IsDelegated()).Msg("没有建立委托")
return // 没有建立委托
}
fn := func() {
// 标记未委托
gd.delegated.Store(false)
log.Trace().Bool("delegated", gd.IsDelegated()).Msg("标记未委托")
}
// 重新监听生产通道
g.DetachCB(producer, func() {
gd.AttachCB(producer, fn)
})
}
// 中止委托关系
func (gd *groupDelegate) StopDelegate() bool {
if !gd.IsDelegated() {
return false // 没有建立委托
}
// 标记未委托
log.Trace().Bool("delegated", gd.IsDelegated()).Msg("中止委托关系")
gd.delegated.Store(false)
return true
}
// 是否已经委托
func (gd *groupDelegate) IsDelegated() bool {
return gd.delegated.Load().(bool)
}