Skip to content

Commit 52d4a94

Browse files
committed
Merge branch 'master' into johannessteu-master
# Conflicts: # Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/Command/NodeIndexQueueCommandController.php # Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/IndexingJob.php # Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/UpdateAliasJob.php # Configuration/Settings.yaml # composer.json
2 parents 12db0b1 + dbc595d commit 52d4a94

File tree

9 files changed

+356
-128
lines changed

9 files changed

+356
-128
lines changed

Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/Command/NodeIndexQueueCommandController.php

Lines changed: 134 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Mapping\NodeTypeMappingBuilder;
77
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\Domain\Repository\NodeDataRepository;
88
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\IndexingJob;
9+
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\LoggerTrait;
910
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\UpdateAliasJob;
1011
use Flowpack\JobQueue\Common\Job\JobManager;
1112
use Neos\Flow\Annotations as Flow;
@@ -20,47 +21,52 @@
2021
*/
2122
class NodeIndexQueueCommandController extends CommandController
2223
{
24+
use LoggerTrait;
25+
26+
const BATCH_QUEUE_NAME = 'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer';
27+
const LIVE_QUEUE_NAME = 'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer.Live';
28+
2329
/**
24-
* @Flow\Inject
2530
* @var JobManager
31+
* @Flow\Inject
2632
*/
2733
protected $jobManager;
2834

35+
/**
36+
* @var QueueManager
37+
* @Flow\Inject
38+
*/
39+
protected $queueManager;
40+
2941
/**
3042
* @var PersistenceManagerInterface
3143
* @Flow\Inject
3244
*/
3345
protected $persistenceManager;
3446

3547
/**
36-
* @Flow\Inject
3748
* @var NodeTypeMappingBuilder
49+
* @Flow\Inject
3850
*/
3951
protected $nodeTypeMappingBuilder;
4052

4153
/**
42-
* @Flow\Inject
4354
* @var NodeDataRepository
55+
* @Flow\Inject
4456
*/
4557
protected $nodeDataRepository;
4658

4759
/**
48-
* @Flow\Inject
4960
* @var WorkspaceRepository
61+
* @Flow\Inject
5062
*/
5163
protected $workspaceRepository;
5264

5365
/**
54-
* @Flow\Inject
5566
* @var NodeIndexer
56-
*/
57-
protected $nodeIndexer;
58-
59-
/**
6067
* @Flow\Inject
61-
* @var LoggerInterface
6268
*/
63-
protected $logger;
69+
protected $nodeIndexer;
6470

6571
/**
6672
* Index all nodes by creating a new index and when everything was completed, switch the index alias.
@@ -70,29 +76,128 @@ class NodeIndexQueueCommandController extends CommandController
7076
public function buildCommand($workspace = null)
{
    // Refuse to start while earlier batch jobs are still pending. The check runs before the
    // new index is created so that an aborted run does not leave an unused index behind.
    $pendingJobs = $this->queueManager->getQueue(self::BATCH_QUEUE_NAME)->count();
    if ($pendingJobs !== 0) {
        $this->outputLine('<error>!! </error> The queue "%s" is not empty (%d pending jobs), please flush the queue.', [self::BATCH_QUEUE_NAME, $pendingJobs]);
        $this->quit(1);
    }

    // The current timestamp is the postfix shared by the new index, every indexing job and the alias update job.
    $indexPostfix = time();
    $indexName = $this->createNextIndex($indexPostfix);
    $this->updateMapping();

    $this->outputLine();
    $this->outputLine('<b>Indexing on %s ...</b>', [$indexName]);

    if ($workspace === null) {
        // No workspace given: queue indexing jobs for every workspace known to the repository.
        foreach ($this->workspaceRepository->findAll() as $workspaceObject) {
            $this->outputLine();
            $this->indexWorkspace($workspaceObject->getName(), $indexPostfix);
        }
    } else {
        $this->outputLine();
        $this->indexWorkspace($workspace, $indexPostfix);
    }

    // Queued after all indexing jobs of this run, on the same queue.
    $updateAliasJob = new UpdateAliasJob($indexPostfix);
    $this->jobManager->queue(self::BATCH_QUEUE_NAME, $updateAliasJob);

    $this->outputLine("Indexing jobs created for queue %s with success ...", [self::BATCH_QUEUE_NAME]);
    $this->outputSystemReport();
    $this->outputLine();
}
93109

110+
/**
 * Wait for jobs on the batch or live indexing queue and execute them
 *
 * The command loops until --exit-after or --limit is reached, or until an unexpected
 * exception aborts it. Job failures reported as JobQueueException are printed and counted
 * as executions, but do not stop the worker.
 *
 * @param string $queue Type of queue to process, can be "live" or "batch"
 * @param int $exitAfter If set, this command will exit after the given amount of seconds
 * @param int $limit If set, only the given amount of jobs are processed (successful or not) before the script exits
 * @param bool $verbose Output debugging information
 * @return void
 */
public function workCommand($queue = 'batch', $exitAfter = null, $limit = null, $verbose = false)
{
    $allowedQueues = [
        'batch' => self::BATCH_QUEUE_NAME,
        'live' => self::LIVE_QUEUE_NAME
    ];
    if (!isset($allowedQueues[$queue])) {
        // Abort here: continuing would read an undefined array key and wait on a null queue name.
        $this->outputLine('Invalid queue, should be "live" or "batch"');
        $this->quit(1);
    }
    $queueName = $allowedQueues[$queue];

    if ($verbose) {
        $this->output('Watching queue <b>"%s"</b>', [$queueName]);
        if ($exitAfter !== null) {
            $this->output(' for <b>%d</b> seconds', [$exitAfter]);
        }
        $this->outputLine('...');
    }

    $startTime = time();
    $timeout = null;
    $numberOfJobExecutions = 0;

    do {
        $message = null;
        if ($exitAfter !== null) {
            // Remaining time budget, but never less than one second.
            $timeout = max(1, $exitAfter - (time() - $startTime));
        }
        try {
            $message = $this->jobManager->waitAndExecute($queueName, $timeout);
        } catch (JobQueueException $exception) {
            // A failed job still counts towards --limit.
            $numberOfJobExecutions++;
            $this->outputLine('<error>%s</error>', [$exception->getMessage()]);
            if ($verbose && $exception->getPrevious() instanceof \Exception) {
                $this->outputLine(' Reason: %s', [$exception->getPrevious()->getMessage()]);
            }
        } catch (\Exception $exception) {
            $this->outputLine('<error>Unexpected exception during job execution: %s, aborting...</error>', [$exception->getMessage()]);
            $this->quit(1);
        }
        if ($message !== null) {
            $numberOfJobExecutions++;
            if ($verbose) {
                // Keep the log line short: show at most the first 50 bytes of the payload.
                $messagePayload = strlen($message->getPayload()) <= 50 ? $message->getPayload() : substr($message->getPayload(), 0, 50) . '...';
                $this->outputLine('<success>Successfully executed job "%s" (%s)</success>', [$message->getIdentifier(), $messagePayload]);
            }
        }
        if ($exitAfter !== null && (time() - $startTime) >= $exitAfter) {
            if ($verbose) {
                $this->outputLine('Quitting after %d seconds due to <i>--exit-after</i> flag', [time() - $startTime]);
            }
            $this->quit();
        }
        if ($limit !== null && $numberOfJobExecutions >= $limit) {
            if ($verbose) {
                $this->outputLine('Quitting after %d executed job%s due to <i>--limit</i> flag', [$numberOfJobExecutions, $numberOfJobExecutions > 1 ? 's' : '']);
            }
            $this->quit();
        }
    } while (true);
}
179+
180+
/**
 * Flush the batch indexing queue, then print the system report
 *
 * @return void
 */
public function flushCommand()
{
    $batchQueue = $this->queueManager->getQueue(self::BATCH_QUEUE_NAME);
    $batchQueue->flush();

    $this->outputSystemReport();
    $this->outputLine();
}
189+
190+
/**
 * Print a short report for CLI commands: peak memory usage, time elapsed since the
 * request started, the batch queue name and its number of pending jobs
 *
 * @return void
 */
protected function outputSystemReport()
{
    $this->outputLine();

    $peakMemory = Files::bytesToSizeString(memory_get_peak_usage(true));
    $this->outputLine('Memory Usage : %s', [$peakMemory]);

    $elapsedSeconds = microtime(true) - $_SERVER["REQUEST_TIME_FLOAT"];
    $this->outputLine('Execution time : %s seconds', [$elapsedSeconds]);

    $this->outputLine('Indexing Queue : %s', [self::BATCH_QUEUE_NAME]);

    $pendingJobCount = $this->queueManager->getQueue(self::BATCH_QUEUE_NAME)->count();
    $this->outputLine('Pending Jobs : %s', [$pendingJobCount]);
}
97202

98203
/**
@@ -101,6 +206,8 @@ public function buildCommand($workspace = null)
101206
*/
102207
protected function indexWorkspace($workspaceName, $indexPostfix)
103208
{
209+
$this->outputLine('<info>++</info> Indexing %s workspace', [$workspaceName]);
210+
$nodeCounter = 0;
104211
$offset = 0;
105212
$batchSize = 500;
106213
while (true) {
@@ -113,30 +220,34 @@ protected function indexWorkspace($workspaceName, $indexPostfix)
113220
'nodeIdentifier' => $data['nodeIdentifier'],
114221
'dimensions' => $data['dimensions']
115222
];
223+
$nodeCounter++;
116224
}
117225

118226
if ($jobData === []) {
119227
break;
120228
}
121229

122230
$indexingJob = new IndexingJob($indexPostfix, $workspaceName, $jobData);
123-
$this->jobManager->queue('Flowpack.ElasticSearch.ContentRepositoryQueueIndexer', $indexingJob);
231+
$this->jobManager->queue(self::BATCH_QUEUE_NAME, $indexingJob);
124232
$this->output('.');
125233
$offset += $batchSize;
126234
$this->persistenceManager->clearState();
127235
}
128236
$this->outputLine();
237+
$this->outputLine("\nNumber of Nodes to be indexed in workspace '%s': %d", [$workspaceName, $nodeCounter]);
238+
$this->outputLine();
129239
}
130240

