Skip to content

Commit 63c1087

Browse files
authored
Merge pull request #96 from Graziel/remove_order_on_requestJob
rewrite requestJob a bit
2 parents ae70cea + 6e4be89 commit 63c1087

File tree

1 file changed

+29
-53
lines changed

1 file changed

+29
-53
lines changed

src/Model/Table/QueuedTasksTable.php

Lines changed: 29 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -184,29 +184,22 @@ public function getStats() {
184184
* @return array Taskdata.
185185
*/
186186
public function requestJob(array $capabilities, $group = null) {
187-
$whereClause = [];
188-
$wasFetched = [];
187+
$now = new Time();
188+
$nowStr = $now->toDateTimeString();
189189

190-
//$this->virtualFields['age'] = 'IFNULL(TIMESTAMPDIFF(SECOND, NOW(),notbefore), 0)';
190+
$query = $this->find();
191191
$findCond = [
192192
'conditions' => [
193193
'completed IS' => null,
194194
'OR' => [],
195195
],
196-
'fields' => function ($query) {
197-
return [
198-
'id',
199-
'jobtype',
200-
'fetched',
201-
//'age' => $query->func('IFNULL(TIMESTAMPDIFF(SECOND, NOW(), notbefore), 0)'),
202-
'age' => $query->newExpr()->add('IFNULL(TIMESTAMPDIFF(SECOND, NOW(), notbefore), 0)'),
203-
];
204-
},
196+
'fields' => [
197+
'age' => $query->newExpr()->add('IFNULL(TIMESTAMPDIFF(SECOND, "' . $nowStr . '", notbefore), 0)')
198+
],
205199
'order' => [
206200
'age ASC',
207201
'id ASC',
208-
],
209-
'limit' => 3,
202+
]
210203
];
211204

212205
if ($group !== null) {
@@ -216,18 +209,19 @@ public function requestJob(array $capabilities, $group = null) {
216209
// generate the task specific conditions.
217210
foreach ($capabilities as $task) {
218211
list($plugin, $name) = pluginSplit($task['name']);
212+
$timeoutAt = $now->copy();
219213
$tmp = [
220214
'jobtype' => $name,
221215
'AND' => [
222216
[
223217
'OR' => [
224-
'notbefore <' => new Time(),
218+
'notbefore <' => $now,
225219
'notbefore IS' => null,
226220
],
227221
],
228222
[
229223
'OR' => [
230-
'fetched <' => (new Time())->modify(sprintf('-%d seconds', $task['timeout'])),
224+
'fetched <' => $timeoutAt->subSeconds($task['timeout']),
231225
'fetched IS' => null,
232226
],
233227
],
@@ -240,51 +234,33 @@ public function requestJob(array $capabilities, $group = null) {
240234
$findCond['conditions']['OR'][] = $tmp;
241235
}
242236

243-
// First, find a list of a few of the oldest unfinished tasks.
244-
$data = $this->find('all', $findCond)->all()->toArray();
245-
if (!$data) {
237+
$job = $query->find('all', $findCond)
238+
->autoFields(true)
239+
->first();
240+
241+
if (!$job) {
246242
return null;
247243
}
248244

249-
// Generate a list of already fetched ID's and a where clause for the update statement
250-
$capTimeout = Hash::combine($capabilities, '{s}.name', '{s}.timeout');
251-
foreach ($data as $item) {
252-
$whereClause[] = '(id = ' . $item['id'] . ' AND (workerkey IS NULL OR fetched <= "' . date('Y-m-d H:i:s', time() - $capTimeout[$item['jobtype']]) . '"))';
253-
if (!empty($item['fetched'])) {
254-
$wasFetched[] = $item['id'];
255-
}
245+
if ($job->fetched) {
246+
$job = $this->patchEntity($job, [
247+
'failed' => $job->failed + 1,
248+
'failure_message' => 'Restart after timeout'
249+
]);
250+
251+
$this->save($job, ['fieldList' => ['id', 'failed', 'failure_message']]);
256252
}
257253

258254
$key = $this->key();
259-
//debug($key);ob_flush();
260-
261-
// try to update one of the found tasks with the key of this worker.
262-
$virtualFields['age'] = 'IFNULL(TIMESTAMPDIFF(SECOND, NOW(),notbefore), 0)';
263-
$this->_connection->query('UPDATE ' . $this->table() . ' SET workerkey = "' . $key . '", fetched = "' . date('Y-m-d H:i:s') . '" WHERE ' . implode(' OR ', $whereClause) . ' ORDER BY ' . $virtualFields['age'] . ' ASC, id ASC LIMIT 1');
264-
265-
// Read which one actually got updated, which is the job we are supposed to execute.
266-
$data = $this->find('all', [
267-
'conditions' => [
268-
'workerkey' => $key,
269-
'completed IS' => null,
270-
],
271-
'order' => ['fetched' => 'DESC'],
272-
])->first();
255+
$job = $this->patchEntity($job, [
256+
'workerkey' => $key,
257+
'fetched' => $now
258+
]);
273259

274-
if (!$data) {
275-
return null;
276-
}
260+
$this->save($job);
261+
$this->rateHistory[$job['jobtype']] = $now->toUnixString();
277262

278-
// If the job had an existing fetched timestamp, increment the failure counter
279-
if (in_array($data['id'], $wasFetched)) {
280-
$data['failed']++;
281-
$data['failure_message'] = 'Restart after timeout';
282-
//$this->id = $data['id'];
283-
$this->save($data, ['fieldList' => ['id', 'failed', 'failure_message']]);
284-
}
285-
//save last fetch by type for Rate Limiting.
286-
$this->rateHistory[$data['jobtype']] = (new Time())->toUnixString();
287-
return $data;
263+
return $job;
288264
}
289265

290266
/**

0 commit comments

Comments
 (0)