3939use Ripple \Socket ;
4040use Ripple \Stream ;
4141use Ripple \Stream \Exception \ConnectionException ;
42+ use Ripple \Tunnel \Http ;
43+ use Ripple \Tunnel \Socks5 ;
4244use Ripple \Utils \Output ;
4345use Ripple \WebSocket \Utils ;
4446use RuntimeException ;
4850use function base64_encode ;
4951use function call_user_func ;
5052use function chr ;
51- use function Co \async ;
53+ use function Co \go ;
5254use function Co \promise ;
5355use function explode ;
5456use function http_build_query ;
6668use function trim ;
6769use function ucwords ;
6870use function unpack ;
71+ use function getenv ;
72+ use function in_array ;
73+ use function is_array ;
6974
7075use const PHP_URL_HOST ;
7176use const PHP_URL_PATH ;
@@ -113,12 +118,12 @@ class Client
113118 /**
114119 * @param \Symfony\Component\HttpFoundation\Request|string $request
115120 * @param int|float $timeout
116- * @param mixed|null $context
121+ * @param array $options
117122 */
118123 public function __construct (
119124 Request |string $ request ,
120125 protected readonly int |float $ timeout = 10 ,
121- protected readonly mixed $ context = null
126+ protected readonly array $ options = []
122127 ) {
123128 if ($ request instanceof Request) {
124129 $ this ->request = $ request ;
@@ -134,20 +139,28 @@ public function __construct(
134139 default => throw new RuntimeException ('Unsupported scheme ' ),
135140 };
136141
142+ $ requestServer = [
143+ 'REQUEST_METHOD ' => 'GET ' ,
144+ 'REQUEST_URI ' => parse_url ($ request , PHP_URL_PATH ) ?? '/ ' ,
145+ 'HTTP_HOST ' => "{$ host }: {$ port }" ,
146+ 'HTTP_UPGRADE ' => 'websocket ' ,
147+ 'HTTP_CONNECTION ' => 'Upgrade ' ,
148+ 'SERVER_PORT ' => parse_url ($ request , PHP_URL_PORT )
149+ ];
150+
151+ if (isset ($ this ->options ['header ' ]) && is_array ($ this ->options ['header ' ])) {
152+ foreach ($ this ->options ['header ' ] as $ key => $ value ) {
153+ $ requestServer ['HTTP_ ' . strtoupper (str_replace ('- ' , '_ ' , $ key ))] = $ value ;
154+ }
155+ }
156+
137157 $ this ->request = new Request (
138158 $ query ,
139159 [],
140160 [],
141161 [],
142162 [],
143- [
144- 'REQUEST_METHOD ' => 'GET ' ,
145- 'REQUEST_URI ' => parse_url ($ request , PHP_URL_PATH ) ?? '/ ' ,
146- 'HTTP_HOST ' => "{$ host }: {$ port }" ,
147- 'HTTP_UPGRADE ' => 'websocket ' ,
148- 'HTTP_CONNECTION ' => 'Upgrade ' ,
149- 'SERVER_PORT ' => parse_url ($ request , PHP_URL_PORT )
150- ]
163+ $ requestServer
151164 );
152165 }
153166
@@ -161,7 +174,7 @@ public function __construct(
161174 throw new RuntimeException ('Failed to generate random bytes ' );
162175 }
163176
164- async (function () {
177+ go (function () {
165178 try {
166179 $ this ->handshake ();
167180 if (isset ($ this ->onOpen )) {
@@ -226,17 +239,69 @@ protected function handshake(): void
226239 $ host = $ this ->request ->getHost ();
227240 $ uri = $ this ->request ->getRequestUri ();
228241 $ port = $ this ->request ->getPort ();
229-
230242 $ query = http_build_query ($ this ->request ->query ->all ());
231- $ this ->stream = match ($ scheme ) {
232- 'ws ' => Socket::connect ("tcp:// {$ host }: {$ port }" , $ this ->timeout , $ this ->context ),
233- 'wss ' => Socket::connectWithSSL ("ssl:// {$ host }: {$ port }" , $ this ->timeout , $ this ->context ),
234- default => throw new Exception ('Unsupported scheme ' ),
243+ $ ssl = $ scheme === 'wss ' ;
244+
245+ $ proxy = match ($ scheme ) {
246+ 'ws ' => getenv ('http_proxy ' ),
247+ 'wss ' => getenv ('https_proxy ' ),
248+ default => null ,
235249 };
236250
237- $ this ->stream ->setBlocking (false );
251+ if ($ this ->options ['proxy ' ] ?? null ) {
252+ $ proxy = $ this ->options ['proxy ' ];
253+ }
254+
255+ if ($ proxy && in_array ($ host , ['127.0.0.1 ' , 'localhost ' , '::1 ' ], true )) {
256+ $ proxy = null ;
257+ }
238258
239- $ context = "{$ method } {$ uri }{$ query } HTTP/1.1 \r\n" ;
259+ if ($ proxy ) {
260+ $ proxyParse = parse_url ($ proxy );
261+ if (!isset ($ proxyParse ['host ' ], $ proxyParse ['port ' ])) {
262+ throw new ConnectionException ('Invalid proxy address ' , ConnectionException::CONNECTION_ERROR );
263+ }
264+
265+ $ payload = [
266+ 'host ' => $ host ,
267+ 'port ' => $ port ,
268+ ];
269+ if (isset ($ proxyParse ['user ' ], $ proxyParse ['pass ' ])) {
270+ $ payload ['username ' ] = $ proxyParse ['user ' ];
271+ $ payload ['password ' ] = $ proxyParse ['pass ' ];
272+ }
273+
274+ switch ($ proxyParse ['scheme ' ]) {
275+ case 'socks ' :
276+ case 'socks5 ' :
277+ $ tunnelSocket = Socks5::connect ("tcp:// {$ proxyParse ['host ' ]}: {$ proxyParse ['port ' ]}" , $ payload )->getSocket ();
278+ $ ssl && $ tunnelSocket ->enableSSL ();
279+ $ this ->stream = $ tunnelSocket ;
280+ break ;
281+ case 'http ' :
282+ $ tunnelSocket = Http::connect ("tcp:// {$ proxyParse ['host ' ]}: {$ proxyParse ['port ' ]}" , $ payload )->getSocket ();
283+ $ ssl && $ tunnelSocket ->enableSSL ();
284+ $ this ->stream = $ tunnelSocket ;
285+ break ;
286+ case 'https ' :
287+ $ tunnel = Socket::connectWithSSL ("tcp:// {$ proxyParse ['host ' ]}: {$ proxyParse ['port ' ]}" , $ this ->timeout );
288+ $ tunnelSocket = Http::connect ($ tunnel , $ payload )->getSocket ();
289+ $ ssl && $ tunnelSocket ->enableSSL ();
290+ $ this ->stream = $ tunnelSocket ;
291+ break ;
292+ default :
293+ throw new ConnectionException ('Unsupported proxy protocol ' , ConnectionException::CONNECTION_ERROR );
294+ }
295+ } else {
296+ $ this ->stream = match ($ scheme ) {
297+ 'ws ' => Socket::connect ("tcp:// {$ host }: {$ port }" , $ this ->timeout ),
298+ 'wss ' => Socket::connectWithSSL ("ssl:// {$ host }: {$ port }" , $ this ->timeout ),
299+ default => throw new Exception ('Unsupported scheme ' ),
300+ };
301+ }
302+
303+ $ this ->stream ->setBlocking (false );
304+ $ context = "{$ method } {$ uri }? {$ query } HTTP/1.1 \r\n" ;
240305 foreach ($ this ->request ->headers ->all () as $ name => $ values ) {
241306 $ name = str_replace (' ' , '- ' , ucwords (str_replace ('- ' , ' ' , $ name )));
242307 foreach ($ values as $ value ) {
0 commit comments