131241
/**
 * Create the next index with the given postfix and log its creation
 *
 * @param string $indexPostfix Postfix set on the node indexer before the index is created
 * @return string The index name reported by the node indexer
 */
protected function createNextIndex($indexPostfix)
{
    $indexer = $this->nodeIndexer;
    $indexer->setIndexNamePostfix($indexPostfix);
    $indexer->getIndex()->create();

    $logMessage = sprintf('action=indexing step=index-created index=%s', $indexer->getIndexName());
    $this->log($logMessage, LOG_INFO);

    return $indexer->getIndexName();
}
141252

142253
/**
@@ -146,9 +257,9 @@ protected function updateMapping()
146257
{
147258
$nodeTypeMappingCollection = $this->nodeTypeMappingBuilder->buildMappingInformation($this->nodeIndexer->getIndex());
148259
foreach ($nodeTypeMappingCollection as $mapping) {
149-
/** @var \Flowpack\ElasticSearch\Domain\Model\Mapping $mapping */
260+
/** @var Mapping $mapping */
150261
$mapping->apply();
151262
}
152-
$this->logger->log(sprintf('action=indexing step=mapping-updated index=%s', $this->nodeIndexer->getIndexName()), LOG_INFO);
263+
$this->log(sprintf('action=indexing step=mapping-updated index=%s', $this->nodeIndexer->getIndexName()), LOG_INFO);
153264
}
154265
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
namespace Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\Indexer;
3+
4+
use Flowpack\ElasticSearch\ContentRepositoryAdaptor;
5+
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\Command\NodeIndexQueueCommandController;
6+
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\IndexingJob;
7+
use Flowpack\JobQueue\Common\Job\JobManager;
8+
use TYPO3\Flow\Annotations as Flow;
9+
use TYPO3\Flow\Persistence\PersistenceManagerInterface;
10+
use TYPO3\TYPO3CR\Domain\Model\NodeInterface;
11+
12+
/**
 * Node indexer that can hand indexing over to the live job queue
 *
 * Extends the ContentRepositoryAdaptor node indexer. When the "enableLiveAsyncIndexing"
 * setting is exactly true, indexNode() queues an IndexingJob instead of calling the
 * parent implementation.
 *
 * NOTE(review): this file imports TYPO3\Flow classes — confirm that matches the Flow
 * version this package targets.
 */
