Skip to content

Commit 85195f5

Browse files
committed
Merge remote-tracking branch 'origin/2.0'
2 parents 0195768 + cd4bfd4 commit 85195f5

File tree

10 files changed

+462
-240
lines changed

10 files changed

+462
-240
lines changed

Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/Command/NodeIndexQueueCommandController.php

Lines changed: 247 additions & 135 deletions
Large diffs are not rendered by default.

Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/Domain/Repository/NodeDataRepository.php

Lines changed: 51 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -12,61 +12,61 @@
1212
/**
1313
* @Flow\Scope("singleton")
1414
*/
15-
class NodeDataRepository extends Repository {
15+
class NodeDataRepository extends Repository
16+
{
17+
const ENTITY_CLASSNAME = 'TYPO3\TYPO3CR\Domain\Model\NodeData';
1618

17-
const ENTITY_CLASSNAME = 'TYPO3\TYPO3CR\Domain\Model\NodeData';
19+
/**
20+
* @Flow\Inject
21+
* @var ObjectManager
22+
*/
23+
protected $entityManager;
1824

19-
/**
20-
* @Flow\Inject
21-
* @var ObjectManager
22-
*/
23-
protected $entityManager;
25+
/**
26+
* @param string $workspaceName
27+
* @param integer $firstResult
28+
* @param integer $maxResults
29+
* @return IterableResult
30+
*/
31+
public function findAllBySiteAndWorkspace($workspaceName, $firstResult = 0, $maxResults = 1000)
32+
{
2433

25-
/**
26-
* @param string $workspaceName
27-
* @param integer $firstResult
28-
* @param integer $maxResults
29-
* @return IterableResult
30-
*/
31-
public function findAllBySiteAndWorkspace($workspaceName, $firstResult = 0, $maxResults = 1000) {
34+
/** @var QueryBuilder $queryBuilder */
35+
$queryBuilder = $this->entityManager->createQueryBuilder();
3236

33-
/** @var QueryBuilder $queryBuilder */
34-
$queryBuilder = $this->entityManager->createQueryBuilder();
37+
$queryBuilder->select('n.Persistence_Object_Identifier nodeIdentifier, n.dimensionValues dimensions')
38+
->from('TYPO3\TYPO3CR\Domain\Model\NodeData', 'n')
39+
->where("n.workspace = :workspace AND n.removed = :removed AND n.movedTo IS NULL")
40+
->setFirstResult((integer)$firstResult)
41+
->setMaxResults((integer)$maxResults)
42+
->setParameters([
43+
':workspace' => $workspaceName,
44+
':removed' => false,
45+
]);
3546

36-
$queryBuilder->select('n.Persistence_Object_Identifier nodeIdentifier, n.dimensionValues dimensions')
37-
->from('TYPO3\TYPO3CR\Domain\Model\NodeData', 'n')
38-
->where("n.workspace = :workspace AND n.removed = :removed AND n.movedTo IS NULL")
39-
->setFirstResult((integer)$firstResult)
40-
->setMaxResults((integer)$maxResults)
41-
->setParameters([
42-
':workspace' => $workspaceName,
43-
':removed' => FALSE,
44-
]);
45-
46-
return $queryBuilder->getQuery()->iterate();
47-
}
48-
49-
/**
50-
* Iterator over an IterableResult and return a Generator
51-
*
52-
* This methos is useful for batch processing huge result set as it clear the object
53-
* manager and detach the current object on each iteration.
54-
*
55-
* @param IterableResult $iterator
56-
* @param callable $callback
57-
* @return \Generator
58-
*/
59-
public function iterate(IterableResult $iterator, callable $callback = null)
60-
{
61-
$iteration = 0;
62-
foreach ($iterator as $object) {
63-
$object = current($object);
64-
yield $object;
65-
if ($callback !== null) {
66-
call_user_func($callback, $iteration, $object);
67-
}
68-
++$iteration;
69-
}
70-
}
47+
return $queryBuilder->getQuery()->iterate();
48+
}
7149

50+
/**
51+
* Iterator over an IterableResult and return a Generator
52+
*
53+
* This methos is useful for batch processing huge result set as it clear the object
54+
* manager and detach the current object on each iteration.
55+
*
56+
* @param IterableResult $iterator
57+
* @param callable $callback
58+
* @return \Generator
59+
*/
60+
public function iterate(IterableResult $iterator, callable $callback = null)
61+
{
62+
$iteration = 0;
63+
foreach ($iterator as $object) {
64+
$object = current($object);
65+
yield $object;
66+
if ($callback !== null) {
67+
call_user_func($callback, $iteration, $object);
68+
}
69+
++$iteration;
70+
}
71+
}
7272
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
namespace Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\Indexer;
3+
4+
use Flowpack\ElasticSearch\ContentRepositoryAdaptor;
5+
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\Command\NodeIndexQueueCommandController;
6+
use Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\IndexingJob;
7+
use Flowpack\JobQueue\Common\Job\JobManager;
8+
use TYPO3\Flow\Annotations as Flow;
9+
use TYPO3\TYPO3CR\Domain\Model\NodeInterface;
10+
11+
/**
12+
* ElasticSearch Indexing Job Interface
13+
*/
14+
class NodeIndexer extends ContentRepositoryAdaptor\Indexer\NodeIndexer
15+
{
16+
/**
17+
* @var JobManager
18+
* @Flow\Inject
19+
*/
20+
protected $jobManager;
21+
22+
/**
23+
* @var bool
24+
* @Flow\InjectConfiguration(path="enableLiveAsyncIndexing")
25+
*/
26+
protected $enableLiveAsyncIndexing;
27+
28+
/**
29+
* @param NodeInterface $node
30+
* @param string|null $targetWorkspaceName
31+
*/
32+
public function indexNode(NodeInterface $node, $targetWorkspaceName = null)
33+
{
34+
if ($this->enableLiveAsyncIndexing !== true) {
35+
parent::indexNode($node, $targetWorkspaceName);
36+
return;
37+
}
38+
$indexingJob = new IndexingJob($this->indexNamePostfix, $targetWorkspaceName, [
39+
[
40+
'nodeIdentifier' => $this->persistenceManager->getIdentifierByObject($node->getNodeData()),
41+
'dimensions' => $node->getDimensions()
42+
]
43+
]);
44+
$this->jobManager->queue(NodeIndexQueueCommandController::LIVE_QUEUE_NAME, $indexingJob);
45+
}
46+
}

Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/IndexingJob.php

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@
22
namespace Flowpack\ElasticSearch\ContentRepositoryQueueIndexer;
33

44
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Indexer\NodeIndexer;
5+
use Flowpack\JobQueue\Common\Job\JobInterface;
6+
use Flowpack\JobQueue\Common\Queue\Message;
7+
use Flowpack\JobQueue\Common\Queue\QueueInterface;
58
use TYPO3\Flow\Annotations as Flow;
6-
use TYPO3\Flow\Log\SystemLoggerInterface;
79
use TYPO3\Flow\Utility\Algorithms;
8-
use TYPO3\Jobqueue\Common\Job\JobInterface;
9-
use TYPO3\Jobqueue\Common\Queue\Message;
10-
use TYPO3\Jobqueue\Common\Queue\QueueInterface;
1110
use TYPO3\TYPO3CR\Domain\Factory\NodeFactory;
1211
use TYPO3\TYPO3CR\Domain\Model\NodeData;
1312
use TYPO3\TYPO3CR\Domain\Model\NodeInterface;
@@ -19,6 +18,7 @@
1918
*/
2019
class IndexingJob implements JobInterface
2120
{
21+
use LoggerTrait;
2222

2323
/**
2424
* @var NodeIndexer
@@ -44,12 +44,6 @@ class IndexingJob implements JobInterface
4444
*/
4545
protected $contextFactory;
4646

47-
/**
48-
* @var SystemLoggerInterface
49-
* @Flow\Inject
50-
*/
51-
protected $logger;
52-
5347
/**
5448
* @var string
5549
*/
@@ -94,6 +88,8 @@ public function __construct($indexPostfix, $workspaceName, array $nodes)
9488
public function execute(QueueInterface $queue, Message $message)
9589
{
9690
$this->nodeIndexer->withBulkProcessing(function () {
91+
$numberOfNodes = count($this->nodes);
92+
$startTime = microtime(true);
9793
foreach ($this->nodes as $node) {
9894
/** @var NodeData $nodeData */
9995
$nodeData = $this->nodeDataRepository->findByIdentifier($node['nodeIdentifier']);
@@ -107,18 +103,20 @@ public function execute(QueueInterface $queue, Message $message)
107103

108104
// Skip this iteration if the node can not be fetched from the current context
109105
if (!$currentNode instanceof NodeInterface) {
110-
$this->logger->log(sprintf('Node with identifier %s could not be processed', $node['nodeIdentifier']));
106+
$this->log(sprintf('action=indexing step=failed node=%s message="Node could not be processed"', $node['nodeIdentifier']));
111107
continue;
112108
}
113109

114110
$this->nodeIndexer->setIndexNamePostfix($this->indexPostfix);
115-
$this->logger->log(sprintf('Process indexing job for %s', $currentNode));
111+
$this->log(sprintf('action=indexing step=started node=%s', $currentNode->getIdentifier()));
116112

117113
$this->nodeIndexer->indexNode($currentNode);
118-
119114
}
120115

121116
$this->nodeIndexer->flush();
117+
$duration = microtime(true) - $startTime;
118+
$rate = $numberOfNodes / $duration;
119+
$this->log(sprintf('action=indexing step=finished number_of_nodes=%d duration=%f nodes_per_second=%f', $numberOfNodes, $duration, $rate));
122120
});
123121

124122
return true;
@@ -143,5 +141,4 @@ public function getLabel()
143141
{
144142
return sprintf('ElasticSearch Indexing Job (%s)', $this->getIdentifier());
145143
}
146-
147144
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
namespace Flowpack\ElasticSearch\ContentRepositoryQueueIndexer;
3+
4+
use TYPO3\Flow\Annotations as Flow;
5+
6+
/**
7+
* LoggerTrait
8+
*/
9+
trait LoggerTrait
10+
{
11+
/**
12+
* @var \TYPO3\Flow\Log\SystemLoggerInterface
13+
* @Flow\Inject
14+
*/
15+
protected $_logger;
16+
17+
/**
18+
* @param string $message The message to log
19+
* @param integer $severity An integer value, one of the LOG_* constants
20+
* @param mixed $additionalData A variable containing more information about the event to be logged
21+
* @param string $packageKey Key of the package triggering the log (determined automatically if not specified)
22+
* @param string $className Name of the class triggering the log (determined automatically if not specified)
23+
* @param string $methodName Name of the method triggering the log (determined automatically if not specified)
24+
* @return void
25+
* @api
26+
*/
27+
protected function log($message, $severity = LOG_INFO, $additionalData = null, $packageKey = null, $className = null, $methodName = null)
28+
{
29+
$packageKey = $packageKey ?: 'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer';
30+
$this->_logger->log($message, $severity, $additionalData, $packageKey, $className, $methodName);
31+
}
32+
}

Classes/Flowpack/ElasticSearch/ContentRepositoryQueueIndexer/UpdateAliasJob.php

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,35 +2,25 @@
22
namespace Flowpack\ElasticSearch\ContentRepositoryQueueIndexer;
33

44
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\Indexer\NodeIndexer;
5-
use Flowpack\ElasticSearch\ContentRepositoryAdaptor\LoggerInterface;
5+
use Flowpack\JobQueue\Common\Job\JobInterface;
6+
use Flowpack\JobQueue\Common\Queue\Message;
7+
use Flowpack\JobQueue\Common\Queue\QueueInterface;
68
use TYPO3\Flow\Annotations as Flow;
79
use TYPO3\Flow\Utility\Algorithms;
8-
use TYPO3\Jobqueue\Common\Job\JobInterface;
9-
use TYPO3\Jobqueue\Common\Queue\Message;
10-
use TYPO3\Jobqueue\Common\Queue\QueueInterface;
11-
use TYPO3\TYPO3CR\Domain\Factory\NodeFactory;
12-
use TYPO3\TYPO3CR\Domain\Model\NodeData;
13-
use TYPO3\TYPO3CR\Domain\Model\NodeInterface;
14-
use TYPO3\TYPO3CR\Domain\Repository\NodeDataRepository;
15-
use TYPO3\TYPO3CR\Domain\Service\ContextFactory;
1610

1711
/**
1812
* ElasticSearch Indexing Job Interface
1913
*/
20-
class UpdateAliasJob implements JobInterface {
14+
class UpdateAliasJob implements JobInterface
15+
{
16+
use LoggerTrait;
2117

2218
/**
2319
* @var NodeIndexer
2420
* @Flow\Inject
2521
*/
2622
protected $nodeIndexer;
2723

28-
/**
29-
* @var LoggerInterface
30-
* @Flow\Inject
31-
*/
32-
protected $logger;
33-
3424
/**
3525
* @var string
3626
*/
@@ -43,10 +33,9 @@ class UpdateAliasJob implements JobInterface {
4333

4434
/**
4535
* @param string $indexPostfix
46-
* @param string $workspaceName
47-
* @param array $nodes
4836
*/
49-
public function __construct($indexPostfix) {
37+
public function __construct($indexPostfix)
38+
{
5039
$this->identifier = Algorithms::generateRandomString(24);
5140
$this->indexPostfix = $indexPostfix;
5241
}
@@ -59,20 +48,22 @@ public function __construct($indexPostfix) {
5948
* @param Message $message The original message
6049
* @return boolean TRUE if the job was executed successfully and the message should be finished
6150
*/
62-
public function execute(QueueInterface $queue, Message $message) {
51+
public function execute(QueueInterface $queue, Message $message)
52+
{
6353
$this->nodeIndexer->setIndexNamePostfix($this->indexPostfix);
6454
$this->nodeIndexer->updateIndexAlias();
65-
$this->logger->log(sprintf('Update Index Alias (%s)', $this->indexPostfix), LOG_NOTICE);
55+
$this->log(sprintf('action=indexing step=index-switched alias=%s', $this->indexPostfix), LOG_NOTICE);
6656

67-
return TRUE;
57+
return true;
6858
}
6959

7060
/**
7161
* Get an optional identifier for the job
7262
*
7363
* @return string A job identifier
7464
*/
75-
public function getIdentifier() {
65+
public function getIdentifier()
66+
{
7667
return $this->identifier;
7768
}
7869

@@ -81,8 +72,8 @@ public function getIdentifier() {
8172
*
8273
* @return string A label for the job
8374
*/
84-
public function getLabel() {
75+
public function getLabel()
76+
{
8577
return sprintf('ElasticSearch Indexing Job (%s)', $this->getIdentifier());
8678
}
87-
8879
}

Configuration/Objects.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
TYPO3\TYPO3CR\Search\Indexer\NodeIndexerInterface:
2+
className: Flowpack\ElasticSearch\ContentRepositoryQueueIndexer\Indexer\NodeIndexer

Configuration/Settings.yaml

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,25 @@
1-
TYPO3:
2-
Jobqueue:
1+
Flowpack:
2+
3+
ElasticSearch:
4+
5+
ContentRepositoryQueueIndexer:
6+
7+
enableLiveAsyncIndexing: true
8+
9+
JobQueue:
10+
311
Common:
412
queues:
513
'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer':
6-
className: 'TYPO3\Jobqueue\Beanstalkd\Queue\BeanstalkdQueue'
14+
className: 'Flowpack\JobQueue\Beanstalkd\Queue\BeanstalkdQueue'
15+
executeIsolated: true
16+
options:
17+
host: '127.0.0.1'
18+
port: 11300
19+
20+
'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer.Live':
21+
className: 'Flowpack\JobQueue\Beanstalkd\Queue\BeanstalkdQueue'
22+
executeIsolated: true
723
options:
824
host: '127.0.0.1'
925
port: 11300

0 commit comments

Comments
 (0)