Skip to content

Commit 2eadd7d

Browse files
PR livepeer#3426 add hardware information
1 parent 629e8b7 commit 2eadd7d

21 files changed

+1263
-428
lines changed

common/types.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,3 +168,15 @@ type OrchestratorStore interface {
168168
type RoundsManager interface {
169169
LastInitializedRound() *big.Int
170170
}
171+
172+
// NetworkCapabilities is a JSON-serializable container for a list of
// per-orchestrator capability records. The list is encoded under the
// "orchestrators" key.
type NetworkCapabilities struct {
173+
Orchestrators []*OrchNetworkCapabilities `json:"orchestrators"`
174+
}
175+
// OrchNetworkCapabilities is one orchestrator's entry in NetworkCapabilities.
//
// It carries three string identifiers (JSON keys "address", "local_address"
// and "orch_uri"), the orchestrator's capabilities, a list of price entries
// and a list of hardware descriptions.
//
// NOTE(review): presumably Address is the on-chain address, LocalAddress the
// node's own address and OrchURI the service URI - the code that fills these
// fields is not visible here, so confirm against the producer.
type OrchNetworkCapabilities struct {
176+
Address string `json:"address"`
177+
LocalAddress string `json:"local_address"`
178+
OrchURI string `json:"orch_uri"`
179+
Capabilities *net.Capabilities `json:"capabilities"`
180+
CapabilitiesPrices []*net.PriceInfo `json:"capabilities_prices"`
181+
Hardware []*net.HardwareInformation `json:"hardware"`
182+
}

core/ai.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type AI interface {
3232
Stop(context.Context) error
3333
HasCapacity(string, string) bool
3434
EnsureImageAvailable(context.Context, string, string) error
35+
HardwareInformation() []worker.HardwareInformation
3536
}
3637

3738
// Custom type to parse a big.Rat from a JSON number.

core/ai_orchestrator.go

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type RemoteAIWorker struct {
3838
manager *RemoteAIWorkerManager
3939
stream net.AIWorker_RegisterAIWorkerServer
4040
capabilities *Capabilities
41+
hardware []worker.HardwareInformation
4142
eof chan struct{}
4243
addr string
4344
}
@@ -66,13 +67,14 @@ type RemoteAIWorkerManager struct {
6667
workerSecrets map[string]bool
6768
}
6869

69-
func NewRemoteAIWorker(m *RemoteAIWorkerManager, stream net.AIWorker_RegisterAIWorkerServer, caps *Capabilities) *RemoteAIWorker {
70+
func NewRemoteAIWorker(m *RemoteAIWorkerManager, stream net.AIWorker_RegisterAIWorkerServer, caps *Capabilities, hardware []worker.HardwareInformation) *RemoteAIWorker {
7071
return &RemoteAIWorker{
7172
manager: m,
7273
stream: stream,
7374
eof: make(chan struct{}, 1),
7475
addr: common.GetConnectionAddr(stream.Context()),
7576
capabilities: caps,
77+
hardware: hardware,
7678
}
7779
}
7880

@@ -91,13 +93,14 @@ func NewRemoteAIWorkerManager() *RemoteAIWorkerManager {
9193
}
9294
}
9395

94-
func (orch *orchestrator) ServeAIWorker(stream net.AIWorker_RegisterAIWorkerServer, capabilities *net.Capabilities) {
95-
orch.node.serveAIWorker(stream, capabilities)
96+
func (orch *orchestrator) ServeAIWorker(stream net.AIWorker_RegisterAIWorkerServer, capabilities *net.Capabilities, hardware []*net.HardwareInformation) {
97+
orch.node.serveAIWorker(stream, capabilities, hardware)
9698
}
9799

98-
func (n *LivepeerNode) serveAIWorker(stream net.AIWorker_RegisterAIWorkerServer, capabilities *net.Capabilities) {
100+
func (n *LivepeerNode) serveAIWorker(stream net.AIWorker_RegisterAIWorkerServer, capabilities *net.Capabilities, hardware []*net.HardwareInformation) {
99101
from := common.GetConnectionAddr(stream.Context())
100102
wkrCaps := CapabilitiesFromNetCapabilities(capabilities)
103+
wkrHdw := hardwareInformationFromNetHardware(hardware)
101104
if n.Capabilities.LivepeerVersionCompatibleWith(capabilities) {
102105
glog.Infof("Worker compatible, connecting worker_version=%s orchestrator_version=%s worker_addr=%s", capabilities.Version, n.Capabilities.constraints.minVersion, from)
103106
n.Capabilities.AddCapacity(wkrCaps)
@@ -106,17 +109,18 @@ func (n *LivepeerNode) serveAIWorker(stream net.AIWorker_RegisterAIWorkerServer,
106109
defer n.RemoveAICapabilities(wkrCaps)
107110

108111
// Manage blocks while AI worker is connected
109-
n.AIWorkerManager.Manage(stream, capabilities)
112+
n.AIWorkerManager.Manage(stream, capabilities, wkrHdw)
110113
glog.V(common.DEBUG).Infof("Closing aiworker=%s channel", from)
111114
} else {
112115
glog.Errorf("worker %s not connected, version not compatible", from)
113116
}
114117
}
115118

116119
// Manage adds aiworker to list of live aiworkers. Doesn't return until aiworker disconnects
117-
func (rwm *RemoteAIWorkerManager) Manage(stream net.AIWorker_RegisterAIWorkerServer, capabilities *net.Capabilities) {
120+
func (rwm *RemoteAIWorkerManager) Manage(stream net.AIWorker_RegisterAIWorkerServer, capabilities *net.Capabilities, hardware []worker.HardwareInformation) {
118121
from := common.GetConnectionAddr(stream.Context())
119-
aiworker := NewRemoteAIWorker(rwm, stream, CapabilitiesFromNetCapabilities(capabilities))
122+
123+
aiworker := NewRemoteAIWorker(rwm, stream, CapabilitiesFromNetCapabilities(capabilities), hardware)
120124
go func() {
121125
ctx := stream.Context()
122126
<-ctx.Done()
@@ -421,6 +425,20 @@ func (orch *orchestrator) CheckAICapacity(pipeline, modelID string) bool {
421425
}
422426
}
423427

428+
func (orch *orchestrator) WorkerHardware() []worker.HardwareInformation {
429+
if orch.node.AIWorker != nil {
430+
return orch.node.AIWorker.HardwareInformation()
431+
} else {
432+
// return combined hardware information from all live remote workers from information provided by workers
433+
// when connecting to orchestrator. Does not reach out for real-time information.
434+
var wkrHdw []worker.HardwareInformation
435+
for _, worker := range orch.node.AIWorkerManager.liveAIWorkers {
436+
wkrHdw = append(wkrHdw, worker.hardware...)
437+
}
438+
return wkrHdw
439+
}
440+
}
441+
424442
func (orch *orchestrator) AIResults(tcID int64, res *RemoteAIWorkerResult) {
425443
orch.node.AIWorkerManager.aiResults(tcID, res)
426444
}
@@ -1188,3 +1206,20 @@ func (n *LivepeerNode) transcodeFrames(ctx context.Context, sessionID string, ur
11881206
}
11891207
return &tr
11901208
}
1209+
1210+
func hardwareInformationFromNetHardware(hdw []*net.HardwareInformation) []worker.HardwareInformation {
1211+
var netWorkerHardware []byte
1212+
netWorkerHardware, err := json.Marshal(hdw)
1213+
if err != nil {
1214+
glog.Errorf("Error converting hardware information to json: %v", err)
1215+
return []worker.HardwareInformation{}
1216+
}
1217+
var workerHardware []worker.HardwareInformation
1218+
err = json.Unmarshal(netWorkerHardware, &workerHardware)
1219+
if err != nil {
1220+
glog.Errorf("Error converting hardware information: %v", err)
1221+
return []worker.HardwareInformation{}
1222+
}
1223+
1224+
return workerHardware
1225+
}

core/ai_test.go

Lines changed: 78 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package core
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"os"
78
"path/filepath"
9+
"reflect"
810
"strconv"
911
"sync"
1012
"testing"
@@ -47,13 +49,50 @@ func TestServeAIWorker(t *testing.T) {
4749
// test that an ai worker was created
4850
caps := createAIWorkerCapabilities()
4951
netCaps := caps.ToNetCapabilities()
50-
go n.serveAIWorker(strm, netCaps)
52+
go n.serveAIWorker(strm, netCaps, nil)
5153
time.Sleep(1 * time.Second)
5254

5355
wkr, ok := n.AIWorkerManager.liveAIWorkers[strm]
5456
if !ok {
5557
t.Error("Unexpected transcoder type")
5658
}
59+
//confirm worker info
60+
assert.Equal(t, wkr.capabilities, caps)
61+
assert.Nil(t, wkr.hardware)
62+
63+
// test shutdown
64+
wkr.eof <- struct{}{}
65+
time.Sleep(1 * time.Second)
66+
67+
// stream should be removed
68+
_, ok = n.AIWorkerManager.liveAIWorkers[strm]
69+
if ok {
70+
t.Error("Unexpected ai worker presence")
71+
}
72+
73+
//confirm no workers connected
74+
assert.Len(t, n.AIWorkerManager.liveAIWorkers, 0)
75+
76+
//connect worker with hardware information
77+
strm1 := &StubAIWorkerServer{}
78+
hdwDetail := net.GPUComputeInfo{Id: "gpu-1", Name: "gpu name", Major: 8, Minor: 9, MemoryFree: 1, MemoryTotal: 10}
79+
hdwInfo := make(map[string]*net.GPUComputeInfo)
80+
hdwInfo["0"] = &hdwDetail
81+
hdw := net.HardwareInformation{Pipeline: "livepeer-pipeline", ModelId: "livepeer/model1", GpuInfo: hdwInfo}
82+
var netHdwList []*net.HardwareInformation
83+
netHdwList = append(netHdwList, &hdw)
84+
go n.serveAIWorker(strm1, netCaps, netHdwList)
85+
time.Sleep(1 * time.Second)
86+
87+
wkr, ok = n.AIWorkerManager.liveAIWorkers[strm1]
88+
if !ok {
89+
t.Error("Unexpected transcoder type")
90+
}
91+
92+
//confirm worker attached and has hardware information
93+
assert.Len(t, n.AIWorkerManager.liveAIWorkers, 1)
94+
wkrHdw := hardwareInformationFromNetHardware(netHdwList)
95+
assert.Equal(t, wkrHdw, n.AIWorkerManager.liveAIWorkers[strm1].hardware)
5796

5897
// test shutdown
5998
wkr.eof <- struct{}{}
@@ -65,6 +104,7 @@ func TestServeAIWorker(t *testing.T) {
65104
t.Error("Unexpected ai worker presence")
66105
}
67106
}
107+
68108
func TestServeAIWorker_IncompatibleVersion(t *testing.T) {
69109
assert := assert.New(t)
70110
n, _ := NewLivepeerNode(nil, "", nil)
@@ -76,7 +116,7 @@ func TestServeAIWorker_IncompatibleVersion(t *testing.T) {
76116
// test that an ai worker was created
77117
caps := createAIWorkerCapabilities()
78118
netCaps := caps.ToNetCapabilities()
79-
go n.serveAIWorker(strm, netCaps)
119+
go n.serveAIWorker(strm, netCaps, nil)
80120
time.Sleep(5 * time.Second)
81121
assert.Zero(len(n.AIWorkerManager.liveAIWorkers))
82122
assert.Zero(len(n.AIWorkerManager.remoteAIWorkers))
@@ -88,14 +128,14 @@ func TestRemoteAIWorkerManager(t *testing.T) {
88128
initAIWorker := func() (*RemoteAIWorker, *StubAIWorkerServer) {
89129
strm := &StubAIWorkerServer{manager: m}
90130
caps := createAIWorkerCapabilities()
91-
wkr := NewRemoteAIWorker(m, strm, caps)
131+
wkr := NewRemoteAIWorker(m, strm, caps, nil)
92132
return wkr, strm
93133
}
94134
//create worker and connect to manager
95135
wkr, strm := initAIWorker()
96136

97137
go func() {
98-
m.Manage(strm, wkr.capabilities.ToNetCapabilities())
138+
m.Manage(strm, wkr.capabilities.ToNetCapabilities(), nil)
99139
}()
100140
time.Sleep(1 * time.Millisecond) // allow the workers to activate
101141

@@ -156,9 +196,9 @@ func TestSelectAIWorker(t *testing.T) {
156196

157197
// register ai workers, which adds ai worker to liveAIWorkers and remoteAIWorkers
158198
wg := newWg(1)
159-
go func() { m.Manage(strm, capabilities.ToNetCapabilities()) }()
199+
go func() { m.Manage(strm, capabilities.ToNetCapabilities(), nil) }()
160200
time.Sleep(1 * time.Millisecond) // allow time for first stream to register
161-
go func() { m.Manage(strm2, extraModelCapabilities.ToNetCapabilities()); wg.Done() }()
201+
go func() { m.Manage(strm2, extraModelCapabilities.ToNetCapabilities(), nil); wg.Done() }()
162202
time.Sleep(1 * time.Millisecond) // allow time for second stream to register e for third stream to register
163203

164204
//update worker.addr to be different
@@ -256,7 +296,7 @@ func TestSelectAIWorker(t *testing.T) {
256296
assert.EqualError(err, ErrNoCompatibleWorkersAvailable.Error())
257297

258298
// reconnect worker and check pipeline only on second worker is available
259-
go func() { m.Manage(strm2, extraModelCapabilities.ToNetCapabilities()); wg.Done() }()
299+
go func() { m.Manage(strm2, extraModelCapabilities.ToNetCapabilities(), nil); wg.Done() }()
260300
time.Sleep(1 * time.Millisecond)
261301
w, err = m.selectWorker(testRequestId, "image-to-image", "livepeer/model2")
262302
assert.NotNil(w)
@@ -280,7 +320,7 @@ func TestManageAIWorkers(t *testing.T) {
280320

281321
// test that transcoder is added to liveTranscoders and remoteTranscoders
282322
wg1 := newWg(1)
283-
go func() { m.Manage(strm, capabilities.ToNetCapabilities()); wg1.Done() }()
323+
go func() { m.Manage(strm, capabilities.ToNetCapabilities(), nil); wg1.Done() }()
284324
time.Sleep(1 * time.Millisecond) // allow the manager to activate
285325

286326
assert.NotNil(m.liveAIWorkers[strm])
@@ -291,7 +331,7 @@ func TestManageAIWorkers(t *testing.T) {
291331

292332
// test that additional transcoder is added to liveTranscoders and remoteTranscoders
293333
wg2 := newWg(1)
294-
go func() { m.Manage(strm2, capabilities.ToNetCapabilities()); wg2.Done() }()
334+
go func() { m.Manage(strm2, capabilities.ToNetCapabilities(), nil); wg2.Done() }()
295335
time.Sleep(1 * time.Millisecond) // allow the manager to activate
296336

297337
assert.NotNil(m.liveAIWorkers[strm])
@@ -321,7 +361,7 @@ func TestRemoteAIWorkerTimeout(t *testing.T) {
321361
strm := &StubAIWorkerServer{manager: m}
322362
//create capabilities and constraints the ai worker sends to orch
323363
caps := createAIWorkerCapabilities()
324-
wkr := NewRemoteAIWorker(m, strm, caps)
364+
wkr := NewRemoteAIWorker(m, strm, caps, nil)
325365
return wkr, strm
326366
}
327367
//create a new worker
@@ -483,14 +523,14 @@ func TestCheckAICapacity(t *testing.T) {
483523
initAIWorker := func() (*RemoteAIWorker, *StubAIWorkerServer) {
484524
strm := &StubAIWorkerServer{manager: o.node.AIWorkerManager}
485525
caps := createAIWorkerCapabilities()
486-
wkr := NewRemoteAIWorker(o.node.AIWorkerManager, strm, caps)
526+
wkr := NewRemoteAIWorker(o.node.AIWorkerManager, strm, caps, nil)
487527
return wkr, strm
488528
}
489529
//create worker and connect to manager
490530
wkr2, strm := initAIWorker()
491531

492532
go func() {
493-
o.node.AIWorkerManager.Manage(strm, wkr2.capabilities.ToNetCapabilities())
533+
o.node.AIWorkerManager.Manage(strm, wkr2.capabilities.ToNetCapabilities(), nil)
494534
}()
495535
time.Sleep(1 * time.Millisecond) // allow the workers to activate
496536

@@ -513,12 +553,12 @@ func TestRemoteAIWorkerProcessPipelines(t *testing.T) {
513553
initAIWorker := func() (*RemoteAIWorker, *StubAIWorkerServer) {
514554
strm := &StubAIWorkerServer{manager: o.node.AIWorkerManager}
515555
caps := createAIWorkerCapabilities()
516-
wkr := NewRemoteAIWorker(o.node.AIWorkerManager, strm, caps)
556+
wkr := NewRemoteAIWorker(o.node.AIWorkerManager, strm, caps, nil)
517557
return wkr, strm
518558
}
519559
//create worker and connect to manager
520560
wkr, strm := initAIWorker()
521-
go o.node.serveAIWorker(strm, wkr.capabilities.ToNetCapabilities())
561+
go o.node.serveAIWorker(strm, wkr.capabilities.ToNetCapabilities(), nil)
522562
time.Sleep(5 * time.Millisecond) // allow the workers to activate
523563

524564
//check workers connected
@@ -686,6 +726,10 @@ func (a *stubAIWorker) EnsureImageAvailable(ctx context.Context, pipeline string
686726
return nil
687727
}
688728

729+
func (a *stubAIWorker) HardwareInformation() []worker.HardwareInformation {
730+
return nil
731+
}
732+
689733
type StubAIWorkerServer struct {
690734
manager *RemoteAIWorkerManager
691735
SendError error
@@ -800,3 +844,23 @@ func TestParseAIModelConfigs(t *testing.T) {
800844
})
801845
}
802846
}
847+
848+
func TestHardwareInformationFromNetHardware(t *testing.T) {
849+
netHdwDetail := net.GPUComputeInfo{Id: "gpu-1", Name: "gpu name", Major: 8, Minor: 9, MemoryFree: 1, MemoryTotal: 10}
850+
netHdwInfo := make(map[string]*net.GPUComputeInfo)
851+
netHdwInfo["0"] = &netHdwDetail
852+
netHdw := net.HardwareInformation{Pipeline: "livepeer-pipeline", ModelId: "livepeer/model1", GpuInfo: netHdwInfo}
853+
var netHdwList []*net.HardwareInformation
854+
netHdwList = append(netHdwList, &netHdw)
855+
//create []worker.HardwareInformation
856+
hdwList := hardwareInformationFromNetHardware(netHdwList)
857+
858+
netHdwJson, _ := json.Marshal(netHdwList)
859+
hdwJson, _ := json.Marshal(hdwList)
860+
861+
var hdw1, hdw2 interface{}
862+
json.Unmarshal(netHdwJson, &hdw1)
863+
json.Unmarshal(hdwJson, &hdw2)
864+
assert.True(t, reflect.DeepEqual(hdw1, hdw2))
865+
866+
}

core/livepeernode.go

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,10 @@ type LivepeerNode struct {
142142
Sender pm.Sender
143143

144144
// Thread safety for config fields
145-
mu sync.RWMutex
146-
StorageConfigs map[string]*transcodeConfig
147-
storageMutex *sync.RWMutex
145+
mu sync.RWMutex
146+
StorageConfigs map[string]*transcodeConfig
147+
storageMutex *sync.RWMutex
148+
NetworkCapabilities common.NetworkCapabilities
148149
// Transcoder private fields
149150
priceInfo map[string]*AutoConvertedPrice
150151
priceInfoForCaps map[string]CapabilityPrices
@@ -269,6 +270,19 @@ func (n *LivepeerNode) GetBasePriceForCap(b_eth_addr string, cap Capability, mod
269270
return nil
270271
}
271272

273+
func (n *LivepeerNode) GetCapsPrices(b_eth_addr string) *CapabilityPrices {
274+
addr := strings.ToLower(b_eth_addr)
275+
n.mu.RLock()
276+
defer n.mu.RUnlock()
277+
278+
prices, ok := n.priceInfoForCaps[addr]
279+
if !ok {
280+
return nil
281+
}
282+
283+
return &prices
284+
}
285+
272286
// SetMaxFaceValue sets the faceValue upper limit for tickets received
273287
func (n *LivepeerNode) SetMaxFaceValue(maxfacevalue *big.Int) {
274288
n.mu.Lock()
@@ -393,3 +407,22 @@ func (n *LivepeerNode) UpdateTranscoderSecret(secret string, active bool) {
393407

394408
n.Database.UpdateTranscoderSecret(secret, active)
395409
}
410+
411+
func (n *LivepeerNode) UpdateNetworkCapabilities(orchNetworkCapabilities []*common.OrchNetworkCapabilities) error {
412+
n.mu.Lock()
413+
defer n.mu.Unlock()
414+
415+
n.NetworkCapabilities.Orchestrators = orchNetworkCapabilities
416+
417+
if lpmon.Enabled {
418+
lpmon.SendQueueEventAsync("network_capabilities", orchNetworkCapabilities)
419+
}
420+
421+
return nil
422+
}
423+
424+
func (n *LivepeerNode) GetNetworkCapabilities() []*common.OrchNetworkCapabilities {
425+
n.mu.Lock()
426+
defer n.mu.Unlock()
427+
return n.NetworkCapabilities.Orchestrators
428+
}

0 commit comments

Comments
 (0)