class NodeIndexer extends ContentRepositoryAdaptor\Indexer\NodeIndexer
{
    /**
     * @var JobManager
     * @Flow\Inject
     */
    protected $jobManager;

    /**
     * @var PersistenceManagerInterface
     * @Flow\Inject
     */
    protected $persistenceManager;

    /**
     * @var bool
     * @Flow\InjectConfiguration(path="enableLiveAsyncIndexing")
     */
    protected $enableLiveAsyncIndexing;

    /**
     * Index a node directly, or queue it for asynchronous indexing
     *
     * @param NodeInterface $node Node to index
     * @param string|null $targetWorkspaceName Passed through unchanged to the parent indexer or to the queued job
     * @return void
     */
    public function indexNode(NodeInterface $node, $targetWorkspaceName = null)
    {
        if ($this->enableLiveAsyncIndexing === true) {
            // The job receives a list of node references; here it holds only this node.
            $nodeReference = [
                'nodeIdentifier' => $this->persistenceManager->getIdentifierByObject($node->getNodeData()),
                'dimensions' => $node->getDimensions()
            ];
            $indexingJob = new IndexingJob($this->indexNamePostfix, $targetWorkspaceName, [$nodeReference]);
            $this->jobManager->queue(NodeIndexQueueCommandController::LIVE_QUEUE_NAME, $indexingJob);

            return;
        }

        parent::indexNode($node, $targetWorkspaceName);
    }
}

Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/IndexingJob.php

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
class IndexingJob implements JobInterface
2020
{
21+
use LoggerTrait;
22+
2123
/**
2224
* @var NodeIndexer
2325
* @Flow\Inject
@@ -42,12 +44,6 @@ class IndexingJob implements JobInterface
4244
*/
4345
protected $contextFactory;
4446

45-
/**
46-
* @var SystemLoggerInterface
47-
* @Flow\Inject
48-
*/
49-
protected $logger;
50-
5147
/**
5248
* @var string
5349
*/
@@ -107,20 +103,20 @@ public function execute(QueueInterface $queue, Message $message)
107103

108104
// Skip this iteration if the node can not be fetched from the current context
109105
if (!$currentNode instanceof NodeInterface) {
110-
$this->logger->log(sprintf('action=indexing step=failed node=%s message="Node could not be processed"', $node['nodeIdentifier']));
106+
$this->log(sprintf('action=indexing step=failed node=%s message="Node could not be processed"', $node['nodeIdentifier']));
111107
continue;
112108
}
113109

114110
$this->nodeIndexer->setIndexNamePostfix($this->indexPostfix);
115-
$this->logger->log(sprintf('action=indexing step=started node=%s', $currentNode->getIdentifier()));
111+
$this->log(sprintf('action=indexing step=started node=%s', $currentNode->getIdentifier()));
116112

117113
$this->nodeIndexer->indexNode($currentNode);
118114
}
119115

