Skip to content

Commit d7e0aee

Browse files
authored
feat(worker): Assign WorkerGroup via worker configuration (#332)
1 parent cadf1f0 commit d7e0aee

File tree

6 files changed

+385
-361
lines changed

6 files changed

+385
-361
lines changed

vermeer/apps/master/bl/grpc_handlers.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (h *ServerHandler) SayHelloMaster(ctx context.Context, req *pb.HelloMasterR
9292
}
9393
}
9494

95-
reqWorker, err := workerMgr.CreateWorker(ip+":"+port, ip, req.Version)
95+
reqWorker, err := workerMgr.CreateWorker(ip+":"+port, ip, req.Version, req.WorkerGroup)
9696
if err != nil {
9797
logrus.Errorf("failed to create a WorkerClient, error: %s", err)
9898
return &pb.HelloMasterResp{WorkerId: -1, WorkerName: reqWorker.Name}, err
@@ -104,7 +104,7 @@ func (h *ServerHandler) SayHelloMaster(ctx context.Context, req *pb.HelloMasterR
104104
return &pb.HelloMasterResp{}, err
105105
}
106106

107-
logrus.Infof("worker say hello name: %s, client: %s", reqWorker.Name, p.Addr.String())
107+
logrus.Infof("worker say hello name: %s and set to workgroup: %s, client: %s", reqWorker.Name, reqWorker.Group, p.Addr.String())
108108

109109
resp := pb.HelloMasterResp{
110110
WorkerId: reqWorker.Id,

vermeer/apps/master/workers/worker_manager.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func (wm *workerManager) Init() {
104104
// CreateWorker Build a WorkerClient without an ID, and it'll receive one upon joining the WorkerManager.
105105
// The new WokerClient instance will be assigned a same name with the old one added to the WorkerManager,
106106
// which has the same workerPeer property.
107-
func (wm *workerManager) CreateWorker(workerPeer string, ipAddr string, version string) (*WorkerClient, error) {
107+
func (wm *workerManager) CreateWorker(workerPeer string, ipAddr string, version string, workerGroup string) (*WorkerClient, error) {
108108
if workerPeer == "" {
109109
return nil, fmt.Errorf("the argument 'workerPeer' is invalid")
110110
}
@@ -115,12 +115,17 @@ func (wm *workerManager) CreateWorker(workerPeer string, ipAddr string, version
115115
return nil, fmt.Errorf("the argument 'version' is invalid")
116116
}
117117

118+
// must check if workerGroup is valid
119+
if workerGroup == "" {
120+
workerGroup = "$"
121+
}
122+
118123
worker := &WorkerClient{
119124
GrpcPeer: workerPeer,
120125
IpAddr: ipAddr,
121126
Version: version,
122127
LaunchTime: time.Now(),
123-
Group: "$",
128+
Group: workerGroup,
124129
}
125130

126131
workerInDB := wm.retrieveWorker(workerPeer)
@@ -133,6 +138,11 @@ func (wm *workerManager) CreateWorker(workerPeer string, ipAddr string, version
133138
worker.InitTime = worker.LaunchTime
134139
}
135140

141+
// if workerGroup in workerInDB is different from the one in worker, give a warning to the user
142+
if workerGroup != "$" && worker.Group != workerGroup {
143+
logrus.Warnf("worker manager, worker group mismatch: given %s, but found %s in db for worker %s", workerGroup, worker.Group, worker.Name)
144+
}
145+
136146
return worker, nil
137147
}
138148

0 commit comments

Comments
 (0)