-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathsystem.go
More file actions
162 lines (146 loc) · 4.58 KB
/
system.go
File metadata and controls
162 lines (146 loc) · 4.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package grain
import (
"log/slog"
"strconv"
"sync/atomic"
"github.com/chenxyzl/grain/message"
"github.com/chenxyzl/grain/uuid"
"google.golang.org/protobuf/proto"
)
// system is the concrete actor system of this package: it owns the
// configuration, the local actor registry, the RPC service, the cluster
// provider and the helpers used to route messages to local or remote actors.
type system struct {
// config is the configuration built by newConfig in NewSystem.
config *config
// Runtime components.
// registry is queried (registry.get) to find the local processor of an actor.
registry *registry
// rpcService supplies this system's address through Addr() (see getAddr).
// It is not assigned in NewSystem — presumably set during startup; confirm.
rpcService iRpcServer
// clusterProvider is created in NewSystem via newProvider[*providerEtcd]().
clusterProvider iProvider
// timerSchedule is created in NewSystem via newTimerSchedule(sys).
timerSchedule *timerSchedule
// addrHash is created in NewSystem via newAddrHash().
addrHash *AddrHash
// Lifecycle, logging and bookkeeping.
// forceCloseChan is a buffered channel of capacity 1 (see NewSystem);
// presumably used to signal a forced shutdown — confirm against its users.
forceCloseChan chan bool
// logger is initialised to slog.Default() in NewSystem and returned by Logger.
logger *slog.Logger
// eventStream is not assigned anywhere in this file.
eventStream ActorRef
// askId is a counter incremented atomically by getNextAskId.
askId uint64
}
// NewSystem builds a new actor system and returns it as an ISystem.
//
// clusterName is the etcd root and clusterUrls are the etcd URLs; both are
// forwarded, together with version and opts, to newConfig. The system logs
// through slog.Default() as it is at the time of the call.
func NewSystem(clusterName string, version string, clusterUrls []string, opts ...ConfigOptFunc) ISystem {
	cfg := newConfig(clusterName, version, clusterUrls, opts...)
	logger := slog.Default()

	sys := &system{
		config:          cfg,
		logger:          logger,
		registry:        newRegistry(logger),
		clusterProvider: newProvider[*providerEtcd](),
		forceCloseChan:  make(chan bool, 1),
	}

	// These are wired after the literal: the timer schedule receives the
	// system pointer itself, and the address hash is created last.
	sys.timerSchedule = newTimerSchedule(sys)
	sys.addrHash = newAddrHash()

	return sys
}
// getAddr reports the address exposed by the system's RPC service.
func (x *system) getAddr() string {
	return x.rpcService.Addr()
}
// getConfig returns the configuration the system was created with.
func (x *system) getConfig() *config {
	return x.config
}
// GetProvider returns the cluster provider held by the system.
func (x *system) GetProvider() iProvider {
	return x.clusterProvider
}
// getRegistry exposes the system's registry through the iRegistry interface.
func (x *system) getRegistry() iRegistry {
	return x.registry
}
// getNextAskId atomically increments the system's askId counter and returns
// the new value.
func (x *system) getNextAskId() uint64 {
	next := atomic.AddUint64(&x.askId, 1)
	return next
}
// GetScheduler returns the system itself, viewed through the iScheduler
// interface.
func (x *system) GetScheduler() iScheduler {
	return x
}
// getAddrHash returns the AddrHash instance owned by the system.
func (x *system) getAddrHash() *AddrHash {
	return x.addrHash
}
// Logger returns the structured logger used by the system.
func (x *system) Logger() *slog.Logger {
	return x.logger
}
// Spawn starts an actor from producer p under a generated name: the value of
// uuid.Generate() rendered as a decimal string. It delegates to SpawnNamed
// and returns the resulting actor reference.
func (x *system) Spawn(p iProducer, opts ...KindOptFunc) ActorRef {
	generated := strconv.Itoa(int(uuid.Generate()))
	return x.SpawnNamed(p, generated, opts...)
}
// SpawnNamed starts an actor from producer p with the given name and returns
// its reference.
//
// The caller's options are applied to newOpts followed by
// withOptsDirectSelf(name, x.getAddr(), x), which ties the actor to this
// system's address under that name.
func (x *system) SpawnNamed(p iProducer, name string, opts ...KindOptFunc) ActorRef {
	// The full slice expression caps capacity at len(opts), so append always
	// allocates a fresh backing array. Without it, a caller that passed
	// `someSlice...` with spare capacity would have its array written to.
	allOpts := append(opts[:len(opts):len(opts)], withOptsDirectSelf(name, x.getAddr(), x))
	options := newOpts(p, allOpts...)
	return newProcessor(x, options).self()
}
// SpawnClusterName starts an actor from producer p using only the options
// supplied by the caller (no direct-self option is added, unlike SpawnNamed)
// and returns its reference.
func (x *system) SpawnClusterName(p iProducer, opts ...KindOptFunc) ActorRef {
	proc := newProcessor(x, newOpts(p, opts...))
	return proc.self()
}
// GetClusterActorRef builds a cluster actor reference for the given kind and
// name, bound to this system.
func (x *system) GetClusterActorRef(kind string, name string) ActorRef {
	ref := newClusterActorRef(kind, name, x)
	return ref
}
// getSender returns the system itself, viewed through the iSender interface.
func (x *system) getSender() iSender {
	var s iSender = x
	return s
}
// tellWithSender routes msg to target on behalf of sender, tagging it with
// msgSnId.
//
// Routing rules:
//   - a nil target is logged and dropped;
//   - a direct ref is delivered locally when its address equals this
//     system's address, otherwise it is forwarded to that address;
//   - any other ref is delivered straight to its processor when one is
//     already registered locally; otherwise its cached remote address decides
//     between local delivery (after ensureClusterKindActorExist) and
//     forwarding. An empty cached address is logged and the message dropped.
func (x *system) tellWithSender(target ActorRef, msg proto.Message, sender ActorRef, msgSnId uint64) {
	if target == nil {
		x.Logger().Error("target actor is nil")
		return
	}

	// Direct actor: the ref itself carries the destination address.
	if target.isDirect() {
		addr := target.GetDirectAddr()
		if addr != x.getAddr() {
			x.sendToCluster(addr, target, msg, sender, msgSnId)
			return
		}
		x.sendToLocal(target, msg, sender, msgSnId)
		return
	}

	// Fast path: the actor is already registered on this node.
	if local := x.registry.get(target); local != nil {
		local.send(newContext(local.self(), sender, msg, msgSnId, x))
		return
	}

	// Cluster actor: fall back to the cached remote address.
	addr, _ := target.getRemoteAddrCache()
	switch addr {
	case "":
		x.Logger().Error("actor kind not in cluster")
	case x.getAddr():
		// The actor belongs on this node; make sure it exists before sending.
		x.ensureClusterKindActorExist(target)
		x.sendToLocal(target, msg, sender, msgSnId)
	default:
		x.sendToCluster(addr, target, msg, sender, msgSnId)
	}
}
// tell sends msg to target without a sender, using a freshly generated
// message serial id from getNextAskId.
func (x *system) tell(target ActorRef, msg proto.Message) {
	snID := x.getNextAskId()
	x.tellWithSender(target, msg, nil, snID)
}
// sendToLocal delivers msg to the processor registered locally for target.
//
// When no processor is registered:
//   - a *message.Poison is silently dropped (the actor is already gone);
//   - otherwise, if there is a sender and it is an ask, the sender is told
//     errActorNotFound, and the failure is logged.
//
// sender may be nil (tell passes nil for fire-and-forget messages), so it is
// checked before any method is called on it.
func (x *system) sendToLocal(target ActorRef, msg proto.Message, sender ActorRef, msgSnId uint64) {
	proc := x.registry.get(target)
	if proc != nil {
		proc.send(newContext(proc.self(), sender, msg, msgSnId, x))
		return
	}

	// Ignore poison messages aimed at an actor that no longer exists.
	if _, ok := msg.(*message.Poison); ok {
		return
	}

	// Guard against a nil sender: calling isAsk on it would panic and turn a
	// missing-actor condition into a crash.
	if sender != nil && sender.isAsk() {
		sender.Tell(errActorNotFound)
	}
	x.Logger().Error("send, get actor failed", "actor", target, "msgName", msg.ProtoReflect().Descriptor().FullName())
}
// sendToCluster forwards msg for target to the node at targetAddress.
//
// Delivery goes through a stream-writer actor identified by a direct ref of
// kind defaultWriteStreamKind named after targetAddress. The writer is
// spawned when it is not registered yet, using the dial and call options
// from the system configuration. The message is then handed to the writer's
// processor with target as the context's receiver.
//
// If the writer's processor still cannot be found after spawning, the
// message is dropped and an error is logged instead of dereferencing nil.
func (x *system) sendToCluster(targetAddress string, target ActorRef, msg proto.Message, sender ActorRef, msgSnId uint64) {
	writeStreamActorRef := newDirectActorRef(defaultWriteStreamKind, targetAddress, x.getAddr(), x)

	proc := x.registry.get(writeStreamActorRef)
	if proc == nil {
		// Spawn the writer on first use, then look it up again.
		x.SpawnNamed(func() IActor {
			return newStreamWriterActor(writeStreamActorRef, targetAddress, x.getConfig().dialOptions, x.getConfig().callOptions)
		}, writeStreamActorRef.GetName(), WithOptsKindName(writeStreamActorRef.GetKind()), WithOptsPoisonFirstOnQuit(false))
		proc = x.registry.get(writeStreamActorRef)
	}
	if proc == nil {
		x.Logger().Error("send, get stream writer failed", "addr", targetAddress, "actor", target)
		return
	}

	proc.send(newContext(target, sender, msg, msgSnId, x))
}
// Poison sends the package-level poison message to ref, without a sender and
// with a freshly generated message serial id.
func (x *system) Poison(ref ActorRef) {
	x.tellWithSender(ref, poison, nil, x.getNextAskId())
}