-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathtrackedWorkerAgents.ts
More file actions
244 lines (213 loc) · 7.99 KB
/
trackedWorkerAgents.ts
File metadata and controls
244 lines (213 loc) · 7.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
import { PromisePool } from '@supercharge/promise-pool'
import { LoggerInstance, Reason, WorkerAgentId, valueOfCost, stringifyError } from '@sofie-package-manager/api'
import { ExpectationStateHandlerSession, WorkerAgentAssignment } from '../../lib/types'
import { WorkerAgentAPI } from '../../workerAgentApi'
import { ExpectationTracker } from '../../expectationTracker/expectationTracker'
import { TrackedExpectation } from '../../lib/trackedExpectation'
/** Storage for WorkerAgents */
export class TrackedWorkerAgents {
private workerAgents: Map<WorkerAgentId, TrackedWorkerAgent> = new Map()
private logger: LoggerInstance
constructor(logger: LoggerInstance, private tracker: ExpectationTracker) {
this.logger = logger.category('TrackedWorkerAgents')
}
public get(workerId: WorkerAgentId): TrackedWorkerAgent | undefined {
return this.workerAgents.get(workerId)
}
public list(): { workerId: WorkerAgentId; workerAgent: TrackedWorkerAgent }[] {
return Array.from(this.workerAgents.entries()).map(([workerId, workerAgent]) => {
return {
workerId,
workerAgent,
}
})
}
public upsert(workerId: WorkerAgentId, workerAgent: TrackedWorkerAgent): void {
this.workerAgents.set(workerId, workerAgent)
}
public remove(workerId: WorkerAgentId): void {
this.workerAgents.delete(workerId)
}
/**
* Asks the Workers if they support a certain Expectation.
* Updates trackedExp.availableWorkers to reflect the result.
*/
public async updateAvailableWorkersForExpectation(trackedExp: TrackedExpectation): Promise<{
hasQueriedAnyone: boolean
workerCount: number
}> {
const workerAgents = this.list()
// Note: In the future, this algorithm could be change to not ask ALL
// workers, but instead just ask until we have got an enough number of available workers.
let hasQueriedAnyone = false
await Promise.all(
workerAgents.map(async ({ workerId, workerAgent }) => {
if (!workerAgent.connected) return
// Only ask each worker once, or after a certain time has passed:
const queriedWorker = trackedExp.queriedWorkers.get(workerId)
if (!queriedWorker || Date.now() - queriedWorker > this.tracker.constants.WORKER_SUPPORT_TIME) {
trackedExp.queriedWorkers.set(workerId, Date.now())
hasQueriedAnyone = true
try {
const support = await workerAgent.api.doYouSupportExpectation(trackedExp.exp)
if (support.support) {
trackedExp.availableWorkers.add(workerId)
trackedExp.noAvailableWorkersReasons.delete(workerId)
} else {
trackedExp.availableWorkers.delete(workerId)
trackedExp.noAvailableWorkersReasons.set(workerId, {
user: support.reason.user,
tech: `${workerId}: ${support.reason.tech}`,
})
}
} catch (err) {
trackedExp.availableWorkers.delete(workerId)
if ((err + '').match(/timeout/i)) {
trackedExp.noAvailableWorkersReasons.set(workerId, {
user: 'Worker timed out',
tech: `${workerId}: Timeout in doYouSupportExpectation()`,
})
} else {
trackedExp.noAvailableWorkersReasons.set(workerId, {
user: 'Error in Worker',
tech: `${workerId}: Error thrown: ${stringifyError(err)}`,
})
}
}
}
})
)
return {
hasQueriedAnyone,
workerCount: workerAgents.length,
}
}
/**
* Goes through a list of workers and determine which of them is the "cheapest" one to handle a certain Expectation.
* @returns the best worker (and some metadata)
*/
public async determineBestWorkerForExpectation(trackedExp: TrackedExpectation): Promise<{
bestWorker: WorkerAgentAssignment | undefined
noCostReason: Reason
}> {
/** How many requests to send out simultaneously */
const BATCH_SIZE = 10
/** If we've gotten this amount of positive answers, we won't be asking more workers */
const minWorkerCount = 5
let countQueried = 0
let countInfinite = 0
const workerIds = Array.from(trackedExp.availableWorkers.keys())
let noCostReason: Reason | undefined = undefined
const workerCosts: WorkerAgentAssignment[] = []
// We're using PromisePool to query a batch of workers at a time:
await PromisePool.for(workerIds)
.withConcurrency(BATCH_SIZE)
.handleError(async (error, workerId: WorkerAgentId) => {
// Log the error
this.logger.error(`Error in assignWorkerToSession for worker "${workerId}": ${stringifyError(error)}`)
})
.process(async (workerId: WorkerAgentId) => {
// Abort if we have gotten enough answers:
if (workerCosts.length >= minWorkerCount) return
const workerAgent = this.get(workerId)
if (workerAgent) {
try {
countQueried++
const cost = await workerAgent.api.getCostForExpectation(trackedExp.exp)
if (cost.cost !== null) {
// null means that the cost is "infinite"
workerCosts.push({
worker: workerAgent.api,
id: workerId,
cost,
randomCost: Math.random(), // To randomize if there are several with the same best cost
})
} else {
noCostReason = cost.reason
countInfinite++
}
} catch (error) {
noCostReason = {
user: 'Error: Internal Error',
tech: `${stringifyError(error, true)}`,
}
}
} else {
this.logger.error(`Worker "${workerId}" not found in determineBestWorkerForExpectation`)
}
})
workerCosts.sort((a, b) => {
// Lowest cost first:
const aCost: number = valueOfCost(a.cost.startCost) + valueOfCost(a.cost.cost)
const bCost: number = valueOfCost(b.cost.startCost) + valueOfCost(b.cost.cost)
if (aCost > bCost) return 1
if (aCost < bCost) return -1
// To randomize if there are several with the same best cost:
if (a.randomCost > b.randomCost) return 1
if (a.randomCost < b.randomCost) return -1
return 0
})
if (!noCostReason) {
noCostReason = {
user: `${countInfinite} workers are currently busy`,
tech:
`availableWorkers: ${trackedExp.availableWorkers.size}, ` +
`queriedWorkers: ${trackedExp.queriedWorkers.size}, ` +
`countQueried: ${countQueried}, ` +
`countInfinite: ${countInfinite} ` +
`(Worker costs: ${workerCosts.map((c) => `${c.id}: ${c.cost}`).join(', ')}`,
}
}
return {
bestWorker: workerCosts[0],
noCostReason,
}
}
/** Do a bidding between the available Workers and assign the cheapest one to use for the evaulation-session. */
public async assignWorkerToSession(trackedExp: TrackedExpectation): Promise<void> {
const session: ExpectationStateHandlerSession | null = trackedExp.session
if (!session) throw new Error('ExpectationManager: Internal error: Session not set')
if (session.assignedWorker) {
// A worker has already been assigned
trackedExp.noWorkerAssignedTime = null
return
}
// Remove any workers that no longer exist:
// (Like if a worker has shut down)
{
for (const workerId of trackedExp.availableWorkers.keys()) {
if (!this.get(workerId)) {
trackedExp.availableWorkers.delete(workerId)
}
}
for (const workerId of trackedExp.queriedWorkers.keys()) {
if (!this.get(workerId)) {
trackedExp.queriedWorkers.delete(workerId)
}
}
}
if (trackedExp.waitingForWorkerTime !== null) {
// If the expectation is waiting for a worker, it might be a good idea to update the list of available workers:
// (This can be useful for example if a new worker has just been registered)
await this.updateAvailableWorkersForExpectation(trackedExp)
}
if (!trackedExp.availableWorkers.size) {
session.noAssignedWorkerReason = { user: `No workers available`, tech: `No workers available` }
}
// Send a number of requests simultaneously:
const { bestWorker, noCostReason } = await this.determineBestWorkerForExpectation(trackedExp)
if (bestWorker) {
session.assignedWorker = bestWorker
trackedExp.noWorkerAssignedTime = null
} else {
session.noAssignedWorkerReason = {
user: `Waiting for a free worker, ${noCostReason.user}`,
tech: `Waiting for a free worker, ${noCostReason.tech}`,
}
}
}
}
export interface TrackedWorkerAgent {
api: WorkerAgentAPI
connected: boolean
}