Skip to content

Commit 08b890e

Browse files
authored
Merge pull request #32 from dotkernel/tcp-commands
run commands using TCP messages
2 parents f2bcab4 + 9d2f9fc commit 08b890e

File tree

1 file changed

+62
-13
lines changed

1 file changed

+62
-13
lines changed

src/Swoole/Delegators/TCPServerDelegator.php

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,22 @@
66

77
use Psr\Container\ContainerInterface;
88
use Queue\App\Message\ExampleMessage;
9+
use Queue\Swoole\Command\GetFailedMessagesCommand;
10+
use Queue\Swoole\Command\GetProcessedMessagesCommand;
911
use Swoole\Server as TCPSwooleServer;
12+
use Symfony\Component\Console\Application;
13+
use Symfony\Component\Console\Input\ArrayInput;
14+
use Symfony\Component\Console\Output\BufferedOutput;
1015
use Symfony\Component\Messenger\MessageBusInterface;
1116
use Symfony\Component\Messenger\Stamp\DelayStamp;
1217

18+
use function array_merge;
19+
use function array_shift;
20+
use function explode;
21+
use function ltrim;
22+
use function str_starts_with;
23+
use function trim;
24+
1325
class TCPServerDelegator
1426
{
1527
public function __invoke(ContainerInterface $container, string $serviceName, callable $callback): TCPSwooleServer
@@ -22,25 +34,62 @@ public function __invoke(ContainerInterface $container, string $serviceName, cal
2234

2335
$logger = $container->get("dot-log.queue-log");
2436

37+
$commandMap = [
38+
'processed' => GetProcessedMessagesCommand::class,
39+
'failed' => GetFailedMessagesCommand::class,
40+
];
41+
2542
$server->on('Connect', function ($server, $fd) {
2643
echo "Client: Connect.\n";
2744
});
2845

29-
// Register the function for the event `receive`
30-
$server->on('receive', function ($server, $fd, $fromId, $data) use ($logger, $bus) {
31-
$bus->dispatch(new ExampleMessage(["foo" => $data]));
32-
$bus->dispatch(new ExampleMessage(["foo" => "with 5 seconds delay"]), [
33-
new DelayStamp(5000),
34-
]);
35-
36-
$server->send($fd, "Server: {$data}");
37-
$logger->notice("Request received on receive", [
38-
'fd' => $fd,
39-
'from_id' => $fromId,
40-
]);
46+
$server->on('receive', function ($server, $fd, $fromId, $data) use ($logger, $bus, $commandMap, $container) {
47+
$message = trim($data);
48+
$response = '';
49+
50+
$args = explode(' ', $message);
51+
$commandName = array_shift($args);
52+
53+
if (isset($commandMap[$commandName])) {
54+
$commandClass = $commandMap[$commandName];
55+
$application = new Application();
56+
$commandInstance = $container->get($commandClass);
57+
$application->add($commandInstance);
58+
59+
$parsedOptions = [];
60+
foreach ($args as $arg) {
61+
if (str_starts_with($arg, '--')) {
62+
[$key, $value] = explode('=', ltrim($arg, '-'), 2) + [null, null];
63+
$parsedOptions["--$key"] = $value;
64+
}
65+
}
66+
67+
$inputData = array_merge(['command' => $commandName], $parsedOptions);
68+
$input = new ArrayInput($inputData);
69+
$output = new BufferedOutput();
70+
71+
try {
72+
$application->setAutoExit(false);
73+
$application->run($input, $output);
74+
$response = $output->fetch();
75+
$server->send($fd, $response);
76+
} catch (\Throwable $e) {
77+
$logger->error("Error running command: " . $e->getMessage());
78+
}
79+
} else {
80+
$bus->dispatch(new ExampleMessage(["foo" => $data]));
81+
$bus->dispatch(new ExampleMessage(["foo" => "with 5 seconds delay"]), [
82+
new DelayStamp(5000),
83+
]);
84+
85+
$logger->notice("TCP request received", [
86+
'fd' => $fd,
87+
'from_id' => $fromId,
88+
'data' => $data,
89+
]);
90+
}
4191
});
4292

43-
// Listen for the 'Close' event.
4493
$server->on('Close', function ($server, $fd) {
4594
echo "Client: Close.\n";
4695
});

0 commit comments

Comments
 (0)