Skip to content

Commit 9a4b2b1

Browse files
committed
Issue #59: dynamic stream flag and readable stream messages
Signed-off-by: bota <[email protected]>
1 parent a9578f7 commit 9a4b2b1

File tree

3 files changed

+150
-21
lines changed

3 files changed

+150
-21
lines changed

config/autoload/messenger.local.php.dist

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
88
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface as SymfonySerializer;
99

1010
return [
11-
"symfony" => [
12-
"messenger" => [
11+
'symfony' => [
12+
'messenger' => [
1313
'transports' => [
1414
'redis_transport' => [
1515
'dsn' => 'redis://127.0.0.1:6379/messages',
@@ -31,10 +31,10 @@ return [
3131
'failure_transport' => 'failed',
3232
],
3333
],
34-
"dependencies" => [
35-
"factories" => [
36-
"redis_transport" => [TransportFactory::class, 'redis_transport'],
37-
"failed" => [TransportFactory::class, 'failed'],
34+
'dependencies' => [
35+
'factories' => [
36+
'redis_transport' => [TransportFactory::class, 'redis_transport'],
37+
'failed' => [TransportFactory::class, 'failed'],
3838
SymfonySerializer::class => fn(ContainerInterface $container) => new PhpSerializer(),
3939
],
4040
],

src/Swoole/Command/GetQueuedMessagesCommand.php

Lines changed: 83 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,28 @@
77
use Dot\DependencyInjection\Attribute\Inject;
88
use Redis;
99
use RedisException;
10+
use ReflectionClass;
1011
use Symfony\Component\Console\Attribute\AsCommand;
1112
use Symfony\Component\Console\Command\Command;
1213
use Symfony\Component\Console\Input\InputInterface;
14+
use Symfony\Component\Console\Input\InputOption;
1315
use Symfony\Component\Console\Output\OutputInterface;
16+
use Symfony\Component\Messenger\Envelope;
17+
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
1418

1519
use function count;
20+
use function is_array;
21+
use function is_string;
22+
use function json_decode;
1623
use function json_encode;
24+
use function method_exists;
25+
use function preg_match;
1726
use function str_repeat;
27+
use function stripcslashes;
28+
use function unserialize;
1829

1930
use const JSON_PRETTY_PRINT;
31+
use const JSON_UNESCAPED_SLASHES;
2032
use const JSON_UNESCAPED_UNICODE;
2133

2234
#[AsCommand(
@@ -25,9 +37,7 @@
2537
)]
2638
class GetQueuedMessagesCommand extends Command
2739
{
28-
/** @var string $defaultName */
29-
protected static $defaultName = 'inventory';
30-
40+
protected static string $defaultName = 'inventory';
3141
private Redis $redis;
3242

3343
#[Inject('redis')]
@@ -39,29 +49,93 @@ public function __construct(Redis $redis)
3949

4050
protected function configure(): void
4151
{
42-
$this->setDescription('Get all queued messages from Redis stream "messages"');
52+
$this->setDescription('Get all queued messages from Redis stream "messages"')
53+
->addOption('stream', null, InputOption::VALUE_REQUIRED, 'stream name');
4354
}
4455

4556
/**
4657
* @throws RedisException
4758
*/
4859
protected function execute(InputInterface $input, OutputInterface $output): int
4960
{
50-
$entries = $this->redis->xRange('messages', '-', '+');
61+
$streamName = (string) $input->getOption('stream');
62+
$entries = $this->redis->xRange($streamName, '-', '+');
5163

5264
if (empty($entries)) {
53-
$output->writeln('<info>No messages queued found in Redis stream "messages".</info>');
65+
$output->writeln("<info>No messages queued found in Redis stream $streamName</info>");
5466
return Command::SUCCESS;
5567
}
5668

5769
foreach ($entries as $id => $entry) {
58-
$output->writeln("<info>Message ID:</info> $id");
59-
$output->writeln(json_encode($entry, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE));
70+
$output->writeln("<comment>Message ID:</comment> $id");
71+
72+
foreach ($entry as $field => $value) {
73+
$raw = is_string($value) ? $value : (string) $value;
74+
75+
if (preg_match('/^s:\d+:\".*\";$/s', $raw)) {
76+
$raw = @unserialize($raw, ['allowed_classes' => true]);
77+
}
78+
79+
$json = json_decode((string) $raw, true);
80+
81+
if (is_array($json) && isset($json['body'])) {
82+
$body = stripcslashes($json['body']);
83+
$envelope = @unserialize($body, ['allowed_classes' => true]);
84+
85+
if ($envelope instanceof Envelope) {
86+
$message = $envelope->getMessage();
87+
$output->writeln(" <info>$field</info>:");
88+
$output->writeln(" <comment>Message Class</comment>: " . $message::class);
89+
90+
$payload = null;
91+
if (method_exists($message, 'getPayload')) {
92+
$payload = $message->getPayload();
93+
} else {
94+
try {
95+
$refClass = new ReflectionClass($message);
96+
if ($refClass->hasProperty('payload')) {
97+
$prop = $refClass->getProperty('payload');
98+
$payload = $prop->getValue($message);
99+
}
100+
} catch (\Throwable $e) {
101+
$payload = '[unavailable: ' . $e->getMessage() . ']';
102+
}
103+
}
104+
105+
if ($payload !== null) {
106+
$output->writeln(" <comment>Payload</comment>:");
107+
$output->writeln(
108+
json_encode(
109+
$payload,
110+
JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES
111+
)
112+
);
113+
}
114+
115+
$redeliveryStamps = $envelope->all(RedeliveryStamp::class);
116+
if (! empty($redeliveryStamps)) {
117+
$output->writeln(" <comment>Timestamps</comment>:");
118+
foreach ($redeliveryStamps as $stamp) {
119+
if ($stamp instanceof RedeliveryStamp) {
120+
$output->writeln(" - "
121+
. $stamp->getRedeliveredAt()->format('Y-m-d H:i:s')
122+
. " (retry count: " . $stamp->getRetryCount() . ")");
123+
}
124+
}
125+
}
126+
} else {
127+
$output->writeln(" <info>$field</info>: failed to unserialize envelope");
128+
}
129+
} else {
130+
$output->writeln(" <info>$field</info>: $raw");
131+
}
132+
}
133+
60134
$output->writeln(str_repeat('-', 40));
61135
}
62136

63137
$total = count($entries);
64-
$output->writeln("<info>Total queued messages in stream 'messages':</info> $total");
138+
$output->writeln("<info>Total queued messages in stream $streamName:</info> $total");
65139
$output->writeln(str_repeat('-', 40));
66140

67141
return Command::SUCCESS;

test/Swoole/Command/GetQueuedMessagesCommandTest.php

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,22 @@
77
use PHPUnit\Framework\MockObject\Exception;
88
use PHPUnit\Framework\MockObject\MockObject;
99
use PHPUnit\Framework\TestCase;
10+
use Queue\App\Message\Message;
1011
use Queue\Swoole\Command\GetQueuedMessagesCommand;
1112
use Redis;
1213
use RedisException;
1314
use Symfony\Component\Console\Command\Command;
1415
use Symfony\Component\Console\Exception\ExceptionInterface;
1516
use Symfony\Component\Console\Input\ArrayInput;
1617
use Symfony\Component\Console\Output\BufferedOutput;
18+
use Symfony\Component\Messenger\Envelope;
19+
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
1720

21+
use function addslashes;
1822
use function array_keys;
1923
use function count;
24+
use function json_encode;
25+
use function serialize;
2026

2127
class GetQueuedMessagesCommandTest extends TestCase
2228
{
@@ -42,7 +48,7 @@ public function testExecuteWithNoMessages(): void
4248
->willReturn([]);
4349

4450
$command = new GetQueuedMessagesCommand($this->redisMock);
45-
$input = new ArrayInput([]);
51+
$input = new ArrayInput(['--stream' => 'messages']);
4652
$output = new BufferedOutput();
4753

4854
$exitCode = $command->run($input, $output);
@@ -55,11 +61,11 @@ public function testExecuteWithNoMessages(): void
5561
/**
5662
* @throws ExceptionInterface
5763
*/
58-
public function testExecuteWithMessages(): void
64+
public function testExecuteWithSimpleMessages(): void
5965
{
6066
$fakeMessages = [
61-
'1691000000000-0' => ['type' => 'testEmail', 'payload' => '{"to":"test@dotkernel.com"}'],
62-
'1691000000001-0' => ['type' => 'testSms', 'payload' => '{"to":"+123456789"}'],
67+
'1691000000000-0' => ['type' => 'testEmail', 'payload' => '{"to":"test@example.com"}'],
68+
'1691000000001-0' => ['type' => 'testEmail2', 'payload' => '{"to":"[email protected]"}'],
6369
];
6470

6571
$this->redisMock
@@ -69,7 +75,7 @@ public function testExecuteWithMessages(): void
6975
->willReturn($fakeMessages);
7076

7177
$command = new GetQueuedMessagesCommand($this->redisMock);
72-
$input = new ArrayInput([]);
78+
$input = new ArrayInput(['--stream' => 'messages']);
7379
$output = new BufferedOutput();
7480

7581
$exitCode = $command->run($input, $output);
@@ -97,10 +103,59 @@ public function testRedisThrowsException(): void
97103
->willThrowException(new RedisException('Redis unavailable'));
98104

99105
$command = new GetQueuedMessagesCommand($this->redisMock);
100-
$input = new ArrayInput([]);
106+
$input = new ArrayInput(['--stream' => 'messages']);
101107
$output = new BufferedOutput();
102108

103109
$this->expectException(RedisException::class);
104110
$command->run($input, $output);
105111
}
112+
113+
public function testExecuteWithInvalidBody(): void
114+
{
115+
$invalidBodyJson = json_encode(['body' => 'not-a-serialized-envelope']);
116+
117+
$this->redisMock
118+
->expects($this->once())
119+
->method('xRange')
120+
->willReturn([
121+
'2-0' => ['body' => $invalidBodyJson],
122+
]);
123+
124+
$command = new GetQueuedMessagesCommand($this->redisMock);
125+
$input = new ArrayInput(['--stream' => 'messages']);
126+
$output = new BufferedOutput();
127+
128+
$exitCode = $command->run($input, $output);
129+
$outputText = $output->fetch();
130+
131+
$this->assertEquals(Command::SUCCESS, $exitCode);
132+
$this->assertStringContainsString('failed to unserialize envelope', $outputText);
133+
}
134+
135+
public function testExecuteWithValidEnvelope(): void
136+
{
137+
$message = new Message(['foo' => 'bar']);
138+
$envelope = new Envelope($message, [new RedeliveryStamp(1)]);
139+
$serializedEnvelope = serialize($envelope);
140+
$jsonBody = json_encode(['body' => addslashes($serializedEnvelope)]);
141+
142+
$this->redisMock
143+
->expects($this->once())
144+
->method('xRange')
145+
->willReturn([
146+
'100-0' => ['body' => $jsonBody],
147+
]);
148+
149+
$command = new GetQueuedMessagesCommand($this->redisMock);
150+
$input = new ArrayInput(['--stream' => 'messages']);
151+
$output = new BufferedOutput();
152+
153+
$exitCode = $command->run($input, $output);
154+
$outputText = $output->fetch();
155+
156+
$this->assertEquals(Command::SUCCESS, $exitCode);
157+
$this->assertStringContainsString('Message Class', $outputText);
158+
$this->assertStringContainsString('Payload', $outputText);
159+
$this->assertStringContainsString('Timestamps', $outputText);
160+
}
106161
}

0 commit comments

Comments
 (0)