Skip to content

Commit 601ccfb

Browse files
authored
fix(doctrine): move event listeners to doctrine/common (#6573)
1 parent 17c6b58 commit 601ccfb

21 files changed

+1011
-40
lines changed
Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the API Platform project.
5+
*
6+
* (c) Kévin Dunglas <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace ApiPlatform\Doctrine\Common\EventListener;
15+
16+
use ApiPlatform\Api\IriConverterInterface as LegacyIriConverterInterface;
17+
use ApiPlatform\Api\ResourceClassResolverInterface as LegacyResourceClassResolverInterface;
18+
use ApiPlatform\Doctrine\Common\Messenger\DispatchTrait;
19+
use ApiPlatform\Exception\InvalidArgumentException;
20+
use ApiPlatform\Exception\OperationNotFoundException;
21+
use ApiPlatform\Exception\RuntimeException;
22+
use ApiPlatform\GraphQl\Subscription\MercureSubscriptionIriGeneratorInterface as GraphQlMercureSubscriptionIriGeneratorInterface;
23+
use ApiPlatform\GraphQl\Subscription\SubscriptionManagerInterface as GraphQlSubscriptionManagerInterface;
24+
use ApiPlatform\Metadata\HttpOperation;
25+
use ApiPlatform\Metadata\IriConverterInterface;
26+
use ApiPlatform\Metadata\Operation;
27+
use ApiPlatform\Metadata\Resource\Factory\ResourceMetadataCollectionFactoryInterface;
28+
use ApiPlatform\Metadata\ResourceClassResolverInterface;
29+
use ApiPlatform\Metadata\UrlGeneratorInterface;
30+
use ApiPlatform\Metadata\Util\ResourceClassInfoTrait;
31+
use Doctrine\Common\EventArgs;
32+
use Doctrine\ODM\MongoDB\Event\OnFlushEventArgs as MongoDbOdmOnFlushEventArgs;
33+
use Doctrine\ORM\Event\OnFlushEventArgs as OrmOnFlushEventArgs;
34+
use Symfony\Component\ExpressionLanguage\ExpressionFunction;
35+
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;
36+
use Symfony\Component\HttpFoundation\JsonResponse;
37+
use Symfony\Component\Mercure\HubRegistry;
38+
use Symfony\Component\Mercure\Update;
39+
use Symfony\Component\Messenger\MessageBusInterface;
40+
use Symfony\Component\Serializer\SerializerInterface;
41+
42+
/**
43+
* Publishes resources updates to the Mercure hub.
44+
*
45+
* @author Kévin Dunglas <[email protected]>
46+
*/
47+
final class PublishMercureUpdatesListener
48+
{
49+
use DispatchTrait;
50+
use ResourceClassInfoTrait;
51+
private const ALLOWED_KEYS = [
52+
'topics' => true,
53+
'data' => true,
54+
'private' => true,
55+
'id' => true,
56+
'type' => true,
57+
'retry' => true,
58+
'normalization_context' => true,
59+
'hub' => true,
60+
'enable_async_update' => true,
61+
];
62+
private readonly ?ExpressionLanguage $expressionLanguage;
63+
private \SplObjectStorage $createdObjects;
64+
private \SplObjectStorage $updatedObjects;
65+
private \SplObjectStorage $deletedObjects;
66+
67+
/**
68+
* @param array<string, string[]|string> $formats
69+
*/
70+
public function __construct(LegacyResourceClassResolverInterface|ResourceClassResolverInterface $resourceClassResolver, private readonly LegacyIriConverterInterface|IriConverterInterface $iriConverter, ResourceMetadataCollectionFactoryInterface $resourceMetadataFactory, private readonly SerializerInterface $serializer, private readonly array $formats, ?MessageBusInterface $messageBus = null, private readonly ?HubRegistry $hubRegistry = null, private readonly ?GraphQlSubscriptionManagerInterface $graphQlSubscriptionManager = null, private readonly ?GraphQlMercureSubscriptionIriGeneratorInterface $graphQlMercureSubscriptionIriGenerator = null, ?ExpressionLanguage $expressionLanguage = null, private bool $includeType = false)
71+
{
72+
if (null === $messageBus && null === $hubRegistry) {
73+
throw new InvalidArgumentException('A message bus or a hub registry must be provided.');
74+
}
75+
76+
$this->resourceClassResolver = $resourceClassResolver;
77+
78+
$this->resourceMetadataFactory = $resourceMetadataFactory;
79+
$this->messageBus = $messageBus;
80+
$this->expressionLanguage = $expressionLanguage ?? (class_exists(ExpressionLanguage::class) ? new ExpressionLanguage() : null);
81+
$this->reset();
82+
83+
if ($this->expressionLanguage) {
84+
$rawurlencode = ExpressionFunction::fromPhp('rawurlencode', 'escape');
85+
$this->expressionLanguage->addFunction($rawurlencode);
86+
87+
$this->expressionLanguage->addFunction(
88+
new ExpressionFunction('get_operation', static fn (string $apiResource, string $name): string => \sprintf('getOperation(%s, %s)', $apiResource, $name), static fn (array $arguments, $apiResource, string $name): Operation => $resourceMetadataFactory->create($resourceClassResolver->getResourceClass($apiResource))->getOperation($name))
89+
);
90+
$this->expressionLanguage->addFunction(
91+
new ExpressionFunction('iri', static fn (string $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL, ?string $operation = null): string => \sprintf('iri(%s, %d, %s)', $apiResource, $referenceType, $operation), static fn (array $arguments, $apiResource, int $referenceType = UrlGeneratorInterface::ABS_URL, $operation = null): string => $iriConverter->getIriFromResource($apiResource, $referenceType, $operation))
92+
);
93+
}
94+
95+
if (false === $this->includeType) {
96+
trigger_deprecation('api-platform/core', '3.1', 'Having mercure.include_type (always include @type in Mercure updates, even delete ones) set to false in the configuration is deprecated. It will be true by default in API Platform 4.0.');
97+
}
98+
}
99+
100+
/**
101+
* Collects created, updated and deleted objects.
102+
*/
103+
public function onFlush(EventArgs $eventArgs): void
104+
{
105+
if ($eventArgs instanceof OrmOnFlushEventArgs) {
106+
// @phpstan-ignore-next-line
107+
$uow = method_exists($eventArgs, 'getObjectManager') ? $eventArgs->getObjectManager()->getUnitOfWork() : $eventArgs->getEntityManager()->getUnitOfWork();
108+
} elseif ($eventArgs instanceof MongoDbOdmOnFlushEventArgs) {
109+
$uow = $eventArgs->getDocumentManager()->getUnitOfWork();
110+
} else {
111+
return;
112+
}
113+
114+
$methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityInsertions' : 'getScheduledDocumentInsertions';
115+
foreach ($uow->{$methodName}() as $object) {
116+
$this->storeObjectToPublish($object, 'createdObjects');
117+
}
118+
119+
$methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityUpdates' : 'getScheduledDocumentUpdates';
120+
foreach ($uow->{$methodName}() as $object) {
121+
$this->storeObjectToPublish($object, 'updatedObjects');
122+
}
123+
124+
$methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityDeletions' : 'getScheduledDocumentDeletions';
125+
foreach ($uow->{$methodName}() as $object) {
126+
$this->storeObjectToPublish($object, 'deletedObjects');
127+
}
128+
}
129+
130+
/**
131+
* Publishes updates for changes collected on flush, and resets the store.
132+
*/
133+
public function postFlush(): void
134+
{
135+
try {
136+
foreach ($this->createdObjects as $object) {
137+
$this->publishUpdate($object, $this->createdObjects[$object], 'create');
138+
}
139+
140+
foreach ($this->updatedObjects as $object) {
141+
$this->publishUpdate($object, $this->updatedObjects[$object], 'update');
142+
}
143+
144+
foreach ($this->deletedObjects as $object) {
145+
$this->publishUpdate($object, $this->deletedObjects[$object], 'delete');
146+
}
147+
} finally {
148+
$this->reset();
149+
}
150+
}
151+
152+
private function reset(): void
153+
{
154+
$this->createdObjects = new \SplObjectStorage();
155+
$this->updatedObjects = new \SplObjectStorage();
156+
$this->deletedObjects = new \SplObjectStorage();
157+
}
158+
159+
private function storeObjectToPublish(object $object, string $property): void
160+
{
161+
if (null === $resourceClass = $this->getResourceClass($object)) {
162+
return;
163+
}
164+
165+
$operation = $this->resourceMetadataFactory->create($resourceClass)->getOperation();
166+
try {
167+
$options = $operation->getMercure() ?? false;
168+
} catch (OperationNotFoundException) {
169+
return;
170+
}
171+
172+
if (\is_string($options)) {
173+
if (null === $this->expressionLanguage) {
174+
throw new RuntimeException('The Expression Language component is not installed. Try running "composer require symfony/expression-language".');
175+
}
176+
177+
$options = $this->expressionLanguage->evaluate($options, ['object' => $object]);
178+
}
179+
180+
if (false === $options) {
181+
return;
182+
}
183+
184+
if (true === $options) {
185+
$options = [];
186+
}
187+
188+
if (!\is_array($options)) {
189+
throw new InvalidArgumentException(\sprintf('The value of the "mercure" attribute of the "%s" resource class must be a boolean, an array of options or an expression returning this array, "%s" given.', $resourceClass, \gettype($options)));
190+
}
191+
192+
foreach ($options as $key => $value) {
193+
if (!isset(self::ALLOWED_KEYS[$key])) {
194+
throw new InvalidArgumentException(\sprintf('The option "%s" set in the "mercure" attribute of the "%s" resource does not exist. Existing options: "%s"', $key, $resourceClass, implode('", "', self::ALLOWED_KEYS)));
195+
}
196+
}
197+
198+
$options['enable_async_update'] ??= true;
199+
200+
if ('deletedObjects' === $property) {
201+
$types = $operation instanceof HttpOperation ? $operation->getTypes() : null;
202+
if (null === $types) {
203+
$types = [$operation->getShortName()];
204+
}
205+
206+
// We need to evaluate it here, because in publishUpdate() the resource would be already deleted
207+
$this->evaluateTopics($options, $object);
208+
209+
$this->deletedObjects[(object) [
210+
'id' => $this->iriConverter->getIriFromResource($object),
211+
'iri' => $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL),
212+
'type' => 1 === \count($types) ? $types[0] : $types,
213+
]] = $options;
214+
215+
return;
216+
}
217+
218+
$this->{$property}[$object] = $options;
219+
}
220+
221+
private function publishUpdate(object $object, array $options, string $type): void
222+
{
223+
if ($object instanceof \stdClass) {
224+
// By convention, if the object has been deleted, we send only its IRI and its type.
225+
// This may change in the feature, because it's not JSON Merge Patch compliant,
226+
// and I'm not a fond of this approach.
227+
$iri = $options['topics'] ?? $object->iri;
228+
/** @var string $data */
229+
$data = json_encode(['@id' => $object->id] + ($this->includeType ? ['@type' => $object->type] : []), \JSON_THROW_ON_ERROR);
230+
} else {
231+
$resourceClass = $this->getObjectClass($object);
232+
$context = $options['normalization_context'] ?? $this->resourceMetadataFactory->create($resourceClass)->getOperation()->getNormalizationContext() ?? [];
233+
234+
// We need to evaluate it here, because in storeObjectToPublish() the resource would not have been persisted yet
235+
$this->evaluateTopics($options, $object);
236+
237+
$iri = $options['topics'] ?? $this->iriConverter->getIriFromResource($object, UrlGeneratorInterface::ABS_URL);
238+
$data = $options['data'] ?? $this->serializer->serialize($object, key($this->formats), $context);
239+
}
240+
241+
$updates = array_merge([$this->buildUpdate($iri, $data, $options)], $this->getGraphQlSubscriptionUpdates($object, $options, $type));
242+
243+
foreach ($updates as $update) {
244+
if ($options['enable_async_update'] && $this->messageBus) {
245+
$this->dispatch($update);
246+
continue;
247+
}
248+
249+
$this->hubRegistry->getHub($options['hub'] ?? null)->publish($update);
250+
}
251+
}
252+
253+
private function evaluateTopics(array &$options, object $object): void
254+
{
255+
if (!($options['topics'] ?? false)) {
256+
return;
257+
}
258+
259+
$topics = [];
260+
foreach ((array) $options['topics'] as $topic) {
261+
if (!\is_string($topic)) {
262+
$topics[] = $topic;
263+
continue;
264+
}
265+
266+
if (!str_starts_with($topic, '@=')) {
267+
$topics[] = $topic;
268+
continue;
269+
}
270+
271+
if (null === $this->expressionLanguage) {
272+
throw new \LogicException('The "@=" expression syntax cannot be used without the Expression Language component. Try running "composer require symfony/expression-language".');
273+
}
274+
275+
$topics[] = $this->expressionLanguage->evaluate(substr($topic, 2), ['object' => $object]);
276+
}
277+
278+
$options['topics'] = $topics;
279+
}
280+
281+
/**
282+
* @return Update[]
283+
*/
284+
private function getGraphQlSubscriptionUpdates(object $object, array $options, string $type): array
285+
{
286+
if ('update' !== $type || !$this->graphQlSubscriptionManager || !$this->graphQlMercureSubscriptionIriGenerator) {
287+
return [];
288+
}
289+
290+
$payloads = $this->graphQlSubscriptionManager->getPushPayloads($object);
291+
292+
$updates = [];
293+
foreach ($payloads as [$subscriptionId, $data]) {
294+
$updates[] = $this->buildUpdate(
295+
$this->graphQlMercureSubscriptionIriGenerator->generateTopicIri($subscriptionId),
296+
(string) (new JsonResponse($data))->getContent(),
297+
$options
298+
);
299+
}
300+
301+
return $updates;
302+
}
303+
304+
/**
305+
* @param string|string[] $iri
306+
*/
307+
private function buildUpdate(string|array $iri, string $data, array $options): Update
308+
{
309+
return new Update($iri, $data, $options['private'] ?? false, $options['id'] ?? null, $options['type'] ?? null, $options['retry'] ?? null);
310+
}
311+
}

0 commit comments

Comments
 (0)