Skip to content

Commit c9afef9

Browse files
[Messenger] Add TransportInterface as first class citizen sender+receiver
1 parent 13aa95f commit c9afef9

File tree

7 files changed

+111
-34
lines changed

7 files changed

+111
-34
lines changed

Transport/AmqpExt/AmqpSender.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public function __construct(EncoderInterface $messageEncoder, Connection $connec
3333
/**
3434
* {@inheritdoc}
3535
*/
36-
public function send($message)
36+
public function send($message): void
3737
{
3838
$encodedMessage = $this->messageEncoder->encode($message);
3939

Transport/AmqpExt/AmqpTransport.php

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\AmqpExt;
13+
14+
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
15+
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
16+
use Symfony\Component\Messenger\Transport\TransportInterface;
17+
18+
/**
19+
* @author Nicolas Grekas <[email protected]>
20+
*/
21+
class AmqpTransport implements TransportInterface
22+
{
23+
private $encoder;
24+
private $decoder;
25+
private $dsn;
26+
private $options;
27+
private $debug;
28+
private $connection;
29+
private $receiver;
30+
private $sender;
31+
32+
public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, string $dsn, array $options, bool $debug)
33+
{
34+
$this->encoder = $encoder;
35+
$this->decoder = $decoder;
36+
$this->dsn = $dsn;
37+
$this->options = $options;
38+
$this->debug = $debug;
39+
}
40+
41+
/**
42+
* {@inheritdoc}
43+
*/
44+
public function receive(callable $handler): void
45+
{
46+
($this->receiver ?? $this->getReceiver())->receive($hander);
47+
}
48+
49+
/**
50+
* {@inheritdoc}
51+
*/
52+
public function stop(): void
53+
{
54+
($this->receiver ?? $this->getReceiver())->stop();
55+
}
56+
57+
/**
58+
* {@inheritdoc}
59+
*/
60+
public function send($message): void
61+
{
62+
($this->sender ?? $this->getSender())->send($message);
63+
}
64+
65+
private function getReceiver()
66+
{
67+
return $this->receiver = new AmqpReceiver($this->decoder, $this->connection ?? $this->getConnection());
68+
}
69+
70+
private function getSender()
71+
{
72+
return $this->sender = new AmqpSender($this->encoder, $this->connection ?? $this->getConnection());
73+
}
74+
75+
private function getConnection()
76+
{
77+
return $this->connection = new Connection($this->dsn, $this->options, $this->debug);
78+
}
79+
}

Transport/AmqpExt/AmqpTransportFactory.php

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,10 @@
1111

1212
namespace Symfony\Component\Messenger\Transport\AmqpExt;
1313

14-
use Symfony\Component\Messenger\Transport\Factory\TransportFactoryInterface;
15-
use Symfony\Component\Messenger\Transport\ReceiverInterface;
16-
use Symfony\Component\Messenger\Transport\SenderInterface;
1714
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
1815
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
16+
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
17+
use Symfony\Component\Messenger\Transport\TransportInterface;
1918

2019
/**
2120
* @author Samuel Roze <[email protected]>
@@ -33,14 +32,9 @@ public function __construct(EncoderInterface $encoder, DecoderInterface $decoder
3332
$this->debug = $debug;
3433
}
3534

36-
public function createReceiver(string $dsn, array $options): ReceiverInterface
35+
public function createTransport(string $dsn, array $options): TransportInterface
3736
{
38-
return new AmqpReceiver($this->decoder, Connection::fromDsn($dsn, $options, $this->debug));
39-
}
40-
41-
public function createSender(string $dsn, array $options): SenderInterface
42-
{
43-
return new AmqpSender($this->encoder, Connection::fromDsn($dsn, $options, $this->debug));
37+
return new AmqpTransport($this->encoder, $this->decoder, $dsn, $options, $thid->debug);
4438
}
4539

4640
public function supports(string $dsn, array $options): bool

Transport/SenderInterface.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@ interface SenderInterface
2323
*
2424
* @param object $message
2525
*/
26-
public function send($message);
26+
public function send($message): void;
2727
}

Transport/Factory/ChainTransportFactory.php renamed to Transport/TransportFactory.php

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,12 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
namespace Symfony\Component\Messenger\Transport\Factory;
13-
14-
use Symfony\Component\Messenger\Transport\ReceiverInterface;
15-
use Symfony\Component\Messenger\Transport\SenderInterface;
12+
namespace Symfony\Component\Messenger\Transport;
1613

1714
/**
1815
* @author Samuel Roze <[email protected]>
1916
*/
20-
class ChainTransportFactory implements TransportFactoryInterface
17+
class TransportFactory implements TransportFactoryInterface
2118
{
2219
private $factories;
2320

@@ -29,22 +26,11 @@ public function __construct(iterable $factories)
2926
$this->factories = $factories;
3027
}
3128

32-
public function createReceiver(string $dsn, array $options): ReceiverInterface
33-
{
34-
foreach ($this->factories as $factory) {
35-
if ($factory->supports($dsn, $options)) {
36-
return $factory->createReceiver($dsn, $options);
37-
}
38-
}
39-
40-
throw new \InvalidArgumentException(sprintf('No transport supports the given DSN "%s".', $dsn));
41-
}
42-
43-
public function createSender(string $dsn, array $options): SenderInterface
29+
public function createTransport(string $dsn, array $options): TransportInterface
4430
{
4531
foreach ($this->factories as $factory) {
4632
if ($factory->supports($dsn, $options)) {
47-
return $factory->createSender($dsn, $options);
33+
return $factory->createTransport($dsn, $options);
4834
}
4935
}
5036

Transport/Factory/TransportFactoryInterface.php renamed to Transport/TransportFactoryInterface.php

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
namespace Symfony\Component\Messenger\Transport\Factory;
13-
14-
use Symfony\Component\Messenger\Transport\ReceiverInterface;
15-
use Symfony\Component\Messenger\Transport\SenderInterface;
12+
namespace Symfony\Component\Messenger\Transport;
1613

1714
/**
1815
* Creates a Messenger transport.

Transport/TransportInterface.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <[email protected]>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport;
13+
14+
/**
15+
* @author Nicolas Grekas <[email protected]>
16+
*
17+
* @experimental in 4.1
18+
*/
19+
interface TransportInterface extends ReceiverInterface, SenderInterface
20+
{
21+
}

0 commit comments

Comments
 (0)