Skip to content

Commit c16dccd

Browse files
committed
debug:router okay
1 parent 2c9c546 commit c16dccd

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+2892
-135
lines changed
Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the API Platform project.
5+
*
6+
* (c) Kévin Dunglas <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
declare(strict_types=1);
13+
14+
namespace ApiPlatform\Core\Bridge\Doctrine\EventListener;
15+
16+
use ApiPlatform\Core\Api\IriConverterInterface;
17+
use ApiPlatform\Core\Api\ResourceClassResolverInterface;
18+
use ApiPlatform\Core\Api\UrlGeneratorInterface;
19+
use ApiPlatform\Core\Bridge\Symfony\Messenger\DispatchTrait;
20+
use ApiPlatform\Core\Exception\InvalidArgumentException;
21+
use ApiPlatform\Core\Exception\RuntimeException;
22+
use ApiPlatform\Core\GraphQl\Subscription\MercureSubscriptionIriGeneratorInterface as GraphQlMercureSubscriptionIriGeneratorInterface;
23+
use ApiPlatform\Core\GraphQl\Subscription\SubscriptionManagerInterface as GraphQlSubscriptionManagerInterface;
24+
use ApiPlatform\Core\Metadata\Resource\Factory\ResourceMetadataFactoryInterface;
25+
use ApiPlatform\Core\Util\ResourceClassInfoTrait;
26+
use Doctrine\Common\EventArgs;
27+
use Doctrine\ODM\MongoDB\Event\OnFlushEventArgs as MongoDbOdmOnFlushEventArgs;
28+
use Doctrine\ORM\Event\OnFlushEventArgs as OrmOnFlushEventArgs;
29+
use Symfony\Component\ExpressionLanguage\ExpressionLanguage;
30+
use Symfony\Component\HttpFoundation\JsonResponse;
31+
use Symfony\Component\Mercure\HubRegistry;
32+
use Symfony\Component\Mercure\Update;
33+
use Symfony\Component\Messenger\MessageBusInterface;
34+
use Symfony\Component\Serializer\SerializerInterface;
35+
36+
/**
37+
* Publishes resources updates to the Mercure hub.
38+
*
39+
* @author Kévin Dunglas <[email protected]>
40+
*
41+
* @experimental
42+
*/
43+
final class PublishMercureUpdatesListener
44+
{
45+
use DispatchTrait;
46+
use ResourceClassInfoTrait;
47+
private const ALLOWED_KEYS = [
48+
'topics' => true,
49+
'data' => true,
50+
'private' => true,
51+
'id' => true,
52+
'type' => true,
53+
'retry' => true,
54+
'normalization_context' => true,
55+
'hub' => true,
56+
'enable_async_update' => true,
57+
];
58+
59+
private $iriConverter;
60+
private $serializer;
61+
private $hubRegistry;
62+
private $expressionLanguage;
63+
private $createdObjects;
64+
private $updatedObjects;
65+
private $deletedObjects;
66+
private $formats;
67+
private $graphQlSubscriptionManager;
68+
private $graphQlMercureSubscriptionIriGenerator;
69+
70+
/**
71+
* @param array<string, string[]|string> $formats
72+
* @param HubRegistry|callable $hubRegistry
73+
*/
74+
public function __construct(ResourceClassResolverInterface $resourceClassResolver, IriConverterInterface $iriConverter, ResourceMetadataFactoryInterface $resourceMetadataFactory, SerializerInterface $serializer, array $formats, MessageBusInterface $messageBus = null, $hubRegistry = null, ?GraphQlSubscriptionManagerInterface $graphQlSubscriptionManager = null, ?GraphQlMercureSubscriptionIriGeneratorInterface $graphQlMercureSubscriptionIriGenerator = null, ExpressionLanguage $expressionLanguage = null)
75+
{
76+
if (null === $messageBus && null === $hubRegistry) {
77+
throw new InvalidArgumentException('A message bus or a hub registry must be provided.');
78+
}
79+
80+
$this->resourceClassResolver = $resourceClassResolver;
81+
$this->iriConverter = $iriConverter;
82+
$this->resourceMetadataFactory = $resourceMetadataFactory;
83+
$this->serializer = $serializer;
84+
$this->formats = $formats;
85+
$this->messageBus = $messageBus;
86+
$this->hubRegistry = $hubRegistry;
87+
$this->expressionLanguage = $expressionLanguage ?? (class_exists(ExpressionLanguage::class) ? new ExpressionLanguage() : null);
88+
$this->graphQlSubscriptionManager = $graphQlSubscriptionManager;
89+
$this->graphQlMercureSubscriptionIriGenerator = $graphQlMercureSubscriptionIriGenerator;
90+
$this->reset();
91+
}
92+
93+
/**
94+
* Collects created, updated and deleted objects.
95+
*/
96+
public function onFlush(EventArgs $eventArgs): void
97+
{
98+
if ($eventArgs instanceof OrmOnFlushEventArgs) {
99+
$uow = $eventArgs->getEntityManager()->getUnitOfWork();
100+
} elseif ($eventArgs instanceof MongoDbOdmOnFlushEventArgs) {
101+
$uow = $eventArgs->getDocumentManager()->getUnitOfWork();
102+
} else {
103+
return;
104+
}
105+
106+
$methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityInsertions' : 'getScheduledDocumentInsertions';
107+
foreach ($uow->{$methodName}() as $object) {
108+
$this->storeObjectToPublish($object, 'createdObjects');
109+
}
110+
111+
$methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityUpdates' : 'getScheduledDocumentUpdates';
112+
foreach ($uow->{$methodName}() as $object) {
113+
$this->storeObjectToPublish($object, 'updatedObjects');
114+
}
115+
116+
$methodName = $eventArgs instanceof OrmOnFlushEventArgs ? 'getScheduledEntityDeletions' : 'getScheduledDocumentDeletions';
117+
foreach ($uow->{$methodName}() as $object) {
118+
$this->storeObjectToPublish($object, 'deletedObjects');
119+
}
120+
}
121+
122+
/**
123+
* Publishes updates for changes collected on flush, and resets the store.
124+
*/
125+
public function postFlush(): void
126+
{
127+
try {
128+
foreach ($this->createdObjects as $object) {
129+
$this->publishUpdate($object, $this->createdObjects[$object], 'create');
130+
}
131+
132+
foreach ($this->updatedObjects as $object) {
133+
$this->publishUpdate($object, $this->updatedObjects[$object], 'update');
134+
}
135+
136+
foreach ($this->deletedObjects as $object) {
137+
$this->publishUpdate($object, $this->deletedObjects[$object], 'delete');
138+
}
139+
} finally {
140+
$this->reset();
141+
}
142+
}
143+
144+
private function reset(): void
145+
{
146+
$this->createdObjects = new \SplObjectStorage();
147+
$this->updatedObjects = new \SplObjectStorage();
148+
$this->deletedObjects = new \SplObjectStorage();
149+
}
150+
151+
/**
152+
* @param object $object
153+
*/
154+
private function storeObjectToPublish($object, string $property): void
155+
{
156+
if (null === $resourceClass = $this->getResourceClass($object)) {
157+
return;
158+
}
159+
160+
$options = $this->resourceMetadataFactory->create($resourceClass)->getAttribute('mercure', false);
161+
162+
if (\is_string($options)) {
163+
if (null === $this->expressionLanguage) {
164+
throw new RuntimeException('The Expression Language component is not installed. Try running "composer require symfony/expression-language".');
165+
}
166+
167+
$options = $this->expressionLanguage->evaluate($options, ['object' => $object]);
168+
}
169+
170+
if (false === $options) {
171+
return;
172+
}
173+
174+
if (true === $options) {
175+
$options = [];
176+
}
177+
178+
if (!\is_array($options)) {
179+
throw new InvalidArgumentException(sprintf('The value of the "mercure" attribute of the "%s" resource class must be a boolean, an array of options or an expression returning this array, "%s" given.', $resourceClass, \gettype($options)));
180+
}
181+
182+
foreach ($options as $key => $value) {
183+
if (0 === $key) {
184+
if (method_exists(Update::class, 'isPrivate')) {
185+
throw new \InvalidArgumentException('Targets do not exist anymore since Mercure 0.10. Mark the update as private instead or downgrade the Mercure Component to version 0.3');
186+
}
187+
188+
@trigger_error('Targets do not exist anymore since Mercure 0.10. Mark the update as private instead.', \E_USER_DEPRECATED);
189+
break;
190+
}
191+
192+
if (!isset(self::ALLOWED_KEYS[$key])) {
193+
throw new InvalidArgumentException(sprintf('The option "%s" set in the "mercure" attribute of the "%s" resource does not exist. Existing options: "%s"', $key, $resourceClass, implode('", "', self::ALLOWED_KEYS)));
194+
}
195+
196+
if ('hub' === $key && !$this->hubRegistry instanceof HubRegistry) {
197+
throw new InvalidArgumentException(sprintf('The option "hub" of the "mercure" attribute cannot be set on the "%s" resource . Try running "composer require symfony/mercure:^0.5".', $resourceClass));
198+
}
199+
}
200+
201+
$options['enable_async_update'] = $options['enable_async_update'] ?? true;
202+
203+
if ('deletedObjects' === $property) {
204+
$this->deletedObjects[(object) [
205+
'id' => $this->iriConverter->getIriFromItem($object),
206+
'iri' => $this->iriConverter->getIriFromItem($object, UrlGeneratorInterface::ABS_URL),
207+
]] = $options;
208+
209+
return;
210+
}
211+
212+
$this->{$property}[$object] = $options;
213+
}
214+
215+
/**
216+
* @param object $object
217+
*/
218+
private function publishUpdate($object, array $options, string $type): void
219+
{
220+
if ($object instanceof \stdClass) {
221+
// By convention, if the object has been deleted, we send only its IRI.
222+
// This may change in the feature, because it's not JSON Merge Patch compliant,
223+
// and I'm not a fond of this approach.
224+
$iri = $options['topics'] ?? $object->iri;
225+
/** @var string $data */
226+
$data = json_encode(['@id' => $object->id]);
227+
} else {
228+
$resourceClass = $this->getObjectClass($object);
229+
$context = $options['normalization_context'] ?? $this->resourceMetadataFactory->create($resourceClass)->getAttribute('normalization_context', []);
230+
231+
$iri = $options['topics'] ?? $this->iriConverter->getIriFromItem($object, UrlGeneratorInterface::ABS_URL);
232+
$data = $options['data'] ?? $this->serializer->serialize($object, key($this->formats), $context);
233+
}
234+
235+
$updates = array_merge([$this->buildUpdate($iri, $data, $options)], $this->getGraphQlSubscriptionUpdates($object, $options, $type));
236+
237+
foreach ($updates as $update) {
238+
if ($options['enable_async_update'] && $this->messageBus) {
239+
$this->dispatch($update);
240+
continue;
241+
}
242+
243+
$this->hubRegistry instanceof HubRegistry ? $this->hubRegistry->getHub($options['hub'] ?? null)->publish($update) : ($this->hubRegistry)($update);
244+
}
245+
}
246+
247+
/**
248+
* @param object $object
249+
*
250+
* @return Update[]
251+
*/
252+
private function getGraphQlSubscriptionUpdates($object, array $options, string $type): array
253+
{
254+
if ('update' !== $type || !$this->graphQlSubscriptionManager || !$this->graphQlMercureSubscriptionIriGenerator) {
255+
return [];
256+
}
257+
258+
$payloads = $this->graphQlSubscriptionManager->getPushPayloads($object);
259+
260+
$updates = [];
261+
foreach ($payloads as [$subscriptionId, $data]) {
262+
$updates[] = $this->buildUpdate(
263+
$this->graphQlMercureSubscriptionIriGenerator->generateTopicIri($subscriptionId),
264+
(string) (new JsonResponse($data))->getContent(),
265+
$options
266+
);
267+
}
268+
269+
return $updates;
270+
}
271+
272+
/**
273+
* @param string|string[] $iri
274+
*/
275+
private function buildUpdate($iri, string $data, array $options): Update
276+
{
277+
if (method_exists(Update::class, 'isPrivate')) {
278+
return new Update($iri, $data, $options['private'] ?? false, $options['id'] ?? null, $options['type'] ?? null, $options['retry'] ?? null);
279+
}
280+
281+
// Mercure Component < 0.4.
282+
/* @phpstan-ignore-next-line */
283+
return new Update($iri, $data, $options);
284+
}
285+
}

0 commit comments

Comments
 (0)