44use Enqueue \Client \CommandSubscriberInterface ;
55use Enqueue \Consumption \QueueSubscriberInterface ;
66use Enqueue \Util \JSON ;
7- use FOS \ElasticaBundle \Persister \ObjectPersisterInterface ;
7+ use FOS \ElasticaBundle \Persister \PersisterRegistry ;
88use FOS \ElasticaBundle \Provider \IndexableInterface ;
99use Interop \Queue \PsrContext ;
1010use Interop \Queue \PsrMessage ;
1414final class SyncIndexWithDoctrineORMObjectChangeProcessor implements PsrProcessor, CommandSubscriberInterface, QueueSubscriberInterface
1515{
1616 /**
17- * @var ObjectPersisterInterface
17+ * @var PersisterRegistry
1818 */
19- private $ objectPersister ;
19+ private $ persisterRegistry ;
2020
2121 /**
2222 * @var IndexableInterface
@@ -28,9 +28,9 @@ final class SyncIndexWithDoctrineORMObjectChangeProcessor implements PsrProcesso
2828 */
2929 private $ doctrine ;
3030
31- public function __construct (RegistryInterface $ doctrine , ObjectPersisterInterface $ objectPersister , IndexableInterface $ indexable )
31+ public function __construct (RegistryInterface $ doctrine , PersisterRegistry $ persisterRegistry , IndexableInterface $ indexable )
3232 {
33- $ this ->objectPersister = $ objectPersister ;
33+ $ this ->persisterRegistry = $ persisterRegistry ;
3434 $ this ->indexable = $ indexable ;
3535 $ this ->doctrine = $ doctrine ;
3636 }
@@ -46,42 +46,43 @@ public function process(PsrMessage $message, PsrContext $context)
4646 return self ::REJECT ;
4747 }
4848
49- $ indexName = $ data ['indexName ' ];
50- $ typeName = $ data ['typeName ' ];
49+ $ index = $ data ['indexName ' ];
50+ $ type = $ data ['typeName ' ];
5151
52- $ objectRepository = $ this ->doctrine ->getManagerForClass ($ data ['modelClass ' ])->getRepository ($ data ['modelClass ' ]);
52+ $ repository = $ this ->doctrine ->getManagerForClass ($ data ['modelClass ' ])->getRepository ($ data ['modelClass ' ]);
53+ $ persister = $ this ->persisterRegistry ->getPersister ($ index , $ type );
5354
5455 switch ($ data ['action ' ]) {
5556 case 'update ' :
56- if (false == $ object = $ objectRepository ->find ($ data ['id ' ])) {
57- $ this -> objectPersister ->deleteById ($ data ['id ' ]);
57+ if (false == $ object = $ repository ->find ($ data ['id ' ])) {
58+ $ persister ->deleteById ($ data ['id ' ]);
5859
5960 return self ::REJECT ;
6061 }
6162
62- if ($ this -> objectPersister ->handlesObject ($ object )) {
63- if ($ this ->indexable ->isObjectIndexable ($ indexName , $ typeName , $ object )) {
64- $ this -> objectPersister ->replaceOne ($ object );
63+ if ($ persister ->handlesObject ($ object )) {
64+ if ($ this ->indexable ->isObjectIndexable ($ index , $ type , $ object )) {
65+ $ persister ->replaceOne ($ object );
6566 } else {
66- $ this -> objectPersister ->deleteOne ($ object );
67+ $ persister ->deleteOne ($ object );
6768 }
6869 }
6970
7071 break ;
7172 case 'insert ' :
72- if (false == $ object = $ objectRepository ->find ($ data ['id ' ])) {
73- $ this -> objectPersister ->deleteById ($ data ['id ' ]);
73+ if (false == $ object = $ repository ->find ($ data ['id ' ])) {
74+ $ persister ->deleteById ($ data ['id ' ]);
7475
7576 return self ::REJECT ;
7677 }
7778
78- if ($ this -> objectPersister -> handlesObject ($ object ) && $ this ->indexable ->isObjectIndexable ($ indexName , $ typeName , $ object )) {
79- $ this -> objectPersister ->insertOne ($ object );
79+ if ($ persister -> handlesObject ($ object ) && $ this ->indexable ->isObjectIndexable ($ index , $ type , $ object )) {
80+ $ persister ->insertOne ($ object );
8081 }
8182
8283 break ;
8384 case 'delete ' :
84- $ this -> objectPersister ->deleteById ($ data ['id ' ]);
85+ $ persister ->deleteById ($ data ['id ' ]);
8586
8687 break ;
8788 default :
0 commit comments