From 54ed971d02032ab970056b6e991bc3c7125997eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Boryczko?= Date: Fri, 3 Nov 2023 00:06:00 +0100 Subject: [PATCH 1/2] added option to configure DetailType for EventBridge --- README.md | 11 + src/Resources/config/services.yaml | 2 + .../DefaultEventBridgeDetailTypeResolver.php | 15 ++ .../EventBridgeDetailTypeResolver.php | 10 + .../EventBridge/EventBridgeTransport.php | 37 +-- .../EventBridgeTransportFactory.php | 13 +- ...faultEventBridgeDetailTypeResolverTest.php | 24 ++ .../EventBridge/EventBridgeTransportTest.php | 211 ++++++++++++++++++ 8 files changed, 306 insertions(+), 17 deletions(-) create mode 100644 src/Service/EventBridge/DefaultEventBridgeDetailTypeResolver.php create mode 100644 src/Service/EventBridge/EventBridgeDetailTypeResolver.php create mode 100644 tests/Unit/Service/EventBridge/DefaultEventBridgeDetailTypeResolverTest.php create mode 100644 tests/Unit/Service/EventBridge/EventBridgeTransportTest.php diff --git a/README.md b/README.md index eda26ad..ef5b1fe 100644 --- a/README.md +++ b/README.md @@ -398,6 +398,17 @@ services: Now, anytime a message is dispatched to EventBridge for that source, the Lambda function will be called. The Bref consumer class will put back the message into Symfony Messenger to be processed. +EventBridge event dispatching: + +By default `DetailType` is `Symfony Messenger message` but you can change it by: +```yaml +# config/services.yaml +services: + bref.messenger.eventbridge_detail_type_resolver: + class: Bref\Symfony\Messenger\Service\EventBridge\DefaultEventBridgeDetailTypeResolver +``` +You can use `DefaultEventBridgeDetailTypeResolver` which is provided by this package (it will use message class name as DetailType) or create your own resolver by implementing `Bref\Symfony\Messenger\Service\EventBridge\EventBridgeDetailTypeResolverInterface`. + ## Error handling AWS Lambda has error handling mechanisms (retrying and handling failed messages). Because of that, this package does not integrates Symfony Messenger's retry mechanism. Instead, it works with Lambda's retry mechanism. diff --git a/src/Resources/config/services.yaml b/src/Resources/config/services.yaml index 0bf098a..98fd54f 100644 --- a/src/Resources/config/services.yaml +++ b/src/Resources/config/services.yaml @@ -17,5 +17,7 @@ services: tags: ['messenger.transport_factory'] arguments: - '@bref.messenger.eventbridge_client' + - '@?bref.messenger.eventbridge_detail_type_resolver' bref.messenger.eventbridge_client: class: AsyncAws\EventBridge\EventBridgeClient + diff --git a/src/Service/EventBridge/DefaultEventBridgeDetailTypeResolver.php b/src/Service/EventBridge/DefaultEventBridgeDetailTypeResolver.php new file mode 100644 index 0000000..32bf2a9 --- /dev/null +++ b/src/Service/EventBridge/DefaultEventBridgeDetailTypeResolver.php @@ -0,0 +1,15 @@ +getMessage())); + + return end($explodedFQCN); + } +} diff --git a/src/Service/EventBridge/EventBridgeDetailTypeResolver.php b/src/Service/EventBridge/EventBridgeDetailTypeResolver.php new file mode 100644 index 0000000..9db6407 --- /dev/null +++ b/src/Service/EventBridge/EventBridgeDetailTypeResolver.php @@ -0,0 +1,10 @@ +eventBridge = $eventBridge; $this->serializer = $serializer; $this->source = $source; $this->eventBusName = $eventBusName; + $this->detailTypeResolver = $detailTypeResolver; } public function send(Envelope $envelope): Envelope @@ -37,8 +43,9 @@ public function send(Envelope $envelope): Envelope 'Entries' => [ [ 'Detail' => json_encode($encodedMessage, JSON_THROW_ON_ERROR), - // Ideally here we could put the class name of the message, but how to retrieve it? - 'DetailType' => 'Symfony Messenger message', + 'DetailType' => $this->detailTypeResolver !== null ? + $this->detailTypeResolver->resolveDetailType($envelope) : + 'Symfony Messenger message', 'Source' => $this->source, ], ], @@ -58,7 +65,7 @@ public function send(Envelope $envelope): Envelope if ($failedCount > 0) { foreach ($result->getEntries() as $entry) { $reason = $entry->getErrorMessage() ?? 'no reason provided'; - throw new TransportException("$failedCount message(s) could not be published to EventBridge: $reason."); + throw new TransportException(Symfony Messenger message); } } diff --git a/src/Service/EventBridge/EventBridgeTransportFactory.php b/src/Service/EventBridge/EventBridgeTransportFactory.php index 9d7f80c..d9e26db 100644 --- a/src/Service/EventBridge/EventBridgeTransportFactory.php +++ b/src/Service/EventBridge/EventBridgeTransportFactory.php @@ -13,9 +13,12 @@ final class EventBridgeTransportFactory implements TransportFactoryInterface /** @var EventBridgeClient */ private $eventBridge; - public function __construct(EventBridgeClient $eventBridge) + private ?EventBridgeDetailTypeResolver $detailTypeResolver; + + public function __construct(EventBridgeClient $eventBridge, ?EventBridgeDetailTypeResolver $detailTypeResolver = null) { $this->eventBridge = $eventBridge; + $this->detailTypeResolver = $detailTypeResolver; } public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface @@ -29,7 +32,13 @@ public function createTransport(string $dsn, array $options, SerializerInterface parse_str($parsedUrl['query'], $query); } - return new EventBridgeTransport($this->eventBridge, $serializer, $parsedUrl['host'], $query['event_bus_name'] ?? null); + return new EventBridgeTransport( + $this->eventBridge, + $serializer, + $parsedUrl['host'], + $query['event_bus_name'] ?? null, + $this->detailTypeResolver + ); } public function supports(string $dsn, array $options): bool diff --git a/tests/Unit/Service/EventBridge/DefaultEventBridgeDetailTypeResolverTest.php b/tests/Unit/Service/EventBridge/DefaultEventBridgeDetailTypeResolverTest.php new file mode 100644 index 0000000..170fb32 --- /dev/null +++ b/tests/Unit/Service/EventBridge/DefaultEventBridgeDetailTypeResolverTest.php @@ -0,0 +1,24 @@ +resolver = new DefaultEventBridgeDetailTypeResolver(); + } + + public function testResolver(): void + { + $envelope = new Envelope(new \stdClass()); + + $this->assertEquals('stdClass', $this->resolver->resolveDetailType($envelope)); + } +} diff --git a/tests/Unit/Service/EventBridge/EventBridgeTransportTest.php b/tests/Unit/Service/EventBridge/EventBridgeTransportTest.php new file mode 100644 index 0000000..8dd2ec5 --- /dev/null +++ b/tests/Unit/Service/EventBridge/EventBridgeTransportTest.php @@ -0,0 +1,211 @@ +eventBusName = null; + $this->detailTypeResolver = null; + $this->serializer = $this->getMockForAbstractClass(SerializerInterface::class); + $this->eventBridge = $this->createMock(EventBridgeClient::class); + } + + public function testSendSuccess() + { + $envelope = new Envelope(new \stdClass()); + $result = $this->createMock(PutEventsResponse::class); + + $this->serializer + ->expects($this->once()) + ->method('encode') + ->with($envelope) + ->willReturn(['body' => 'test']); + $this->eventBridge + ->expects($this->once()) + ->method('putEvents') + ->with( + [ + 'Entries' => [ + [ + 'Detail' => '{"body":"test"}', + 'DetailType' => 'Symfony Messenger message', + 'Source' => $this->source, + ], + ], + ] + ) + ->willReturn($result); + $result->expects($this->once()) + ->method('getFailedEntryCount') + ->willReturn(0); + + $this->assertSame( + $envelope, + $this->createTransport()->send($envelope) + ); + } + + public function testSendSuccessWithCustomBusName() + { + $this->eventBusName = 'custom'; + $envelope = new Envelope(new \stdClass()); + $result = $this->createMock(PutEventsResponse::class); + + $this->serializer + ->expects($this->once()) + ->method('encode') + ->with($envelope) + ->willReturn(['body' => 'test']); + $this->eventBridge + ->expects($this->once()) + ->method('putEvents') + ->with( + [ + 'Entries' => [ + [ + 'Detail' => '{"body":"test"}', + 'DetailType' => 'Symfony Messenger message', + 'Source' => $this->source, + 'EventBusName' => 'custom', + ], + ], + ] + ) + ->willReturn($result); + $result->expects($this->once()) + ->method('getFailedEntryCount') + ->willReturn(0); + + $this->assertSame( + $envelope, + $this->createTransport()->send($envelope) + ); + } + + public function testSendSuccessWithDetailTypeResolver() + { + $this->detailTypeResolver = $this->getMockForAbstractClass(EventBridgeDetailTypeResolver::class); + $envelope = new Envelope(new \stdClass()); + $result = $this->createMock(PutEventsResponse::class); + + $this->serializer + ->expects($this->once()) + ->method('encode') + ->with($envelope) + ->willReturn(['body' => 'test']); + $this->detailTypeResolver + ->expects($this->once()) + ->method('resolveDetailType') + ->with($envelope) + ->willReturn('stdClass'); + $this->eventBridge + ->expects($this->once()) + ->method('putEvents') + ->with( + [ + 'Entries' => [ + [ + 'Detail' => '{"body":"test"}', + 'DetailType' => 'stdClass', + 'Source' => $this->source, + ], + ], + ] + ) + ->willReturn($result); + $result->expects($this->once()) + ->method('getFailedEntryCount') + ->willReturn(0); + + $this->assertSame( + $envelope, + $this->createTransport()->send($envelope) + ); + } + + public function testSendFailed() + { + $envelope = new Envelope(new \stdClass()); + + $this->serializer + ->expects($this->once()) + ->method('encode') + ->with($envelope) + ->willReturn(['body' => 'test']); + $this->eventBridge + ->expects($this->once()) + ->method('putEvents') + ->willThrowException(new \Exception('event bridge exception')); + $this->expectException(TransportException::class); + + $this->createTransport()->send($envelope); + } + + public function testSendResultFailed() + { + $envelope = new Envelope(new \stdClass()); + $result = $this->createMock(PutEventsResponse::class); + $resultErrorEntry = new PutEventsResultEntry(['ErrorMessage' => 'Error message']); + + $this->serializer + ->expects($this->once()) + ->method('encode') + ->with($envelope) + ->willReturn(['body' => 'test']); + $this->eventBridge + ->expects($this->once()) + ->method('putEvents') + ->willReturn($result); + $result->expects($this->once()) + ->method('getFailedEntryCount') + ->willReturn(1); + $result->expects($this->once()) + ->method('getEntries') + ->willReturn([$resultErrorEntry]); + + $this->expectException(TransportException::class); + $this->expectExceptionMessage("1 message(s) could not be published to EventBridge: Error message."); + + $this->assertSame( + $envelope, + $this->createTransport()->send($envelope) + ); + } + + private function createTransport(): EventBridgeTransport + { + return new EventBridgeTransport( + $this->eventBridge, + $this->serializer, + $this->source, + $this->eventBusName, + $this->detailTypeResolver + ); + } +} From 1c922ccd31edd9fa174253e8c35c4e142cd8c4a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Boryczko?= Date: Fri, 3 Nov 2023 00:07:22 +0100 Subject: [PATCH 2/2] added option to configure DetailType for EventBridge - fix typo --- src/Service/EventBridge/EventBridgeTransport.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Service/EventBridge/EventBridgeTransport.php b/src/Service/EventBridge/EventBridgeTransport.php index a7df2f2..e4e58e5 100644 --- a/src/Service/EventBridge/EventBridgeTransport.php +++ b/src/Service/EventBridge/EventBridgeTransport.php @@ -65,7 +65,7 @@ public function send(Envelope $envelope): Envelope if ($failedCount > 0) { foreach ($result->getEntries() as $entry) { $reason = $entry->getErrorMessage() ?? 'no reason provided'; - throw new TransportException(Symfony Messenger message); + throw new TransportException("$failedCount message(s) could not be published to EventBridge: $reason."); } }