Skip to content

Commit 33e1bf8

Browse files
authored
Revert "fix(ai): update ai-video selection suspension (#3033)" (#3392)
This reverts commit 1c1c280.
1 parent 4fb4712 commit 33e1bf8

File tree

5 files changed

+11
-187
lines changed

5 files changed

+11
-187
lines changed

server/ai_process.go

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1370,23 +1370,6 @@ func processImageToText(ctx context.Context, params aiRequestParams, req worker.
13701370
return txtResp, nil
13711371
}
13721372

1373-
// isRetryableError checks if the error is a transient error that can be retried.
1374-
func isRetryableError(err error) bool {
1375-
transientErrorMessages := []string{
1376-
"insufficient capacity", // Caused by limitation in our current implementation.
1377-
"invalid ticket sendernonce", // Caused by gateway nonce mismatch.
1378-
"ticketparams expired", // Caused by ticket expiration.
1379-
}
1380-
1381-
errMsg := strings.ToLower(err.Error())
1382-
for _, msg := range transientErrorMessages {
1383-
if strings.Contains(errMsg, msg) {
1384-
return true
1385-
}
1386-
}
1387-
return false
1388-
}
1389-
13901373
func processAIRequest(ctx context.Context, params aiRequestParams, req interface{}) (interface{}, error) {
13911374
var cap core.Capability
13921375
var modelID string
@@ -1536,13 +1519,6 @@ func processAIRequest(ctx context.Context, params aiRequestParams, req interface
15361519
break
15371520
}
15381521

1539-
// Don't suspend the session if the error is a transient error.
1540-
if isRetryableError(err) {
1541-
params.sessManager.Complete(ctx, sess)
1542-
continue
1543-
}
1544-
1545-
// Suspend the session on other errors.
15461522
clog.Infof(ctx, "Error submitting request modelID=%v try=%v orch=%v err=%v", modelID, tries, sess.Transcoder(), err)
15471523
params.sessManager.Remove(ctx, sess) //TODO: Improve session selection logic for live-video-to-video
15481524

server/ai_process_test.go

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package server
22

33
import (
44
"context"
5-
"errors"
65
"reflect"
76
"testing"
87

@@ -89,38 +88,6 @@ func Test_submitAudioToText(t *testing.T) {
8988
}
9089
}
9190

92-
func Test_isRetryableError(t *testing.T) {
93-
tests := []struct {
94-
name string
95-
err error
96-
want bool
97-
}{
98-
{
99-
name: "insufficient capacity error",
100-
err: errors.New("Insufficient capacity"),
101-
want: true,
102-
},
103-
{
104-
name: "INSUFFICIENT capacity ERROR",
105-
err: errors.New("Insufficient capacity"),
106-
want: true,
107-
},
108-
{
109-
name: "non-retryable error",
110-
err: errors.New("some other error"),
111-
want: false,
112-
},
113-
}
114-
115-
for _, tt := range tests {
116-
t.Run(tt.name, func(t *testing.T) {
117-
if got := isRetryableError(tt.err); got != tt.want {
118-
t.Errorf("isRetryableError() = %v, want %v", got, tt.want)
119-
}
120-
})
121-
}
122-
}
123-
12491
func TestEncodeReqMetadata(t *testing.T) {
12592
tests := []struct {
12693
name string

server/ai_session.go

Lines changed: 11 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,14 @@ type AISessionPool struct {
2929
sessMap map[string]*BroadcastSession
3030
inUseSess []*BroadcastSession
3131
suspender *suspender
32-
penalty int
3332
mu sync.RWMutex
3433
}
3534

36-
func NewAISessionPool(selector BroadcastSessionsSelector, suspender *suspender, penalty int) *AISessionPool {
35+
func NewAISessionPool(selector BroadcastSessionsSelector, suspender *suspender) *AISessionPool {
3736
return &AISessionPool{
3837
selector: selector,
3938
sessMap: make(map[string]*BroadcastSession),
4039
suspender: suspender,
41-
penalty: penalty,
4240
mu: sync.RWMutex{},
4341
}
4442
}
@@ -103,6 +101,10 @@ func (pool *AISessionPool) Add(sessions []*BroadcastSession) {
103101
pool.mu.Lock()
104102
defer pool.mu.Unlock()
105103

104+
// If we try to add new sessions to the pool the suspender
105+
// should treat this as a refresh
106+
pool.suspender.signalRefresh()
107+
106108
var uniqueSessions []*BroadcastSession
107109
for _, sess := range sessions {
108110
if _, ok := pool.sessMap[sess.Transcoder()]; ok {
@@ -124,14 +126,10 @@ func (pool *AISessionPool) Remove(sess *BroadcastSession) {
124126
delete(pool.sessMap, sess.Transcoder())
125127
pool.inUseSess = removeSessionFromList(pool.inUseSess, sess)
126128

129+
// Magic number for now
130+
penalty := 3
127131
// If this method is called assume that the orch should be suspended
128-
// as well. Since AISessionManager re-uses the pools the suspension
129-
// penalty needs to consider the current suspender count to set the penalty
130-
lastCount, ok := pool.suspender.list[sess.Transcoder()]
131-
penalty := pool.suspender.count + pool.penalty
132-
if ok {
133-
penalty -= lastCount
134-
}
132+
// as well
135133
pool.suspender.suspend(sess.Transcoder(), penalty)
136134
}
137135

@@ -158,14 +156,12 @@ type AISessionSelector struct {
158156
// The time until the pools should be refreshed with orchs from discovery
159157
ttl time.Duration
160158
lastRefreshTime time.Time
161-
initialPoolSize int
162159

163160
cap core.Capability
164161
modelID string
165162

166163
node *core.LivepeerNode
167164
suspender *suspender
168-
penalty int
169165
os drivers.OSSession
170166
}
171167

@@ -184,10 +180,8 @@ func NewAISessionSelector(ctx context.Context, cap core.Capability, modelID stri
184180
// The latency score in this context is just the latency of the last completed request for a session
185181
// The "good enough" latency score is set to 0.0 so the selector will always select unknown sessions first
186182
minLS := 0.0
187-
// Session pool suspender starts at 0. Suspension is 3 requests if there are errors from the orchestrator
188-
penalty := 3
189-
warmPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore, warmCaps), suspender, penalty)
190-
coldPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore, coldCaps), suspender, penalty)
183+
warmPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore, warmCaps), suspender)
184+
coldPool := NewAISessionPool(NewMinLSSelector(stakeRdr, minLS, node.SelectionAlgorithm, node.OrchPerfScore, coldCaps), suspender)
191185
sel := &AISessionSelector{
192186
warmPool: warmPool,
193187
coldPool: coldPool,
@@ -196,7 +190,6 @@ func NewAISessionSelector(ctx context.Context, cap core.Capability, modelID stri
196190
modelID: modelID,
197191
node: node,
198192
suspender: suspender,
199-
penalty: penalty,
200193
os: drivers.NodeStorage.NewSession(strconv.Itoa(int(cap)) + "_" + modelID),
201194
}
202195

@@ -225,26 +218,11 @@ func newAICapabilities(cap core.Capability, modelID string, warm bool, minVersio
225218
return caps
226219
}
227220

228-
// selectorIsEmpty returns true if no orchestrators are in the warm or cold pools.
229-
func (sel *AISessionSelector) SelectorIsEmpty() bool {
230-
return sel.warmPool.Size() == 0 && sel.coldPool.Size() == 0
231-
}
232-
233221
func (sel *AISessionSelector) Select(ctx context.Context) *AISession {
234222
shouldRefreshSelector := func() bool {
235-
discoveryPoolSize := int(math.Min(float64(sel.node.OrchestratorPool.Size()), float64(sel.initialPoolSize)))
236-
237-
// If the selector is empty, release all orchestrators from suspension and
238-
// try refresh.
239-
if sel.SelectorIsEmpty() {
240-
clog.Infof(ctx, "refreshing sessions, no orchestrators in pools")
241-
for i := 0; i < sel.penalty; i++ {
242-
sel.suspender.signalRefresh()
243-
}
244-
}
245-
246223
// Refresh if the # of sessions across warm and cold pools falls below the smaller of the maxRefreshSessionsThreshold and
247224
// 1/2 the total # of orchs that can be queried during discovery
225+
discoveryPoolSize := sel.node.OrchestratorPool.Size()
248226
if sel.warmPool.Size()+sel.coldPool.Size() < int(math.Min(maxRefreshSessionsThreshold, math.Ceil(float64(discoveryPoolSize)/2.0))) {
249227
return true
250228
}
@@ -294,10 +272,6 @@ func (sel *AISessionSelector) Remove(sess *AISession) {
294272
}
295273

296274
func (sel *AISessionSelector) Refresh(ctx context.Context) error {
297-
// If we try to add new sessions to the pool the suspender
298-
// should treat this as a refresh
299-
sel.suspender.signalRefresh()
300-
301275
sessions, err := sel.getSessions(ctx)
302276
if err != nil {
303277
return err
@@ -312,13 +286,6 @@ func (sel *AISessionSelector) Refresh(ctx context.Context) error {
312286
continue
313287
}
314288

315-
// We request 100 orchestrators in getSessions above so all Orchestrators are returned with refreshed information
316-
// This keeps the suspended Orchestrators out of the pool until the selector is empty or 30 minutes has passed (refresh happens every 10 minutes)
317-
if sel.suspender.Suspended(sess.Transcoder()) > 0 {
318-
clog.V(common.DEBUG).Infof(ctx, "skipping suspended orchestrator=%s", sess.Transcoder())
319-
continue
320-
}
321-
322289
// If the constraint for the modelID are missing skip this session
323290
modelConstraint, ok := constraints.Models[sel.modelID]
324291
if !ok {
@@ -334,7 +301,6 @@ func (sel *AISessionSelector) Refresh(ctx context.Context) error {
334301

335302
sel.warmPool.Add(warmSessions)
336303
sel.coldPool.Add(coldSessions)
337-
sel.initialPoolSize = len(warmSessions) + len(coldSessions) + len(sel.suspender.list)
338304

339305
sel.lastRefreshTime = time.Now()
340306

@@ -405,8 +371,6 @@ func (c *AISessionManager) Select(ctx context.Context, cap core.Capability, mode
405371
return nil, err
406372
}
407373

408-
clog.V(common.DEBUG).Infof(ctx, "selected orchestrator=%s", sess.Transcoder())
409-
410374
return sess, nil
411375
}
412376

server/handlers.go

Lines changed: 0 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -296,88 +296,6 @@ func getAvailableTranscodingOptionsHandler() http.Handler {
296296
})
297297
}
298298

299-
// poolOrchestrator contains information about an orchestrator in a pool.
300-
type poolOrchestrator struct {
301-
Url string `json:"url"`
302-
LatencyScore float64 `json:"latency_score"`
303-
InFlight int `json:"in_flight"`
304-
}
305-
306-
// aiPoolInfo contains information about an AI pool.
307-
type aiPoolInfo struct {
308-
Size int `json:"size"`
309-
InUse int `json:"in_use"`
310-
Orchestrators []poolOrchestrator `json:"orchestrators"`
311-
}
312-
313-
// suspendedInfo contains information about suspended orchestrators.
314-
type suspendedInfo struct {
315-
List map[string]int `json:"list"`
316-
CurrentCount int `json:"current_count"`
317-
}
318-
319-
// aiOrchestratorPools contains information about all AI pools.
320-
type aiOrchestratorPools struct {
321-
Cold aiPoolInfo `json:"cold"`
322-
Warm aiPoolInfo `json:"warm"`
323-
LastRefresh time.Time `json:"last_refresh"`
324-
Suspended suspendedInfo `json:"suspended"`
325-
}
326-
327-
// getAIOrchestratorPoolsInfoHandler returns information about AI orchestrator pools.
328-
func (s *LivepeerServer) getAIPoolsInfoHandler() http.Handler {
329-
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
330-
aiPoolsInfoResp := make(map[string]aiOrchestratorPools)
331-
332-
s.AISessionManager.mu.Lock()
333-
defer s.AISessionManager.mu.Unlock()
334-
335-
// Return if no selectors are present.
336-
if len(s.AISessionManager.selectors) == 0 {
337-
glog.Warning("Orchestrator pools are not yet initialized")
338-
respondJson(w, aiPoolsInfoResp)
339-
return
340-
}
341-
342-
// Loop through selectors and get pools info.
343-
for cap, pool := range s.AISessionManager.selectors {
344-
warmPool := aiPoolInfo{
345-
Size: pool.warmPool.Size(),
346-
InUse: len(pool.warmPool.inUseSess),
347-
}
348-
for _, sess := range pool.warmPool.sessMap {
349-
poolOrchestrator := poolOrchestrator{
350-
Url: sess.Transcoder(),
351-
LatencyScore: sess.LatencyScore,
352-
InFlight: len(sess.SegsInFlight),
353-
}
354-
warmPool.Orchestrators = append(warmPool.Orchestrators, poolOrchestrator)
355-
}
356-
357-
coldPool := aiPoolInfo{
358-
Size: pool.coldPool.Size(),
359-
InUse: len(pool.coldPool.inUseSess),
360-
}
361-
for _, sess := range pool.coldPool.sessMap {
362-
coldPool.Orchestrators = append(coldPool.Orchestrators, poolOrchestrator{
363-
Url: sess.Transcoder(),
364-
LatencyScore: sess.LatencyScore,
365-
InFlight: len(sess.SegsInFlight),
366-
})
367-
}
368-
369-
aiPoolsInfoResp[cap] = aiOrchestratorPools{
370-
Cold: coldPool,
371-
Warm: warmPool,
372-
LastRefresh: pool.lastRefreshTime,
373-
Suspended: suspendedInfo{List: pool.suspender.list, CurrentCount: pool.suspender.count},
374-
}
375-
}
376-
377-
respondJson(w, aiPoolsInfoResp)
378-
})
379-
}
380-
381299
// Rounds
382300
func currentRoundHandler(client eth.LivepeerEthClient) http.Handler {
383301
return mustHaveClient(client, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

server/webserver.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ func (s *LivepeerServer) cliWebServerHandlers(bindAddr string) *http.ServeMux {
5050
mux.Handle("/getBroadcastConfig", getBroadcastConfigHandler())
5151
mux.Handle("/getAvailableTranscodingOptions", getAvailableTranscodingOptionsHandler())
5252
mux.Handle("/setMaxPriceForCapability", mustHaveFormParams(s.setMaxPriceForCapability(), "maxPricePerUnit", "pixelsPerUnit", "currency", "pipeline", "modelID"))
53-
mux.Handle("/getAISessionPoolsInfo", s.getAIPoolsInfoHandler())
5453

5554
// Rounds
5655
mux.Handle("/currentRound", currentRoundHandler(client))

0 commit comments

Comments
 (0)