Skip to content

Commit a8870e4

Browse files
committed
manage logs for processed and failed messages
Signed-off-by: bota <[email protected]>
1 parent b042747 commit a8870e4

File tree

6 files changed

+254
-9
lines changed

6 files changed

+254
-9
lines changed

config/autoload/cli.global.php

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
declare(strict_types=1);
44

55
use Dot\Cli\FileLockerInterface;
6+
use Queue\Swoole\Command\GetFailedMessagesCommand;
7+
use Queue\Swoole\Command\GetProcessedMessagesCommand;
68
use Queue\Swoole\Command\StartCommand;
79
use Queue\Swoole\Command\StopCommand;
810
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
@@ -13,10 +15,12 @@
1315
'version' => '1.0.0',
1416
'name' => 'DotKernel CLI',
1517
'commands' => [
16-
"swoole:start" => StartCommand::class,
17-
"swoole:stop" => StopCommand::class,
18-
"messenger:start" => ConsumeMessagesCommand::class,
19-
"messenger:debug" => DebugCommand::class,
18+
"swoole:start" => StartCommand::class,
19+
"swoole:stop" => StopCommand::class,
20+
"messenger:start" => ConsumeMessagesCommand::class,
21+
"messenger:debug" => DebugCommand::class,
22+
"messenger:processed" => GetProcessedMessagesCommand::class,
23+
"messenger:failed" => GetFailedMessagesCommand::class,
2024
],
2125
],
2226
FileLockerInterface::class => [

config/autoload/local.php.dist

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,9 @@ return [
4646
'eof' => "\n",
4747
],
4848
],
49+
'fail-safe' => [
50+
'first_retry' => 3600000,
51+
'second_retry' => 43200000,
52+
'third_retry' => 86400000,
53+
],
4954
];

src/App/Message/ExampleMessageHandler.php

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,62 @@
66

77
use Dot\DependencyInjection\Attribute\Inject;
88
use Dot\Log\Logger;
9+
use Symfony\Component\Messenger\Exception\ExceptionInterface;
10+
use Symfony\Component\Messenger\MessageBusInterface;
11+
use Symfony\Component\Messenger\Stamp\DelayStamp;
912

1013
class ExampleMessageHandler
1114
{
1215
#[Inject(
16+
MessageBusInterface::class,
1317
'dot-log.queue-log',
1418
'config',
1519
)]
1620
public function __construct(
21+
protected MessageBusInterface $bus,
1722
protected Logger $logger,
1823
protected array $config,
1924
) {
2025
}
2126

