Skip to content

Commit 48c5d86

Browse files
authored
ai/live: Remote signer discovery (#3866)
This adds an optional remote discovery feature to the remote signer node. Client using the remote signer can now discover orchestrators and their supported capabilities on the network. The remote discovery endpoint is compatible with the existing orchestrator webhook, making it usable by go-livepeer as well as local gateway SDKs. ### Usage * Remote signer node gets a -remoteDiscovery flag. This is only usable in remote signer mode for now and is opt-in. All the other gateway-side orchestrator configuration flags are supported here: -orchWebhookUrl, -orchAddr , -orchBlocklist, -extraNodes etc. Additionally, operators can tune the orchestrator refresh interval with the existing -liveAICapReportInterval flag. * If the gateway is using a remote signer (via -remoteSignerUrl) and has no other orchestrator sources (eg, webhook, orchestrator list) then it will fall back to using the remote signer's discovery endpoint. * The discovery endpoint is GET /discover-orchestrators. It returns a JSON list of [{"address":...,"capabilities":[...]}] where the address is the URI the orchestrator can be reached at, and the capabilities is a list of capabilities the orchestrator supports, eg live-video-to-video/streamdiffusion. There is also an optional caps query parameter to return only the subset of orchestrators matching the capability (exact string match). Multiple caps parameters can be included (OR). ### Remote Discovery Implementation * Remote discovery builds on the existing DB discovery pool's orchestrator polling. The DB discovery pool periodically (via -liveAICapReportInterval) fetches orchestrator info from the network and updates the node's capability cache. * The remote discovery pool lazily reads from this cache and builds a capability-indexed view for quick lookups. * Price filtering happens during refresh, so it doesn't return orchestrators that will be later rejected by the remote signer due to price ### Supporting Changes * DB Discovery pool and Webhook Discovery pool are updated to use a "builder pattern" of config structs + initializer, matching the regular orchestrator pool. The existing constructor interfaces are left untouched, but the plumbing underneath uses the new builder pattern. This is done to support the IgnoreCapacityCheck flag (see below) * To avoid returning capacity errors during the DB Discovery refresh call (it is capabilities we want), we need to thread through the IgnoreCapacityCheck flag a couple places from the webhook, DB discovery, etc. Note that this flag is only in enabled when the node is in remote signer mode, so this change should not affect other modes such as the gateway. * DB Discovery cache refreshes now incorporate the ExtraNodes field in order to fully traverse the orchestrators available on the network. * To support price filtering during remote discovery, the PriceInfo field is added to OrchNetworkCapabilities. * The LiveAICapReportInterval field was added to the LivepeerNode; this is just storing an existing flag so the remote signer node can use it. * Bug fix in the broadcast price config to mitigate cross-test contamination due to the use of globals + inconsistent cleanup from other tests * Tests, docs, etc.
1 parent 8de5e30 commit 48c5d86

File tree

14 files changed

+987
-58
lines changed

14 files changed

+987
-58
lines changed

cmd/livepeer/starter/flags.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig {
140140
cfg.TestOrchAvail = fs.Bool("startupAvailabilityCheck", *cfg.TestOrchAvail, "Set to false to disable the startup Orchestrator availability check on the configured serviceAddr")
141141
cfg.RemoteSigner = fs.Bool("remoteSigner", *cfg.RemoteSigner, "Set to true to run remote signer service")
142142
cfg.RemoteSignerUrl = fs.String("remoteSignerUrl", *cfg.RemoteSignerUrl, "URL of remote signer service to use (e.g., http://localhost:8935). Gateway only.")
143+
cfg.RemoteDiscovery = fs.Bool("remoteDiscovery", *cfg.RemoteDiscovery, "Enable orchestrator discovery on remote signers")
143144

144145
// Gateway metrics
145146
cfg.KafkaBootstrapServers = fs.String("kafkaBootstrapServers", *cfg.KafkaBootstrapServers, "URL of Kafka Bootstrap Servers")

cmd/livepeer/starter/starter.go

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ type LivepeerConfig struct {
169169
TestOrchAvail *bool
170170
RemoteSigner *bool
171171
RemoteSignerUrl *string
172+
RemoteDiscovery *bool
172173
AIRunnerImage *string
173174
AIRunnerImageOverrides *string
174175
AIVerboseLogs *bool
@@ -306,6 +307,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
306307
defaultTestOrchAvail := true
307308
defaultRemoteSigner := false
308309
defaultRemoteSignerUrl := ""
310+
defaultRemoteDiscovery := false
309311

310312
// Gateway logs
311313
defaultKafkaBootstrapServers := ""
@@ -429,6 +431,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
429431
TestOrchAvail: &defaultTestOrchAvail,
430432
RemoteSigner: &defaultRemoteSigner,
431433
RemoteSignerUrl: &defaultRemoteSignerUrl,
434+
RemoteDiscovery: &defaultRemoteDiscovery,
432435

433436
// Gateway logs
434437
KafkaBootstrapServers: &defaultKafkaBootstrapServers,
@@ -1635,6 +1638,10 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
16351638
n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl, *cfg.DiscoveryTimeout)
16361639
} else if len(orchURLs) > 0 {
16371640
n.OrchestratorPool = discovery.NewOrchestratorPool(bcast, orchURLs, common.Score_Trusted, orchBlacklist, *cfg.DiscoveryTimeout)
1641+
} else if n.RemoteSignerUrl != nil {
1642+
orchDiscoveryURL := n.RemoteSignerUrl.ResolveReference(&url.URL{Path: "/discover-orchestrators"})
1643+
glog.Info("Using remote signer orchestrator discovery endpoint ", orchDiscoveryURL)
1644+
n.OrchestratorPool = discovery.NewWebhookPool(bcast, orchDiscoveryURL, *cfg.DiscoveryTimeout)
16381645
}
16391646

16401647
// When the node is on-chain mode always cache the on-chain orchestrators and poll for updates
@@ -1648,7 +1655,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
16481655
exit("Could not create orchestrator pool with DB cache: %v", err)
16491656
}
16501657

