1+ <?php
2+
3+ declare (strict_types=1 );
4+
5+ namespace Spiral \RoadRunner \Tests \Http \Server ;
6+
7+ use Fiber ;
8+ use Spiral \Goridge \Frame ;
9+ use Spiral \RoadRunner \Tests \Http \Server \Command \BaseCommand ;
10+ use Spiral \RoadRunner \Tests \Http \Server \Command \StreamStop ;
11+
12+ /**
13+ * Client state on the server side.
14+ */
15+ class Client
16+ {
17+ private \Socket $ socket ;
18+
19+ /** @var string[] */
20+ private array $ writeQueue = [];
21+
22+ /** @var string */
23+ private string $ readBuffer = '' ;
24+
25+ public function __construct (
26+ \Socket $ socket ,
27+ ) {
28+ $ this ->socket = $ socket ;
29+ \socket_set_nonblock ($ this ->socket );
30+ }
31+
32+ public function __destruct ()
33+ {
34+ \socket_close ($ this ->socket );
35+ }
36+
37+ public static function init (\Socket $ socket ): self
38+ {
39+ return new self ($ socket );
40+ }
41+
42+ public function process (): void
43+ {
44+ $ this ->onInit ();
45+
46+ do {
47+ $ read = [$ this ->socket ];
48+ $ write = [$ this ->socket ];
49+ $ except = [$ this ->socket ];
50+ if (\socket_select ($ read , $ write , $ except , 0 , 0 ) === false ) {
51+ throw new \RuntimeException ('Socket select failed. ' );
52+ }
53+
54+ if ($ read !== []) {
55+ $ this ->readMessage ();
56+ }
57+
58+ if ($ write !== [] && $ this ->writeQueue !== []) {
59+ $ this ->writeQueue ();
60+ }
61+
62+ Fiber::suspend ();
63+ } while (true );
64+ }
65+
66+ private function onInit ()
67+ {
68+ $ this ->writeQueue [] = Frame::packFrame (new Frame ('{"pid":true} ' , [], Frame::CONTROL ));
69+ }
70+
71+ private function onFrame (Frame $ frame ): void
72+ {
73+ $ command = $ this ->getCommand ($ frame );
74+
75+ if ($ command === null ) {
76+ echo \substr ($ frame ->payload , $ frame ->options [0 ]) . "\n" ;
77+ return ;
78+ }
79+
80+ $ this ->onCommand ($ command );
81+ }
82+
83+ private function writeQueue (): void
84+ {
85+ foreach ($ this ->writeQueue as $ data ) {
86+ \socket_write ($ this ->socket , $ data );
87+ }
88+ socket_set_nonblock ($ this ->socket );
89+
90+ $ this ->writeQueue = [];
91+ }
92+
93+ /**
94+ * @see \Spiral\Goridge\SocketRelay::waitFrame()
95+ */
96+ private function readMessage (): void
97+ {
98+ $ header = $ this ->readNBytes (12 );
99+
100+ $ parts = Frame::readHeader ($ header );
101+ // total payload length
102+ $ length = $ parts [1 ] * 4 + $ parts [2 ];
103+
104+ if ($ length >= 8 * 1024 * 1024 ) {
105+ throw new \RuntimeException ('Frame payload is too large. ' );
106+ }
107+ $ payload = $ this ->readNBytes ($ length );
108+
109+ $ frame = Frame::initFrame ($ parts , $ payload );
110+
111+ $ this ->onFrame ($ frame );
112+ }
113+
114+ /**
115+ * @param positive-int $bytes
116+ *
117+ * @return non-empty-string
118+ */
119+ private function readNBytes (int $ bytes , bool $ canBeLess = false ): string
120+ {
121+ while (($ left = $ bytes - \strlen ($ this ->readBuffer )) > 0 ) {
122+ $ data = @\socket_read ($ this ->socket , $ left , \PHP_BINARY_READ );
123+ if ($ data === false ) {
124+ $ errNo = \socket_last_error ($ this ->socket );
125+ throw new \RuntimeException ('Socket read failed [ ' . $ errNo . ']: ' . \socket_strerror ($ errNo ));
126+ }
127+
128+ if ($ canBeLess ) {
129+ return $ data ;
130+ }
131+
132+ if ($ data === '' ) {
133+ Fiber::suspend ();
134+ continue ;
135+ }
136+
137+ $ this ->readBuffer .= $ data ;
138+ }
139+
140+ $ result = \substr ($ this ->readBuffer , 0 , $ bytes );
141+ $ this ->readBuffer = \substr ($ this ->readBuffer , $ bytes );
142+
143+ return $ result ;
144+ }
145+
146+ private function getCommand (Frame $ frame ): ?BaseCommand
147+ {
148+ $ payload = $ frame ->payload ;
149+ try {
150+ $ data = \json_decode ($ payload , true , 3 , \JSON_THROW_ON_ERROR );
151+ } catch (\JsonException ) {
152+ return null ;
153+ }
154+
155+ return match (false ) {
156+ \is_array ($ data ),
157+ \array_key_exists (BaseCommand::COMMAND_KEY , $ data ),
158+ \is_string ($ data [BaseCommand::COMMAND_KEY ]),
159+ \class_exists ($ data [BaseCommand::COMMAND_KEY ]),
160+ \is_a ($ data [BaseCommand::COMMAND_KEY ], BaseCommand::class, true ) => null ,
161+ default => new ($ data [BaseCommand::COMMAND_KEY ])(),
162+ };
163+ }
164+
165+ private function onCommand (BaseCommand $ command ): void
166+ {
167+ switch ($ command ::class) {
168+ case StreamStop::class:
169+ $ this ->writeQueue [] = $ command ->getResponse ();
170+ break ;
171+ }
172+ }
173+ }
0 commit comments