Skip to content

Commit 588bd18

Browse files
use db orchs if orch pool not set
1 parent 2eadd7d commit 588bd18

File tree

1 file changed

+43
-14
lines changed

1 file changed

+43
-14
lines changed

discovery/db_discovery.go

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ func (dbo *DBOrchestratorPoolCache) cacheOrchestratorStake() error {
238238

239239
func (dbo *DBOrchestratorPoolCache) pollOrchestratorInfo(ctx context.Context) error {
240240
if err := dbo.cacheOrchInfos(); err != nil {
241+
glog.Errorf("unable to poll orchestrator info: %v", err)
241242
return err
242243
}
243244

@@ -261,15 +262,41 @@ func (dbo *DBOrchestratorPoolCache) pollOrchestratorInfo(ctx context.Context) er
261262
func (dbo *DBOrchestratorPoolCache) cacheOrchInfos() error {
262263
//get list of orchestrators to poll info for. If -orchAddr or -orchWebhookUrl is used it will
263264
//limit the set of orchestrators polled to those specified.
264-
orchs := dbo.node.OrchestratorPool.GetInfos()
265+
var orchs []common.OrchestratorLocalInfo
266+
if dbo.node.OrchestratorPool != nil {
267+
orchs = dbo.node.OrchestratorPool.GetInfos()
268+
glog.Infof("Using orchestrator pool with %d orchestrators", len(orchs))
269+
} else {
270+
dbOrchs, err := dbo.store.SelectOrchs(
271+
&common.DBOrchFilter{
272+
CurrentRound: dbo.rm.LastInitializedRound(),
273+
},
274+
)
275+
if err != nil {
276+
return fmt.Errorf("could not retrieve orchestrators from DB: %v", err)
277+
}
265278

266-
resc, errc := make(chan *common.DBOrch, len(orchs)), make(chan error, len(orchs))
279+
for _, o := range dbOrchs {
280+
url, err := parseURI(o.ServiceURI)
281+
if err != nil {
282+
continue
283+
}
284+
orchs = append(orchs, common.OrchestratorLocalInfo{URL: url})
285+
}
286+
287+
glog.Infof("Using DB orchestrator pool with %d orchestrators", len(orchs))
288+
}
289+
290+
type orchPollingInfo struct {
291+
orchInfo *net.OrchestratorInfo
292+
dbOrch *common.DBOrch
293+
}
294+
295+
resc, errc := make(chan orchPollingInfo, len(orchs)), make(chan error, len(orchs))
267296
timeout := getOrchestratorTimeoutLoop // Needs to be same or longer than GRPCConnectTimeout in server/rpc.go
268297
ctx, cancel := context.WithTimeout(context.Background(), timeout)
269298
defer cancel()
270299

271-
var orchNetworkCapabilities []*common.OrchNetworkCapabilities
272-
273300
getOrchInfo := func(orch common.OrchestratorLocalInfo) {
274301
uri, err := parseURI(orch.URL.String())
275302
if err != nil {
@@ -315,14 +342,11 @@ func (dbo *DBOrchestratorPoolCache) cacheOrchInfos() error {
315342
return
316343
}
317344

318-
//add response to network capabilities
319-
orchNetworkCapabilities = append(orchNetworkCapabilities, orchInfoToOrchNetworkCapabilities(info))
320-
321345
if err != nil {
322346
glog.Error("Error adding Orchestrator to network capabilities: ", err)
323347
}
324348

325-
resc <- dbOrch
349+
resc <- orchPollingInfo{orchInfo: info, dbOrch: dbOrch}
326350
}
327351

328352
numOrchs := 0
@@ -331,21 +355,26 @@ func (dbo *DBOrchestratorPoolCache) cacheOrchInfos() error {
331355
go getOrchInfo(orch)
332356
}
333357

358+
var orchNetworkCapabilities []*common.OrchNetworkCapabilities
334359
for i := 0; i < numOrchs; i++ {
335360
select {
336361
case res := <-resc:
337-
if err := dbo.store.UpdateOrch(res); err != nil {
362+
//add response to network capabilities
363+
orchNetworkCapabilities = append(orchNetworkCapabilities, orchInfoToOrchNetworkCapabilities(res.orchInfo))
364+
//update db with response
365+
if err := dbo.store.UpdateOrch(res.dbOrch); err != nil {
338366
glog.Error("Error updating Orchestrator in DB: ", err)
339367
}
340368
case err := <-errc:
341369
glog.Errorln(err)
342370
case <-ctx.Done():
343371
glog.Infof("Done fetching orch info for orchestrators, context timeout (fetched: %v out of %v)", i, numOrchs)
344-
return nil
372+
i = numOrchs //exit loop
345373
}
346374
}
347-
375+
//save network capabilities in LivepeerNode
348376
dbo.node.UpdateNetworkCapabilities(orchNetworkCapabilities)
377+
349378
return nil
350379
}
351380

@@ -395,11 +424,11 @@ func pmTicketParams(params *net.TicketParams) *pm.TicketParams {
395424
}
396425

397426
func orchInfoToOrchNetworkCapabilities(info *net.OrchestratorInfo) *common.OrchNetworkCapabilities {
398-
var orch *common.OrchNetworkCapabilities
427+
var orch common.OrchNetworkCapabilities
399428

400429
// add orch operating information if available
401430
if info != nil {
402-
orch.LocalAddress = ethcommon.BytesToAddress(info.Address).Hex()
431+
orch.LocalAddress = ethcommon.BytesToAddress(info.GetAddress()).Hex()
403432
orch.OrchURI = info.GetTranscoder()
404433
orch.Capabilities = info.GetCapabilities()
405434
orch.Hardware = info.GetHardware()
@@ -409,5 +438,5 @@ func orchInfoToOrchNetworkCapabilities(info *net.OrchestratorInfo) *common.OrchN
409438
}
410439
}
411440

412-
return orch
441+
return &orch
413442
}

0 commit comments

Comments
 (0)