11<?php
22namespace Enqueue \ElasticaBundle \Elastica ;
33
4- use Enqueue \Psr \PsrContext ;
5- use Enqueue \Util \JSON ;
4+ use Enqueue \Client \ProducerInterface ;
5+ use Enqueue \ElasticaBundle \Async \Commands ;
6+ use Enqueue \Rpc \Promise ;
67use FOS \ElasticaBundle \Doctrine \ORM \Provider ;
78
89class AsyncDoctrineOrmProvider extends Provider
910{
11+ /**
12+ * @var int
13+ */
1014 private $ batchSize ;
1115
1216 /**
13- * @var PsrContext
17+ * @var ProducerInterface
1418 */
15- private $ context ;
19+ private $ producer ;
1620
1721 /**
18- * @param PsrContext $context
22+ * @param ProducerInterface $producer
1923 */
20- public function setContext (PsrContext $ context )
24+ public function setContext (ProducerInterface $ producer )
2125 {
22- $ this ->context = $ context ;
26+ $ this ->producer = $ producer ;
2327 }
2428
2529 /**
@@ -42,49 +46,41 @@ protected function doPopulate($options, \Closure $loggerClosure = null)
4246 $ nbObjects = $ this ->countObjects ($ queryBuilder );
4347 $ offset = $ options ['offset ' ];
4448
45- $ queue = $ this ->context ->createQueue ('fos_elastica_populate ' );
46- $ resultQueue = $ this ->context ->createTemporaryQueue ();
47- $ consumer = $ this ->context ->createConsumer ($ resultQueue );
48-
49- $ producer = $ this ->context ->createProducer ();
50-
51- $ nbMessages = 0 ;
49+ /** @var Promise[] $promises */
50+ $ promises = [];
5251 for (; $ offset < $ nbObjects ; $ offset += $ options ['batch_size ' ]) {
5352 $ options ['offset ' ] = $ offset ;
5453 $ options ['real_populate ' ] = true ;
55- $ message = $ this ->context ->createMessage (JSON ::encode ($ options ));
56- $ message ->setReplyTo ($ resultQueue ->getQueueName ());
57- $ producer ->send ($ queue , $ message );
5854
59- $ nbMessages ++ ;
55+ $ promises [] = $ this -> producer -> sendCommand (Commands:: POPULATE , $ options , true ) ;
6056 }
6157
6258 $ limitTime = time () + 180 ;
63- while ($ nbMessages ) {
64- if ($ message = $ consumer ->receive (20000 )) {
65- $ errorMessage = null ;
66-
67- $ errorMessage = null ;
68- if (false == $ message ->getProperty ('fos-populate-successful ' , false )) {
69- $ errorMessage = sprintf (
70- '<error>Batch failed: </error> <comment>Failed to process message %s</comment> ' ,
71- $ message ->getBody ()
72- );
73- }
74-
75- if ($ loggerClosure ) {
76- $ loggerClosure ($ options ['batch_size ' ], $ nbObjects , $ errorMessage );
59+ while ($ promises ) {
60+ foreach ($ promises as $ index => $ promise ) {
61+ if ($ message = $ promise ->receiveNoWait ()) {
62+ unset($ promises [$ index ]);
63+
64+ $ errorMessage = null ;
65+ if (false == $ message ->getProperty ('fos-populate-successful ' , false )) {
66+ $ errorMessage = sprintf (
67+ '<error>Batch failed: </error> <comment>Failed to process message %s</comment> ' ,
68+ $ message ->getBody ()
69+ );
70+ }
71+
72+ if ($ loggerClosure ) {
73+ $ loggerClosure ($ options ['batch_size ' ], $ nbObjects , $ errorMessage );
74+ }
75+
76+ $ limitTime = time () + 180 ;
7777 }
7878
79- $ consumer ->acknowledge ($ message );
80-
81- $ nbMessages --;
79+ sleep (1 );
8280
83- $ limitTime = time () + 180 ;
84- }
85-
86- if (time () > $ limitTime ) {
87- throw new \LogicException (sprintf ('No response in %d seconds ' , 180 ));
81+ if (time () > $ limitTime ) {
82+ throw new \LogicException (sprintf ('No response in %d seconds ' , 180 ));
83+ }
8884 }
8985 }
9086 }
0 commit comments