77use Flowpack \ElasticSearch \ContentRepositoryQueueIndexer \IndexingJob ;
88use Flowpack \ElasticSearch \ContentRepositoryQueueIndexer \LoggerTrait ;
99use Flowpack \ElasticSearch \ContentRepositoryQueueIndexer \UpdateAliasJob ;
10+ use Flowpack \ElasticSearch \Domain \Model \Mapping ;
11+ use Flowpack \JobQueue \Common \Exception ;
1012use Flowpack \JobQueue \Common \Job \JobManager ;
1113use Flowpack \JobQueue \Common \Queue \QueueManager ;
14+ use Neos \ContentRepository \Domain \Repository \WorkspaceRepository ;
1215use Neos \Flow \Annotations as Flow ;
1316use Neos \Flow \Cli \CommandController ;
1417use Neos \Flow \Persistence \PersistenceManagerInterface ;
15- use Neos \ContentRepository \Domain \Repository \WorkspaceRepository ;
1618use Neos \Utility \Files ;
1719
1820/**
@@ -26,7 +28,6 @@ class NodeIndexQueueCommandController extends CommandController
2628
2729 const BATCH_QUEUE_NAME = 'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer ' ;
2830 const LIVE_QUEUE_NAME = 'Flowpack.ElasticSearch.ContentRepositoryQueueIndexer.Live ' ;
29- const DEFAULT_BATCH_SIZE = 500 ;
3031
3132 /**
3233 * @var JobManager
@@ -71,23 +72,25 @@ class NodeIndexQueueCommandController extends CommandController
7172 protected $ nodeIndexer ;
7273
7374 /**
74- * @Flow\InjectConfiguration(package="Flowpack.ElasticSearch.ContentRepositoryQueueIndexer ")
75- * @var array
75+ * @Flow\InjectConfiguration(path="batchSize ")
76+ * @var int
7677 */
77- protected $ settings ;
78+ protected $ batchSize ;
7879
7980 /**
8081 * Index all nodes by creating a new index and when everything was completed, switch the index alias.
8182 *
8283 * @param string $workspace
84+ * @throws \Flowpack\JobQueue\Common\Exception
85+ * @throws \Neos\Flow\Mvc\Exception\StopActionException
86+ * @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception
8387 */
8488 public function buildCommand ($ workspace = null )
8589 {
8690 $ indexPostfix = time ();
8791 $ indexName = $ this ->createNextIndex ($ indexPostfix );
8892 $ this ->updateMapping ();
8993
90-
9194 $ this ->outputLine ();
9295 $ this ->outputLine ('<b>Indexing on %s ...</b> ' , [$ indexName ]);
9396
@@ -121,6 +124,7 @@ public function buildCommand($workspace = null)
121124 * @param int $limit If set, only the given amount of jobs are processed (successful or not) before the script exits
122125 * @param bool $verbose Output debugging information
123126 * @return void
127+ * @throws \Neos\Flow\Mvc\Exception\StopActionException
124128 */
125129 public function workCommand ($ queue = 'batch ' , $ exitAfter = null , $ limit = null , $ verbose = false )
126130 {
@@ -152,8 +156,8 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
152156 }
153157 try {
154158 $ message = $ this ->jobManager ->waitAndExecute ($ queueName , $ timeout );
155- } catch (JobQueueException $ exception ) {
156- $ numberOfJobExecutions ++;
159+ } catch (Exception $ exception ) {
160+ $ numberOfJobExecutions ++;
157161 $ this ->outputLine ('<error>%s</error> ' , [$ exception ->getMessage ()]);
158162 if ($ verbose && $ exception ->getPrevious () instanceof \Exception) {
159163 $ this ->outputLine (' Reason: %s ' , [$ exception ->getPrevious ()->getMessage ()]);
@@ -163,7 +167,7 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
163167 $ this ->quit (1 );
164168 }
165169 if ($ message !== null ) {
166- $ numberOfJobExecutions ++;
170+ $ numberOfJobExecutions ++;
167171 if ($ verbose ) {
168172 $ messagePayload = strlen ($ message ->getPayload ()) <= 50 ? $ message ->getPayload () : substr ($ message ->getPayload (), 0 , 50 ) . '... ' ;
169173 $ this ->outputLine ('<success>Successfully executed job "%s" (%s)</success> ' , [$ message ->getIdentifier (), $ messagePayload ]);
@@ -181,7 +185,6 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
181185 }
182186 $ this ->quit ();
183187 }
184-
185188 } while (true );
186189 }
187190
@@ -190,8 +193,12 @@ public function workCommand($queue = 'batch', $exitAfter = null, $limit = null,
190193 */
191194 public function flushCommand ()
192195 {
193- $ this ->queueManager ->getQueue (self ::BATCH_QUEUE_NAME )->flush ();
194- $ this ->outputSystemReport ();
196+ try {
197+ $ this ->queueManager ->getQueue (self ::BATCH_QUEUE_NAME )->flush ();
198+ $ this ->outputSystemReport ();
199+ } catch (Exception $ exception ) {
200+ $ this ->outputLine ('An error occurred: %s ' , [$ exception ->getMessage ()]);
201+ }
195202 $ this ->outputLine ();
196203 }
197204
@@ -205,7 +212,11 @@ protected function outputSystemReport()
205212 $ time = microtime (true ) - $ _SERVER ["REQUEST_TIME_FLOAT " ];
206213 $ this ->outputLine ('Execution time : %s seconds ' , [$ time ]);
207214 $ this ->outputLine ('Indexing Queue : %s ' , [self ::BATCH_QUEUE_NAME ]);
208- $ this ->outputLine ('Pending Jobs : %s ' , [$ this ->queueManager ->getQueue (self ::BATCH_QUEUE_NAME )->count ()]);
215+ try {
216+ $ this ->outputLine ('Pending Jobs : %s ' , [$ this ->queueManager ->getQueue (self ::BATCH_QUEUE_NAME )->count ()]);
217+ } catch (Exception $ exception ) {
218+ $ this ->outputLine ('Pending Jobs : Error, queue not found, %s ' , [$ exception ->getMessage ()]);
219+ }
209220 }
210221
211222 /**
@@ -217,16 +228,19 @@ protected function indexWorkspace($workspaceName, $indexPostfix)
217228 $ this ->outputLine ('<info>++</info> Indexing %s workspace ' , [$ workspaceName ]);
218229 $ nodeCounter = 0 ;
219230 $ offset = 0 ;
220- $ batchSize = $ this ->settings ['batchSize ' ] ?? static ::DEFAULT_BATCH_SIZE ;
221231 while (true ) {
222- $ iterator = $ this ->nodeDataRepository ->findAllBySiteAndWorkspace ($ workspaceName , $ offset , $ batchSize );
232+ $ iterator = $ this ->nodeDataRepository ->findAllBySiteAndWorkspace ($ workspaceName , $ offset , $ this -> batchSize );
223233
224234 $ jobData = [];
225235
226236 foreach ($ this ->nodeDataRepository ->iterate ($ iterator ) as $ data ) {
227237 $ jobData [] = [
228- 'nodeIdentifier ' => $ data ['nodeIdentifier ' ],
229- 'dimensions ' => $ data ['dimensions ' ]
238+ 'persistenceObjectIdentifier ' => $ data ['persistenceObjectIdentifier ' ],
239+ 'identifier ' => $ data ['identifier ' ],
240+ 'dimensions ' => $ data ['dimensions ' ],
241+ 'workspace ' => $ workspaceName ,
242+ 'nodeType ' => $ data ['nodeType ' ],
243+ 'path ' => $ data ['path ' ],
230244 ];
231245 $ nodeCounter ++;
232246 }
@@ -238,7 +252,7 @@ protected function indexWorkspace($workspaceName, $indexPostfix)
238252 $ indexingJob = new IndexingJob ($ indexPostfix , $ workspaceName , $ jobData );
239253 $ this ->jobManager ->queue (self ::BATCH_QUEUE_NAME , $ indexingJob );
240254 $ this ->output ('. ' );
241- $ offset += $ batchSize ;
255+ $ offset += $ this -> batchSize ;
242256 $ this ->persistenceManager ->clearState ();
243257 }
244258 $ this ->outputLine ();
@@ -249,17 +263,22 @@ protected function indexWorkspace($workspaceName, $indexPostfix)
249263 /**
250264 * @param string $indexPostfix
251265 * @return string
266+ * @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception
252267 */
253268 protected function createNextIndex ($ indexPostfix )
254269 {
255270 $ this ->nodeIndexer ->setIndexNamePostfix ($ indexPostfix );
256271 $ this ->nodeIndexer ->getIndex ()->create ();
257272 $ this ->log (sprintf ('action=indexing step=index-created index=%s ' , $ this ->nodeIndexer ->getIndexName ()), LOG_INFO );
273+
258274 return $ this ->nodeIndexer ->getIndexName ();
259275 }
260276
261277 /**
262278 * Update Index Mapping
279+ *
280+ * @return void
281+ * @throws \Flowpack\ElasticSearch\ContentRepositoryAdaptor\Exception
263282 */
264283 protected function updateMapping ()
265284 {
0 commit comments