Skip to content

Commit dc7d42d

Browse files
committed
run commands using tcp messages
Signed-off-by: bota <[email protected]>
1 parent f2bcab4 commit dc7d42d

File tree

1 file changed

+63
-13
lines changed

1 file changed

+63
-13
lines changed

src/Swoole/Delegators/TCPServerDelegator.php

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,23 @@
66

77
use Psr\Container\ContainerInterface;
88
use Queue\App\Message\ExampleMessage;
9+
use Queue\Swoole\Command\GetFailedMessagesCommand;
10+
// Import your commands
11+
use Queue\Swoole\Command\GetProcessedMessagesCommand;
912
use Swoole\Server as TCPSwooleServer;
13+
use Symfony\Component\Console\Application;
14+
use Symfony\Component\Console\Input\ArrayInput;
15+
use Symfony\Component\Console\Output\BufferedOutput;
1016
use Symfony\Component\Messenger\MessageBusInterface;
1117
use Symfony\Component\Messenger\Stamp\DelayStamp;
1218

19+
use function array_merge;
20+
use function array_shift;
21+
use function explode;
22+
use function ltrim;
23+
use function str_starts_with;
24+
use function trim;
25+
1326
class TCPServerDelegator
1427
{
1528
public function __invoke(ContainerInterface $container, string $serviceName, callable $callback): TCPSwooleServer
@@ -22,25 +35,62 @@ public function __invoke(ContainerInterface $container, string $serviceName, cal
2235

2336
$logger = $container->get("dot-log.queue-log");
2437

38+
$commandMap = [
39+
'processed' => GetProcessedMessagesCommand::class,
40+
'failed' => GetFailedMessagesCommand::class,
41+
];
42+
2543
$server->on('Connect', function ($server, $fd) {
2644
echo "Client: Connect.\n";
2745
});
2846

29-
// Register the function for the event `receive`
30-
$server->on('receive', function ($server, $fd, $fromId, $data) use ($logger, $bus) {
31-
$bus->dispatch(new ExampleMessage(["foo" => $data]));
32-
$bus->dispatch(new ExampleMessage(["foo" => "with 5 seconds delay"]), [
33-
new DelayStamp(5000),
34-
]);
35-
36-
$server->send($fd, "Server: {$data}");
37-
$logger->notice("Request received on receive", [
38-
'fd' => $fd,
39-
'from_id' => $fromId,
40-
]);
47+
$server->on('receive', function ($server, $fd, $fromId, $data) use ($logger, $bus, $commandMap, $container) {
48+
$message = trim($data);
49+
$response = '';
50+
51+
$args = explode(' ', $message);
52+
$commandName = array_shift($args);
53+
54+
if (isset($commandMap[$commandName])) {
55+
$commandClass = $commandMap[$commandName];
56+
$application = new Application();
57+
$commandInstance = $container->get($commandClass);
58+
$application->add($commandInstance);
59+
60+
$parsedOptions = [];
61+
foreach ($args as $arg) {
62+
if (str_starts_with($arg, '--')) {
63+
[$key, $value] = explode('=', ltrim($arg, '-'), 2) + [null, null];
64+
$parsedOptions["--$key"] = $value;
65+
}
66+
}
67+
68+
$inputData = array_merge(['command' => $commandName], $parsedOptions);
69+
$input = new ArrayInput($inputData);
70+
$output = new BufferedOutput();
71+
72+
try {
73+
$application->setAutoExit(false);
74+
$application->run($input, $output);
75+
$response = $output->fetch();
76+
$server->send($fd, $response);
77+
} catch (\Throwable $e) {
78+
$logger->error("Error running command: " . $e->getMessage());
79+
}
80+
} else {
81+
$bus->dispatch(new ExampleMessage(["foo" => $data]));
82+
$bus->dispatch(new ExampleMessage(["foo" => "with 5 seconds delay"]), [
83+
new DelayStamp(5000),
84+
]);
85+
86+
$logger->notice("TCP request received", [
87+
'fd' => $fd,
88+
'from_id' => $fromId,
89+
'data' => $data,
90+
]);
91+
}
4192
});
4293

43-
// Listen for the 'Close' event.
4494
$server->on('Close', function ($server, $fd) {
4595
echo "Client: Close.\n";
4696
});

0 commit comments

Comments
 (0)