120116
$this->nodeIndexer->flush();
121117
$duration = microtime(true) - $startTime;
122118
$rate = $numberOfNodes / $duration;
123-
$this->logger->log(sprintf('action=indexing step=finished number_of_nodes=%d duration=%f nodes_per_second=%f', $numberOfNodes, $duration, $rate));
119+
$this->log(sprintf('action=indexing step=finished number_of_nodes=%d duration=%f nodes_per_second=%f', $numberOfNodes, $duration, $rate));
124120
});
125121

126122
return true;
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
namespace Flowpack\ElasticSearch\ContentRepositoryQueueIndexer;
3+
4+
use TYPO3\Flow\Annotations as Flow;
5+
6+
/**
 * Logging helper shared by the classes of this package
 *
 * Provides an injected logger and a log() wrapper that fills in this package's key
 * when the caller does not pass one.
 */
trait LoggerTrait
{
    /**
     * @var \TYPO3\Flow\Log\SystemLoggerInterface
     * @Flow\Inject
     */
    protected $_logger;

    /**
     * Forward a log entry to the injected logger
     *
     * @param string $message The message to log
     * @param int $severity An integer value, one of the LOG_* constants
     * @param mixed $additionalData A variable containing more information about the event to be logged
     * @param string $packageKey Key of the package triggering the log; any empty value is replaced by this package's key
     * @param string $className Name of the class triggering the log, passed through unchanged
     * @param string $methodName Name of the method triggering the log, passed through unchanged
     * @return void
     */
    protected function log($message, $severity = LOG_INFO, $additionalData = null, $packageKey = null, $className = null, $methodName = null)
    {
        if (!$packageKey) {
            $packageKey = 'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer';
        }

        $this->_logger->log($message, $severity, $additionalData, $packageKey, $className, $methodName);
    }
}

0 commit comments

Comments
 (0)