Skip to content

Commit 7ebf973

Browse files
committed
command to get all queued messages in messages stream
Signed-off-by: bota <[email protected]>
1 parent aa24e75 commit 7ebf973

File tree

6 files changed

+84
-4
lines changed

6 files changed

+84
-4
lines changed

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
},
4545
"require": {
4646
"php": "~8.2.0 || ~8.3.0 || ~8.4",
47+
"ext-redis": "*",
4748
"dotkernel/dot-cli": "^3.9",
4849
"dotkernel/dot-dependency-injection": "^1.2",
4950
"dotkernel/dot-errorhandler": "4.2.1",

config/autoload/cli.global.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Dot\Cli\FileLockerInterface;
66
use Queue\Swoole\Command\GetFailedMessagesCommand;
77
use Queue\Swoole\Command\GetProcessedMessagesCommand;
8+
use Queue\Swoole\Command\GetQueuedMessagesCommand;
89
use Queue\Swoole\Command\StartCommand;
910
use Queue\Swoole\Command\StopCommand;
1011
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
@@ -21,6 +22,7 @@
2122
"messenger:debug" => DebugCommand::class,
2223
"processed" => GetProcessedMessagesCommand::class,
2324
"failed" => GetFailedMessagesCommand::class,
25+
"inventory" => GetQueuedMessagesCommand::class,
2426
],
2527
],
2628
FileLockerInterface::class => [

docs/book/v1/commands.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22

33
The commands available are:
44

5-
1. `GetFailedMessagesCommand.php` - returns logs with messages that failed to process (levelName:error)
6-
2. `GetProcessedMessagesCommand.php` - returns logs with messages that were successfully processed (levelName:info)
5+
1. `GetFailedMessagesCommand.php (failed)` - returns logs with messages that failed to process (levelName:error)
6+
2. `GetProcessedMessagesCommand.php (procecssed)` - returns logs with messages that were successfully processed (levelName:info)
7+
3. `GetQueuedMessagesCommand (inventory)` - returns all queued messages from Redis stream 'messages'
78

8-
Both commands are used to extract data that can be filtered by period from the log file. The commands can be run in two different ways:
9+
The commands can be run in two different ways:
910

1011
### CLI
1112

@@ -15,6 +16,8 @@ To run the commands via CLI, use the following syntax:
1516

1617
`php bin/cli.php processed --start="yyyy-mm-dd" --end="yyyy-mm-dd" --limit=int`
1718

19+
`php bin/cli.php inventory`
20+
1821
### TCP message
1922

2023
To use commands using TCP messages the following messages can be used:
@@ -29,4 +32,6 @@ In order to be able to test the `processed` command, by default when processing
2932

3033
`echo "control" | socat -t1 - TCP:host:port`
3134

32-
Using `-t1` flag is not necessary but can be useful, it is used to set a timeout of n seconds for both reading and writing, after n second of inactivity, socat will terminate the connection. If the timeout is not set and the server does not respond or keep the connection open, the socat process could hang indefinitely.
35+
> Using `-t1` flag is not necessary but can be useful, it is used to set a timeout of n seconds for both reading and writing, after n second of inactivity, socat will terminate the connection. If the timeout is not set and the server does not respond or keep the connection open, the socat process could hang indefinitely.
36+
37+
`echo "inventory" | socat -t1 - TCP:host:port`
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Queue\Swoole\Command;
6+
7+
use Dot\DependencyInjection\Attribute\Inject;
8+
use Redis;
9+
use RedisException;
10+
use Symfony\Component\Console\Attribute\AsCommand;
11+
use Symfony\Component\Console\Command\Command;
12+
use Symfony\Component\Console\Input\InputInterface;
13+
use Symfony\Component\Console\Output\OutputInterface;
14+
15+
use function count;
16+
use function json_encode;
17+
use function str_repeat;
18+
19+
use const JSON_PRETTY_PRINT;
20+
use const JSON_UNESCAPED_UNICODE;
21+
22+
#[AsCommand(
23+
name: 'inventory',
24+
description: 'Get all queued messages from Redis stream "messages"',
25+
)]
26+
class GetQueuedMessagesCommand extends Command
27+
{
28+
protected static string $defaultName = 'inventory';
29+
30+
private Redis $redis;
31+
32+
#[Inject('redis')]
33+
public function __construct(Redis $redis)
34+
{
35+
parent::__construct(self::$defaultName);
36+
$this->redis = $redis;
37+
}
38+
39+
protected function configure(): void
40+
{
41+
$this->setDescription('Get all queued messages from Redis stream "messages"');
42+
}
43+
44+
/**
45+
* @throws RedisException
46+
*/
47+
protected function execute(InputInterface $input, OutputInterface $output): int
48+
{
49+
$entries = $this->redis->xRange('messages', '-', '+');
50+
51+
if (empty($entries)) {
52+
$output->writeln('<info>No messages queued found in Redis stream "messages".</info>');
53+
return Command::SUCCESS;
54+
}
55+
56+
foreach ($entries as $id => $entry) {
57+
$output->writeln("<info>Message ID:</info> $id");
58+
$output->writeln(json_encode($entry, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE));
59+
$output->writeln(str_repeat('-', 40));
60+
}
61+
62+
$total = count($entries);
63+
$output->writeln("<info>Total queued messages in stream 'messages':</info> $total");
64+
$output->writeln(str_repeat('-', 40));
65+
66+
return Command::SUCCESS;
67+
}
68+
}

src/Swoole/ConfigProvider.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Queue\Swoole\Command\Factory\StopCommandFactory;
1010
use Queue\Swoole\Command\GetFailedMessagesCommand;
1111
use Queue\Swoole\Command\GetProcessedMessagesCommand;
12+
use Queue\Swoole\Command\GetQueuedMessagesCommand;
1213
use Queue\Swoole\Command\StartCommand;
1314
use Queue\Swoole\Command\StopCommand;
1415
use Queue\Swoole\Delegators\TCPServerDelegator;
@@ -36,6 +37,7 @@ public function getDependencies(): array
3637
StopCommand::class => StopCommandFactory::class,
3738
GetProcessedMessagesCommand::class => AttributedServiceFactory::class,
3839
GetFailedMessagesCommand::class => AttributedServiceFactory::class,
40+
GetQueuedMessagesCommand::class => AttributedServiceFactory::class,
3941
],
4042
"aliases" => [],
4143
];

src/Swoole/Delegators/TCPServerDelegator.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use Queue\App\Message\ExampleMessage;
99
use Queue\Swoole\Command\GetFailedMessagesCommand;
1010
use Queue\Swoole\Command\GetProcessedMessagesCommand;
11+
use Queue\Swoole\Command\GetQueuedMessagesCommand;
1112
use Swoole\Server as TCPSwooleServer;
1213
use Symfony\Component\Console\Application;
1314
use Symfony\Component\Console\Input\ArrayInput;
@@ -37,6 +38,7 @@ public function __invoke(ContainerInterface $container, string $serviceName, cal
3738
$commandMap = [
3839
'processed' => GetProcessedMessagesCommand::class,
3940
'failed' => GetFailedMessagesCommand::class,
41+
'inventory' => GetQueuedMessagesCommand::class,
4042
];
4143

4244
$server->on('Connect', function ($server, $fd) {

0 commit comments

Comments
 (0)