2227
public function __invoke(ExampleMessage $message): void
2328
{
24-
$this->logger->info("message: " . $message->getPayload()['foo']);
29+
try {
30+
// Throwing an exception to satisfy PHPStan (replace with own code)
31+
throw new \Exception("Failed to execute");
32+
} catch (\Throwable $exception) {
33+
$payload = $message->getPayload();
34+
$this->logger->error($payload['foo'] . ' failed with message: '
35+
. $exception->getMessage() . ' after ' . ($payload['retry'] ?? 0) . ' retries');
36+
$this->retry($payload);
37+
}
38+
}
39+
40+
/**
41+
* @throws ExceptionInterface
42+
*/
43+
public function retry(array $payload): void
44+
{
45+
if (! isset($payload['retry'])) {
46+
$this->bus->dispatch(new ExampleMessage(["foo" => $payload['foo'], 'retry' => 1]), [
47+
new DelayStamp($this->config['fail-safe']['first_retry']),
48+
]);
49+
} else {
50+
$retry = $payload['retry'];
51+
switch ($retry) {
52+
case 1:
53+
$delay = $this->config['fail-safe']['second_retry'];
54+
$this->bus->dispatch(new ExampleMessage(["foo" => $payload['foo'], 'retry' => ++$retry]), [
55+
new DelayStamp($delay),
56+
]);
57+
break;
58+
case 2:
59+
$delay = $this->config['fail-safe']['third_retry'];
60+
$this->bus->dispatch(new ExampleMessage(["foo" => $payload['foo'], 'retry' => ++$retry]), [
61+
new DelayStamp($delay),
62+
]);
63+
break;
64+
}
65+
}
2566
}
2667
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Queue\Swoole\Command;
6+
7+
use Dot\DependencyInjection\Attribute\Inject;
8+
use Symfony\Component\Console\Attribute\AsCommand;
9+
use Symfony\Component\Console\Command\Command;
10+
use Symfony\Component\Console\Input\InputInterface;
11+
use Symfony\Component\Console\Input\InputOption;
12+
use Symfony\Component\Console\Output\OutputInterface;
13+
14+
use function date;
15+
use function file;
16+
use function is_numeric;
17+
use function json_decode;
18+
use function preg_match;
19+
use function strtolower;
20+
use function strtotime;
21+
22+
use const FILE_IGNORE_NEW_LINES;
23+
use const FILE_SKIP_EMPTY_LINES;
24+
25+
#[AsCommand(
26+
name: 'failed',
27+
description: 'Get processing failure messages.',
28+
)]
29+
class GetFailedMessagesCommand extends Command
30+
{
31+
protected static string $defaultName = 'failed';
32+
33+
#[Inject()]
34+
public function __construct()
35+
{
36+
parent::__construct(self::$defaultName);
37+
}
38+
39+
protected function configure(): void
40+
{
41+
$this->setDescription('Get processing failure messages.')
42+
->addOption('start', null, InputOption::VALUE_OPTIONAL, 'Start timestamp (Y-m-d H:i:s)')
43+
->addOption('end', null, InputOption::VALUE_OPTIONAL, 'End timestamp (Y-m-d H:i:s)')
44+
->addOption('limit', null, InputOption::VALUE_OPTIONAL, 'Limit in days');
45+
}
46+
47+
protected function execute(InputInterface $input, OutputInterface $output): int
48+
{
49+
$start = $input->getOption('start');
50+
$end = $input->getOption('end');
51+
$limit = $input->getOption('limit');
52+
53+
if (! $end) {
54+
$end = date('Y-m-d H:i:s');
55+
} elseif (! preg_match('/\d{2}:\d{2}:\d{2}/', $end)) {
56+
$end .= ' 23:59:59';
57+
}
58+
59+
if ($limit && is_numeric($limit) && ! $start) {
60+
$start = date('Y-m-d H:i:s', strtotime("-{$limit} days", strtotime($end)));
61+
} elseif ($start && ! preg_match('/\d{2}:\d{2}:\d{2}/', $start)) {
62+
$start .= ' 00:00:00';
63+
}
64+
65+
$startTimestamp = $start ? strtotime($start) : null;
66+
$endTimestamp = $end ? strtotime($end) : null;
67+
68+
$logPath = 'log/queue-log.log';
69+
$lines = file($logPath, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
70+
71+
foreach ($lines as $line) {
72+
$entry = json_decode($line, true);
73+
74+
if (! $entry || ! isset($entry['levelName'], $entry['timestamp'])) {
75+
continue;
76+
}
77+
78+
if (strtolower($entry['levelName']) !== 'error') {
79+
continue;
80+
}
81+
82+
$logTimestamp = strtotime($entry['timestamp']);
83+
if (
84+
($startTimestamp && $logTimestamp < $startTimestamp) ||
85+
($endTimestamp && $logTimestamp > $endTimestamp)
86+
) {
87+
continue;
88+
}
89+
90+
$output->writeln($line);
91+
}
92+
93+
return Command::SUCCESS;
94+
}
95+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Queue\Swoole\Command;
6+
7+
use Dot\DependencyInjection\Attribute\Inject;
8+
use Symfony\Component\Console\Attribute\AsCommand;
9+
use Symfony\Component\Console\Command\Command;
10+
use Symfony\Component\Console\Input\InputInterface;
11+
use Symfony\Component\Console\Input\InputOption;
12+
use Symfony\Component\Console\Output\OutputInterface;
13+
14+
use function date;
15+
use function file;
16+
use function is_numeric;
17+
use function json_decode;
18+
use function preg_match;
19+
use function strtolower;
20+
use function strtotime;
21+
22+
use const FILE_IGNORE_NEW_LINES;
23+
use const FILE_SKIP_EMPTY_LINES;
24+
25+
#[AsCommand(
26+
name: 'processed',
27+
description: 'Get successfully processed messages',
28+
)]
29+
class GetProcessedMessagesCommand extends Command
30+
{
31+
protected static string $defaultName = 'processed';
32+
33+
#[Inject()]
34+
public function __construct()
35+
{
36+
parent::__construct(self::$defaultName);
37+
}
38+
39+
protected function configure(): void
40+
{
41+
$this->setDescription('Get successfully processed messages')
42+
->addOption('start', null, InputOption::VALUE_OPTIONAL, 'Start timestamp (Y-m-d H:i:s)')
43+
->addOption('end', null, InputOption::VALUE_OPTIONAL, 'End timestamp (Y-m-d H:i:s)')
44+
->addOption('limit', null, InputOption::VALUE_OPTIONAL, 'Limit in days');
45+
}
46+
47+
protected function execute(InputInterface $input, OutputInterface $output): int
48+
{
49+
$start = $input->getOption('start');
50+
$end = $input->getOption('end');
51+
$limit = $input->getOption('limit');
52+
53+
if (! $end) {
54+
$end = date('Y-m-d H:i:s');
55+
} elseif (! preg_match('/\d{2}:\d{2}:\d{2}/', $end)) {
56+
$end .= ' 23:59:59';
57+
}
58+
59+
if ($limit && is_numeric($limit) && ! $start) {
60+
$start = date('Y-m-d H:i:s', strtotime("-{$limit} days", strtotime($end)));
61+
} elseif ($start && ! preg_match('/\d{2}:\d{2}:\d{2}/', $start)) {
62+
$start .= ' 00:00:00';
63+
}
64+
65+
$startTimestamp = $start ? strtotime($start) : null;
66+
$endTimestamp = $end ? strtotime($end) : null;
67+
68+
$logPath = 'log/queue-log.log';
69+
$lines = file($logPath, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
70+
71+
foreach ($lines as $line) {
72+
$entry = json_decode($line, true);
73+
74+
if (! $entry || ! isset($entry['levelName'], $entry['timestamp'])) {
75+
continue;
76+
}
77+
78+
if (strtolower($entry['levelName']) !== 'info') {
79+
continue;
80+
}
81+
82+
$logTimestamp = strtotime($entry['timestamp']);
83+
if (
84+
($startTimestamp && $logTimestamp < $startTimestamp) ||
85+
($endTimestamp && $logTimestamp > $endTimestamp)
86+
) {
87+
continue;
88+
}
89+
90+
$output->writeln($line);
91+
}
92+
93+
return Command::SUCCESS;
94+
}
95+
}

src/Swoole/ConfigProvider.php

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44

55
namespace Queue\Swoole;
66

7+
use Dot\DependencyInjection\Factory\AttributedServiceFactory;
78
use Queue\Swoole\Command\Factory\StartCommandFactory;
89
use Queue\Swoole\Command\Factory\StopCommandFactory;
10+
use Queue\Swoole\Command\GetFailedMessagesCommand;
11+
use Queue\Swoole\Command\GetProcessedMessagesCommand;
912
use Queue\Swoole\Command\StartCommand;
1013
use Queue\Swoole\Command\StopCommand;
1114
use Queue\Swoole\Delegators\TCPServerDelegator;
@@ -27,10 +30,12 @@ public function getDependencies(): array
2730
TCPSwooleServer::class => [TCPServerDelegator::class],
2831
],
2932
"factories" => [
30-
TCPSwooleServer::class => ServerFactory::class,
31-
PidManager::class => PidManagerFactory::class,
32-
StartCommand::class => StartCommandFactory::class,
33-
StopCommand::class => StopCommandFactory::class,
33+
TCPSwooleServer::class => ServerFactory::class,
34+
PidManager::class => PidManagerFactory::class,
35+
StartCommand::class => StartCommandFactory::class,
36+
StopCommand::class => StopCommandFactory::class,
37+
GetProcessedMessagesCommand::class => AttributedServiceFactory::class,
38+
GetFailedMessagesCommand::class => AttributedServiceFactory::class,
3439
],
3540
"aliases" => [],
3641
];

0 commit comments

Comments
 (0)