1651-
//if orchURLs is empty and webhook pool not used, use the DB orchestrator pool cache as orchestrator pool
1658+
// If orchURLs is empty and webhook pool not used, use the DB orchestrator pool cache.
16521659
if *cfg.OrchWebhookURL == "" && len(orchURLs) == 0 {
16531660
n.OrchestratorPool = dbOrchPoolCache
16541661
}
@@ -1793,6 +1800,12 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
17931800
if cfg.LiveAIHeartbeatInterval != nil {
17941801
n.LiveAIHeartbeatInterval = *cfg.LiveAIHeartbeatInterval
17951802
}
1803+
if cfg.LiveAICapReportInterval != nil {
1804+
n.LiveAICapReportInterval = *cfg.LiveAICapReportInterval
1805+
}
1806+
if cfg.RemoteDiscovery != nil {
1807+
n.RemoteDiscovery = *cfg.RemoteDiscovery
1808+
}
17961809
if cfg.LiveAIHeartbeatHeaders != nil {
17971810
n.LiveAIHeartbeatHeaders = make(map[string]string)
17981811
headers := strings.Split(*cfg.LiveAIHeartbeatHeaders, ",")
@@ -1842,8 +1855,69 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
18421855
}()
18431856
}
18441857

1845-
// Start remote signer server if in remote signer mode
1858+
// Set up orchestrator pool for remote signer mode and start server
18461859
if n.NodeType == core.RemoteSignerNode {
1860+
bcast := core.NewBroadcaster(n)
1861+
orchBlacklist := parseOrchBlacklist(cfg.OrchBlacklist)
1862+
n.ExtraNodes = *cfg.ExtraNodes
1863+
1864+
// Set up orchestrator discovery - same logic as BroadcasterNode
1865+
if *cfg.OrchWebhookURL != "" {
1866+
whurl, err := validateURL(*cfg.OrchWebhookURL)
1867+
if err != nil {
1868+
glog.Exit("Error setting orch webhook URL ", err)
1869+
}
1870+
glog.Info("Using orchestrator webhook URL ", whurl)
1871+
n.OrchestratorPool = discovery.WebhookPoolConfig{
1872+
Broadcaster: bcast,
1873+
Callback: whurl,
1874+
DiscoveryTimeout: *cfg.DiscoveryTimeout,
1875+
IgnoreCapacityCheck: true,
1876+
}.New()
1877+
} else if len(orchURLs) > 0 {
1878+
pool, err := discovery.NewOrchestratorPoolWithConfig(discovery.OrchestratorPoolConfig{
1879+
Broadcaster: bcast,
1880+
URIs: orchURLs,
1881+
Score: common.Score_Trusted,
1882+
OrchBlacklist: orchBlacklist,
1883+
DiscoveryTimeout: *cfg.DiscoveryTimeout,
1884+
IgnoreCapacityCheck: true,
1885+
ExtraNodes: *cfg.ExtraNodes,
1886+
})
1887+
if err != nil {
1888+
glog.Exit("Error initializing orchestrator pool ", err)
1889+
}
1890+
n.OrchestratorPool = pool
1891+
}
1892+
1893+
// When the node is on-chain mode always cache the on-chain orchestrators and poll for updates
1894+
if *cfg.Network != "offchain" && *cfg.RemoteDiscovery {
1895+
ctx, cancel := context.WithCancel(ctx)
1896+
defer cancel()
1897+
dbOrchPoolCache, err := discovery.DBOrchestratorPoolCacheConfig{
1898+
Ctx: ctx,
1899+
Node: n,
1900+
RoundsManager: timeWatcher,
1901+
OrchBlacklist: orchBlacklist,
1902+
DiscoveryTimeout: *cfg.DiscoveryTimeout,
1903+
LiveAICapReportInterval: *cfg.LiveAICapReportInterval,
1904+
IgnoreCapacityCheck: true,
1905+
}.New()
1906+
if err != nil {
1907+
exit("Could not create orchestrator pool with DB cache: %v", err)
1908+
}
1909+
1910+
// If orchURLs is empty and webhook pool not used, use the DB orchestrator pool cache
1911+
if *cfg.OrchWebhookURL == "" && len(orchURLs) == 0 {
1912+
n.OrchestratorPool = dbOrchPoolCache
1913+
}
1914+
}
1915+
1916+
if n.RemoteDiscovery && n.OrchestratorPool == nil {
1917+
exit("RemoteDiscovery is set but no orchestrator pool could be configured")
1918+
}
1919+
1920+
// Start remote signer server
18471921
go func() {
18481922
*cfg.HttpAddr = defaultAddr(*cfg.HttpAddr, "127.0.0.1", OrchestratorRpcPort)
18491923
glog.Info("Starting remote signer server on ", *cfg.HttpAddr)

common/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ type OrchNetworkCapabilities struct {
174174
LocalAddress string `json:"local_address"`
175175
OrchURI string `json:"orch_uri"`
176176
Capabilities *net.Capabilities `json:"capabilities"`
177+
PriceInfo *net.PriceInfo `json:"price_info"`
177178
CapabilitiesPrices []*net.PriceInfo `json:"capabilities_prices"`
178179
Hardware []*net.HardwareInformation `json:"hardware"`
179180
}

core/livepeernode.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ type LivepeerNode struct {
152152
RemoteSignerUrl *url.URL
153153
RemoteEthAddr ethcommon.Address // eth address of the remote signer
154154
InfoSig []byte // sig over eth address for the OrchestratorInfo request
155+
RemoteDiscovery bool // expose remote discovery endpoint when enabled
155156

156157
// Thread safety for config fields
157158
mu sync.RWMutex
@@ -177,6 +178,7 @@ type LivepeerNode struct {
177178
LiveAIHeartbeatURL string
178179
LiveAIHeartbeatHeaders map[string]string
179180
LiveAIHeartbeatInterval time.Duration
181+
LiveAICapReportInterval time.Duration
180182
LivePaymentInterval time.Duration
181183
LiveOutSegmentTimeout time.Duration
182184
LiveAISaveNSegments *int

discovery/db_discovery.go

Lines changed: 91 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,33 @@ type DBOrchestratorPoolCache struct {
3434
bcast common.Broadcaster
3535
orchBlacklist []string
3636
discoveryTimeout time.Duration
37+
ignoreCapacityCheck bool
3738
node *core.LivepeerNode
3839
}
3940

4041
func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm common.RoundsManager, orchBlacklist []string, discoveryTimeout time.Duration, liveAICapReportInterval time.Duration) (*DBOrchestratorPoolCache, error) {
42+
return DBOrchestratorPoolCacheConfig{
43+
Ctx: ctx,
44+
Node: node,
45+
RoundsManager: rm,
46+
OrchBlacklist: orchBlacklist,
47+
DiscoveryTimeout: discoveryTimeout,
48+
LiveAICapReportInterval: liveAICapReportInterval,
49+
}.New()
50+
}
51+
52+
type DBOrchestratorPoolCacheConfig struct {
53+
Ctx context.Context
54+
Node *core.LivepeerNode
55+
RoundsManager common.RoundsManager
56+
OrchBlacklist []string
57+
DiscoveryTimeout time.Duration
58+
LiveAICapReportInterval time.Duration
59+
IgnoreCapacityCheck bool
60+
}
61+
62+
func (cfg DBOrchestratorPoolCacheConfig) New() (*DBOrchestratorPoolCache, error) {
63+
node := cfg.Node
4164
if node.Eth == nil {
4265
return nil, fmt.Errorf("could not create DBOrchestratorPoolCache: LivepeerEthClient is nil")
4366
}
@@ -46,10 +69,11 @@ func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm
4669
store: node.Database,
4770
lpEth: node.Eth,
4871
ticketParamsValidator: node.Sender,
49-
rm: rm,
72+
rm: cfg.RoundsManager,
5073
bcast: core.NewBroadcaster(node),
51-
orchBlacklist: orchBlacklist,
52-
discoveryTimeout: discoveryTimeout,
74+
orchBlacklist: cfg.OrchBlacklist,
75+
discoveryTimeout: cfg.DiscoveryTimeout,
76+
ignoreCapacityCheck: cfg.IgnoreCapacityCheck,
5377
node: node,
5478
}
5579

@@ -62,7 +86,7 @@ func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm
6286
return err
6387
}
6488

65-
if err := dbo.pollOrchestratorInfo(ctx, liveAICapReportInterval); err != nil {
89+
if err := dbo.pollOrchestratorInfo(cfg.Ctx, cfg.LiveAICapReportInterval); err != nil {
6690
return err
6791
}
6892
return nil
@@ -152,7 +176,19 @@ func (dbo *DBOrchestratorPoolCache) GetOrchestrators(ctx context.Context, numOrc
152176
return true
153177
}
154178

155-
orchPool := NewOrchestratorPoolWithPred(dbo.bcast, uris, pred, common.Score_Untrusted, dbo.orchBlacklist, dbo.discoveryTimeout)
179+
orchPool, err := NewOrchestratorPoolWithConfig(OrchestratorPoolConfig{
180+
Broadcaster: dbo.bcast,
181+
URIs: uris,
182+
Pred: pred,
183+
Score: common.Score_Untrusted,
184+
OrchBlacklist: dbo.orchBlacklist,
185+
DiscoveryTimeout: dbo.discoveryTimeout,
186+
IgnoreCapacityCheck: dbo.ignoreCapacityCheck,
187+
ExtraNodes: dbo.bcast.ExtraNodes(),
188+
})
189+
if err != nil {
190+
return nil, err
191+
}
156192
orchInfos, err := orchPool.GetOrchestrators(ctx, numOrchestrators, suspender, caps, scorePred)
157193
if err != nil || len(orchInfos) <= 0 {
158194
return nil, err
@@ -303,16 +339,24 @@ func (dbo *DBOrchestratorPoolCache) cacheOrchInfos() error {
303339
}
304340

305341
type orchPollingInfo struct {
342+
level int
306343
orchInfo *net.OrchestratorInfo
307344
dbOrch *common.DBOrch
308345
}
309346

310-
resc, errc := make(chan orchPollingInfo, len(orchs)), make(chan error, len(orchs))
347+
nodesPerOrch := dbo.bcast.ExtraNodes()
348+
// Each base orchestrator can contribute itself plus up to nodesPerOrch first-level advertised nodes.
349+
maxOrchs := len(orchs) * (nodesPerOrch + 1)
350+
resc, errc := make(chan orchPollingInfo, maxOrchs), make(chan error, maxOrchs)
311351
timeout := getOrchestratorTimeoutLoop // Needs to be same or longer than GRPCConnectTimeout in server/rpc.go
312352
ctx, cancel := context.WithTimeout(context.Background(), timeout)
313353
defer cancel()
354+
getOrchInfoRPC := serverGetOrchInfo
355+
if pool, ok := dbo.node.OrchestratorPool.(*orchestratorPool); ok && pool.getOrchInfo != nil {
356+
getOrchInfoRPC = pool.getOrchInfo
357+
}
314358

315-
getOrchInfo := func(orch common.OrchestratorLocalInfo) {
359+
getOrchInfo := func(orch common.OrchestratorLocalInfo, level int) {
316360
uri, err := parseURI(orch.URL.String())
317361
if err != nil {
318362
errc <- err
@@ -323,7 +367,9 @@ func (dbo *DBOrchestratorPoolCache) cacheOrchInfos() error {
323367
errc <- fmt.Errorf("skipping orch=%v, URI not set", orch.URL.String())
324368
return
325369
}
326-
info, err := serverGetOrchInfo(ctx, dbo.bcast, uri, server.GetOrchestratorInfoParams{})
370+
info, err := getOrchInfoRPC(ctx, dbo.bcast, uri, server.GetOrchestratorInfoParams{
371+
IgnoreCapacityCheck: dbo.ignoreCapacityCheck,
372+
})
327373
if err != nil {
328374
errc <- err
329375
return
@@ -361,13 +407,30 @@ func (dbo *DBOrchestratorPoolCache) cacheOrchInfos() error {
361407
}
362408
}
363409

364-
resc <- orchPollingInfo{orchInfo: info, dbOrch: dbOrch}
410+
resc <- orchPollingInfo{
411+
level: level,
412+
orchInfo: info,
413+
dbOrch: dbOrch,
414+
}
365415
}
366416

417+
seen := make(map[string]bool, maxOrchs)
367418
numOrchs := 0
368-
for _, orch := range orchs {
419+
startOrchLookup := func(orch common.OrchestratorLocalInfo, level int) {
420+
if orch.URL == nil {
421+
return
422+
}
423+
key := orch.URL.String()
424+
if key == "" || seen[key] {
425+
return
426+
}
427+
seen[key] = true
369428
numOrchs++
370-
go getOrchInfo(orch)
429+
go getOrchInfo(orch, level)
430+
}
431+
432+
for _, orch := range orchs {
433+
startOrchLookup(orch, 0)
371434
}
372435

373436
var orchNetworkCapabilities []*common.OrchNetworkCapabilities
@@ -376,6 +439,22 @@ func (dbo *DBOrchestratorPoolCache) cacheOrchInfos() error {
376439
case res := <-resc:
377440
//add response to network capabilities
378441
orchNetworkCapabilities = append(orchNetworkCapabilities, orchInfoToOrchNetworkCapabilities(res.orchInfo))
442+
443+
// discover newly advertised nodes. only recurse the first level.
444+
if res.level == 0 && len(res.orchInfo.GetNodes()) > 0 {
445+
for idx, inst := range res.orchInfo.GetNodes() {
446+
if idx >= nodesPerOrch {
447+
break
448+
}
449+
u, err := parseURI(inst)
450+
if err != nil {
451+
glog.Errorf("Invalid node URL orch=%v node=%v err=%q", res.orchInfo.GetTranscoder(), inst, err)
452+
continue
453+
}
454+
startOrchLookup(common.OrchestratorLocalInfo{URL: u, Score: common.Score_Untrusted}, res.level+1)
455+
}
456+
}
457+
379458
//update db with response
380459
if res.dbOrch != nil {
381460
if err := dbo.store.UpdateOrch(res.dbOrch); err != nil {
@@ -507,6 +586,7 @@ func orchInfoToOrchNetworkCapabilities(info *net.OrchestratorInfo) *common.OrchN
507586
orch.LocalAddress = ethcommon.BytesToAddress(info.GetAddress()).Hex()
508587
orch.OrchURI = info.GetTranscoder()
509588
orch.Capabilities = info.GetCapabilities()
589+
orch.PriceInfo = info.GetPriceInfo()
510590
orch.Hardware = info.GetHardware()
511591
orch.CapabilitiesPrices = info.GetCapabilitiesPrices()
512592
if info.GetTicketParams() != nil {

0 commit comments

Comments
 (0)