1
- use std:: {
2
- borrow:: Cow ,
3
- collections:: HashMap as StdHashMap ,
4
- net:: SocketAddr ,
5
- sync:: Arc ,
6
- time:: { Duration , Instant } ,
7
- } ;
8
-
9
1
use anyhow:: * ;
10
2
use bytes:: Bytes ;
11
3
use futures_util:: { SinkExt , StreamExt } ;
@@ -16,17 +8,30 @@ use hyper_util::{client::legacy::Client, rt::TokioExecutor};
16
8
use moka:: future:: Cache ;
17
9
use rand;
18
10
use rivet_api_builder:: { ErrorResponse , RawErrorResponse } ;
19
- use rivet_error:: RivetError ;
11
+ use rivet_error:: { INTERNAL_ERROR , RivetError } ;
20
12
use rivet_metrics:: KeyValue ;
21
13
use rivet_util:: Id ;
22
14
use serde_json;
15
+ use std:: {
16
+ borrow:: Cow ,
17
+ collections:: HashMap as StdHashMap ,
18
+ net:: SocketAddr ,
19
+ sync:: Arc ,
20
+ time:: { Duration , Instant } ,
21
+ } ;
23
22
use tokio:: sync:: Mutex ;
24
23
use tokio:: time:: timeout;
25
- use tokio_tungstenite:: tungstenite:: client:: IntoClientRequest ;
24
+ use tokio_tungstenite:: tungstenite:: {
25
+ client:: IntoClientRequest ,
26
+ protocol:: { CloseFrame , frame:: coding:: CloseCode } ,
27
+ } ;
26
28
use tracing:: Instrument ;
27
29
use url:: Url ;
28
30
29
- use crate :: { custom_serve:: CustomServeTrait , errors, metrics, request_context:: RequestContext } ;
31
+ use crate :: {
32
+ WebSocketHandle , custom_serve:: CustomServeTrait , errors, metrics,
33
+ request_context:: RequestContext ,
34
+ } ;
30
35
31
36
pub const X_FORWARDED_FOR : HeaderName = HeaderName :: from_static ( "x-forwarded-for" ) ;
32
37
pub const X_RIVET_ERROR : HeaderName = HeaderName :: from_static ( "x-rivet-error" ) ;
@@ -1432,9 +1437,9 @@ impl ProxyService {
1432
1437
1433
1438
// Close the WebSocket connection with the response message
1434
1439
let _ = client_ws. close ( Some ( tokio_tungstenite:: tungstenite:: protocol:: CloseFrame {
1435
- code : tokio_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1436
- reason : response. message . as_ref ( ) . into ( ) ,
1437
- } ) ) . await ;
1440
+ code : tokio_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1441
+ reason : response. message . as_ref ( ) . into ( ) ,
1442
+ } ) ) . await ;
1438
1443
return ;
1439
1444
}
1440
1445
Result :: Ok ( ResolveRouteOutput :: CustomServe ( _) ) => {
@@ -1813,31 +1818,42 @@ impl ProxyService {
1813
1818
let mut attempts = 0u32 ;
1814
1819
let mut client_ws = client_websocket;
1815
1820
1821
+ let ws_handle = WebSocketHandle :: new ( client_ws) ;
1822
+
1816
1823
loop {
1817
1824
match handlers
1818
1825
. handle_websocket (
1819
- client_ws ,
1826
+ ws_handle . clone ( ) ,
1820
1827
& req_headers,
1821
1828
& req_path,
1822
1829
& mut request_context,
1823
1830
)
1824
1831
. await
1825
1832
{
1826
- Result :: Ok ( ( ) ) => break ,
1827
- Result :: Err ( ( returned_client_ws, err) ) => {
1833
+ Result :: Ok ( ( ) ) => {
1834
+ tracing:: debug!( "websocket closed" ) ;
1835
+
1836
+ // Send graceful close
1837
+ ws_handle. send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1838
+ hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1839
+ code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Normal ,
1840
+ reason : format ! ( "Closed" ) . into ( ) ,
1841
+ } ,
1842
+ ) ) ) ;
1843
+
1844
+ break ;
1845
+ }
1846
+ Result :: Err ( err) => {
1828
1847
attempts += 1 ;
1829
1848
if attempts > max_attempts || !is_retryable_ws_error ( & err) {
1830
- // Accept and close the client websocket with an error reason
1831
- if let Result :: Ok ( mut ws) = returned_client_ws. await {
1832
- let _ = ws
1833
- . send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1834
- hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1835
- code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1836
- reason : format ! ( "{}" , err) . into ( ) ,
1837
- } ,
1838
- ) ) )
1839
- . await ;
1840
- }
1849
+ // Close WebSocket with error
1850
+ ws_handle
1851
+ . accept_and_send (
1852
+ hyper_tungstenite:: tungstenite:: Message :: Close (
1853
+ Some ( err_to_close_frame ( err) ) ,
1854
+ ) ,
1855
+ )
1856
+ . await ?;
1841
1857
1842
1858
break ;
1843
1859
} else {
@@ -1861,49 +1877,38 @@ impl ProxyService {
1861
1877
new_handlers,
1862
1878
) ) => {
1863
1879
handlers = new_handlers;
1864
- client_ws = returned_client_ws;
1865
1880
continue ;
1866
1881
}
1867
1882
Result :: Ok ( ResolveRouteOutput :: Response ( response) ) => {
1868
- if let Result :: Ok ( mut ws) = returned_client_ws. await
1869
- {
1870
- let _ = ws
1871
- . send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1872
- hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1873
- code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1874
- reason : response. message . as_ref ( ) . into ( ) ,
1875
- } ,
1876
- ) ) )
1877
- . await ;
1878
- }
1879
- break ;
1883
+ ws_handle
1884
+ . accept_and_send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1885
+ hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1886
+ code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1887
+ reason : response. message . as_ref ( ) . into ( ) ,
1888
+ } ,
1889
+ ) ) )
1890
+ . await ;
1880
1891
}
1881
1892
Result :: Ok ( ResolveRouteOutput :: Target ( _) ) => {
1882
- if let Result :: Ok ( mut ws) = returned_client_ws. await
1883
- {
1884
- let _ = ws
1885
- . send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1886
- hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1887
- code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1888
- reason : "Cannot retry WebSocket with non-custom serve route" . into ( ) ,
1889
- } ,
1890
- ) ) )
1891
- . await ;
1892
- }
1893
+ ws_handle
1894
+ . accept_and_send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1895
+ hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1896
+ code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1897
+ reason : "Cannot retry WebSocket with non-custom serve route" . into ( ) ,
1898
+ } ,
1899
+ ) ) )
1900
+ . await ;
1893
1901
break ;
1894
1902
}
1895
1903
Err ( res_err) => {
1896
- if let Result :: Ok ( mut ws) = returned_client_ws. await
1897
- {
1898
- let _ = ws
1899
- . send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1900
- hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1901
- code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1902
- reason : format ! ( "Routing error: {}" , res_err) . into ( ) ,
1903
- } ,
1904
- ) ) )
1905
- . await ;
1906
- }
1904
+ ws_handle
1905
+ . accept_and_send ( hyper_tungstenite:: tungstenite:: Message :: Close ( Some (
1906
+ hyper_tungstenite:: tungstenite:: protocol:: CloseFrame {
1907
+ code : hyper_tungstenite:: tungstenite:: protocol:: frame:: coding:: CloseCode :: Error ,
1908
+ reason : format ! ( "Routing error: {}" , res_err) . into ( ) ,
1909
+ } ,
1910
+ ) ) )
1911
+ . await ;
1907
1912
break ;
1908
1913
}
1909
1914
}
@@ -2242,3 +2247,26 @@ fn is_retryable_ws_error(err: &anyhow::Error) -> bool {
2242
2247
false
2243
2248
}
2244
2249
}
2250
+
2251
+ pub fn err_to_close_frame ( err : anyhow:: Error ) -> CloseFrame {
2252
+ let rivet_err = err
2253
+ . chain ( )
2254
+ . find_map ( |x| x. downcast_ref :: < RivetError > ( ) )
2255
+ . cloned ( )
2256
+ . unwrap_or_else ( || RivetError :: from ( & INTERNAL_ERROR ) ) ;
2257
+
2258
+ let code = match ( rivet_err. group ( ) , rivet_err. code ( ) ) {
2259
+ ( "ws" , "connection_closed" ) => CloseCode :: Normal ,
2260
+ _ => CloseCode :: Error ,
2261
+ } ;
2262
+
2263
+ // NOTE: reason cannot be more than 123 bytes as per the WS protocol
2264
+ let reason = rivet_util:: safe_slice (
2265
+ & format ! ( "{}.{}" , rivet_err. group( ) , rivet_err. code( ) ) ,
2266
+ 0 ,
2267
+ 123 ,
2268
+ )
2269
+ . into ( ) ;
2270
+
2271
+ CloseFrame { code, reason }
2272
+ }
0 commit comments