Skip to content

Commit cd4bfd4

Browse files
committed
FEATURE: Support live indexing based on JobQueue
1 parent 83b7622 commit cd4bfd4

File tree

5 files changed

+123
-21
lines changed

5 files changed

+123
-21
lines changed

Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/Command/NodeIndexQueueCommandController.php

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ class NodeIndexQueueCommandController extends CommandController
2626
{
2727
use LoggerTrait;
2828

29-
const QUEUE_NAME = 'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer';
29+
const BATCH_QUEUE_NAME = 'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer';
30+
const LIVE_QUEUE_NAME = 'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer.Live';
3031

3132
/**
3233
* @var JobManager
@@ -84,9 +85,9 @@ public function buildCommand($workspace = null)
8485
$this->outputLine();
8586
$this->outputLine('<b>Indexing on %s ...</b>', [$indexName]);
8687

87-
$pendingJobs = $this->queueManager->getQueue(self::QUEUE_NAME)->count();
88+
$pendingJobs = $this->queueManager->getQueue(self::BATCH_QUEUE_NAME)->count();
8889
if ($pendingJobs !== 0) {
89-
$this->outputLine('<error>!! </error> The queue "%s" is not empty (%d pending jobs), please flush the queue.', [self::QUEUE_NAME, $pendingJobs]);
90+
$this->outputLine('<error>!! </error> The queue "%s" is not empty (%d pending jobs), please flush the queue.', [self::BATCH_QUEUE_NAME, $pendingJobs]);
9091
$this->quit(1);
9192
}
9293

@@ -101,38 +102,50 @@ public function buildCommand($workspace = null)
101102
$this->indexWorkspace($workspace, $indexPostfix);
102103
}
103104
$updateAliasJob = new UpdateAliasJob($indexPostfix);
104-
$this->jobManager->queue(self::QUEUE_NAME, $updateAliasJob);
105+
$this->jobManager->queue(self::BATCH_QUEUE_NAME, $updateAliasJob);
105106

106-
$this->outputLine("Indexing jobs created for queue %s with success ...", [self::QUEUE_NAME]);
107+
$this->outputLine("Indexing jobs created for queue %s with success ...", [self::BATCH_QUEUE_NAME]);
107108
$this->outputSystemReport();
108109
$this->outputLine();
109110
}
110111

111112
/**
113+
* @param string $queue Type of queue to process, can be "live" or "batch"
112114
* @param int $exitAfter If set, this command will exit after the given amount of seconds
113115
* @param int $limit If set, only the given amount of jobs are processed (successful or not) before the script exits
114116
* @param bool $verbose Output debugging information
115117
* @return void
116118
*/
117-
public function workCommand($exitAfter = null, $limit = null, $verbose = false)
119+
public function workCommand($queue = 'batch', $exitAfter = null, $limit = null, $verbose = false)
118120
{
121+
$allowedQueues = [
122+
'batch' => self::BATCH_QUEUE_NAME,
123+
'live' => self::LIVE_QUEUE_NAME
124+
];
125+
if (!isset($allowedQueues[$queue])) {
126+
$this->output('Invalid queue, should be "live" or "batch"');
127+
}
128+
$queueName = $allowedQueues[$queue];
129+
119130
if ($verbose) {
120-
$this->output('Watching queue <b>"%s"</b>', [self::QUEUE_NAME]);
131+
$this->output('Watching queue <b>"%s"</b>', [$queueName]);
121132
if ($exitAfter !== null) {
122133
$this->output(' for <b>%d</b> seconds', [$exitAfter]);
123134
}
124135
$this->outputLine('...');
125136
}
137+
126138
$startTime = time();
127139
$timeout = null;
128140
$numberOfJobExecutions = 0;
141+
129142
do {
130143
$message = null;
131144
if ($exitAfter !== null) {
132145
$timeout = max(1, $exitAfter - (time() - $startTime));
133146
}
134147
try {
135-
$message = $this->jobManager->waitAndExecute(self::QUEUE_NAME, $timeout);
148+
$message = $this->jobManager->waitAndExecute($queueName, $timeout);
136149
} catch (JobQueueException $exception) {
137150
$numberOfJobExecutions ++;
138151
$this->outputLine('<error>%s</error>', [$exception->getMessage()]);
@@ -171,7 +184,7 @@ public function workCommand($exitAfter = null, $limit = null, $verbose = false)
171184
*/
172185
public function flushCommand()
173186
{
174-
$this->queueManager->getQueue(self::QUEUE_NAME)->flush();
187+
$this->queueManager->getQueue(self::BATCH_QUEUE_NAME)->flush();
175188
$this->outputSystemReport();
176189
$this->outputLine();
177190
}
@@ -185,8 +198,8 @@ protected function outputSystemReport()
185198
$this->outputLine('Memory Usage : %s', [Files::bytesToSizeString(memory_get_peak_usage(true))]);
186199
$time = microtime(true) - $_SERVER["REQUEST_TIME_FLOAT"];
187200
$this->outputLine('Execution time : %s seconds', [$time]);
188-
$this->outputLine('Indexing Queue : %s', [self::QUEUE_NAME]);
189-
$this->outputLine('Pending Jobs : %s', [$this->queueManager->getQueue(self::QUEUE_NAME)->count()]);
201+
$this->outputLine('Indexing Queue : %s', [self::BATCH_QUEUE_NAME]);
202+
$this->outputLine('Pending Jobs : %s', [$this->queueManager->getQueue(self::BATCH_QUEUE_NAME)->count()]);
190203
}
191204

192205
/**
@@ -217,7 +230,7 @@ protected function indexWorkspace($workspaceName, $indexPostfix)
217230
}
218231

219232
$indexingJob = new IndexingJob($indexPostfix, $workspaceName, $jobData);
220-
$this->jobManager->queue(self::QUEUE_NAME, $indexingJob);
233+
$this->jobManager->queue(self::BATCH_QUEUE_NAME, $indexingJob);
221234
$this->output('.');
222235
$offset += $batchSize;
223236
$this->persistenceManager->clearState();
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
namespace Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\Indexer;
3+
4+
use Flowpack\ElasticSearch\ContentRepositoryAdaptor;
5+
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\Command\NodeIndexQueueCommandController;
6+
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\IndexingJob;
7+
use Flowpack\JobQueue\Common\Job\JobManager;
8+
use TYPO3\Flow\Annotations as Flow;
9+
use TYPO3\TYPO3CR\Domain\Model\NodeInterface;
10+
11+
/**
12+
* ElasticSearch Indexing Job Interface
13+
*/
14+
class NodeIndexer extends ContentRepositoryAdaptor\Indexer\NodeIndexer
15+
{
16+
/**
17+
* @var JobManager
18+
* @Flow\Inject
19+
*/
20+
protected $jobManager;
21+
22+
/**
23+
* @var bool
24+
* @Flow\InjectConfiguration(path="enableLiveAsyncIndexing")
25+
*/
26+
protected $enableLiveAsyncIndexing;
27+
28+
/**
29+
* @param NodeInterface $node
30+
* @param string|null $targetWorkspaceName
31+
*/
32+
public function indexNode(NodeInterface $node, $targetWorkspaceName = null)
33+
{
34+
if ($this->enableLiveAsyncIndexing !== true) {
35+
parent::indexNode($node, $targetWorkspaceName);
36+
return;
37+
}
38+
$indexingJob = new IndexingJob($this->indexNamePostfix, $targetWorkspaceName, [
39+
[
40+
'nodeIdentifier' => $this->persistenceManager->getIdentifierByObject($node->getNodeData()),
41+
'dimensions' => $node->getDimensions()
42+
]
43+
]);
44+
$this->jobManager->queue(NodeIndexQueueCommandController::LIVE_QUEUE_NAME, $indexingJob);
45+
}
46+
}

Configuration/Objects.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
TYPO3\TYPO3CR\Search\Indexer\NodeIndexerInterface:
2+
className: Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\Indexer\NodeIndexer

Configuration/Settings.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
Flowpack:
2+
3+
ElasticSearch:
4+
5+
ContentRepositoryQueueIndexer:
6+
7+
enableLiveAsyncIndexing: true
8+
29
JobQueue:
10+
311
Common:
412
queues:
513
'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer':
@@ -8,3 +16,10 @@ Flowpack:
816
options:
917
host: '127.0.0.1'
1018
port: 11300
19+
20+
'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer.Live':
21+
className: 'Flowpack\JobQueue\Beanstalkd\Queue\BeanstalkdQueue'
22+
executeIsolated: true
23+
options:
24+
host: '127.0.0.1'
25+
port: 11300

README.md

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,32 @@
33
This package can be used to index a huge amount of nodes in ElasticSearch indexes. This
44
package use Beanstalkd and the JobQueue package to handle ES indexing asynchronously.
55

6-
How to build indexing job
7-
-------------------------
6+
## Batch Indexing
7+
8+
### How to build indexing job
89

910
flow nodeindexqueue:build --workspace live
1011

11-
How to process indexing job
12-
---------------------------
12+
### How to process indexing job
1313

1414
You can use this CLI command to process indexing job:
1515

16-
flow nodeindexqueue:work
16+
flow nodeindexqueue:work --queue batch
17+
18+
## Live Indexing
19+
20+
You can enable async live indexing by editing ```Settings.yaml```:
21+
22+
Flowpack:
23+
ElasticSearch:
24+
ContentRepositoryQueueIndexer:
25+
enableLiveAsyncIndexing: true
26+
27+
You can use this CLI command to process indexing job:
28+
29+
flow nodeindexqueue:work --queue live
30+
31+
## Supervisord configuration
1732

1833
You can use tools like ```supervisord``` to manage long runing process. Bellow you can
1934
found a basic configuration:
@@ -22,12 +37,23 @@ found a basic configuration:
2237

2338
[supervisorctl]
2439

25-
[program:elasticsearch_indexing]
26-
command=php flow job:work --queue Flowpack.ElasticSearch.ContentRepositoryQueueIndexer --limit 5000
40+
[program:elasticsearch_batch_indexing]
41+
command=php flow nodeindexqueue:work --queue batch
42+
stdout_logfile=AUTO
43+
stderr_logfile=AUTO
44+
numprocs=4
45+
process_name=elasticsearch_batch_indexing_%(process_num)02d
46+
environment=FLOW_CONTEXT="Production"
47+
autostart=true
48+
autorestart=true
49+
stopsignal=QUIT
50+
51+
[program:elasticsearch_live_indexing]
52+
command=php flow nodeindexqueue:live --queue live
2753
stdout_logfile=AUTO
2854
stderr_logfile=AUTO
29-
numprocs=12
30-
process_name=elasticsearch_indexing_%(process_num)02d
55+
numprocs=4
56+
process_name=elasticsearch_live_indexing_%(process_num)02d
3157
environment=FLOW_CONTEXT="Production"
3258
autostart=true
3359
autorestart=true

0 commit comments

Comments
 (0)