@@ -21,24 +21,18 @@ use databend_common_base::base::tokio::net::TcpStream;
2121use databend_common_base:: base:: tokio:: task:: JoinHandle ;
2222use databend_common_base:: runtime:: Runtime ;
2323use databend_common_base:: runtime:: TrySpawn ;
24- use databend_common_catalog:: session_type:: SessionType ;
2524use databend_common_exception:: ErrorCode ;
2625use databend_common_exception:: Result ;
2726use futures:: future:: AbortHandle ;
2827use futures:: future:: AbortRegistration ;
2928use futures:: future:: Abortable ;
3029use futures:: StreamExt ;
3130use log:: error;
32- use log:: info;
33- use log:: warn;
34- use opensrv_mysql:: * ;
3531use rustls:: ServerConfig ;
36- use socket2:: SockRef ;
3732use socket2:: TcpKeepalive ;
3833use tokio_stream:: wrappers:: TcpListenerStream ;
3934
4035use crate :: servers:: mysql:: mysql_session:: MySQLConnection ;
41- use crate :: servers:: mysql:: reject_connection:: RejectConnection ;
4236use crate :: servers:: mysql:: tls:: MySQLTlsConfig ;
4337use crate :: servers:: server:: ListeningStream ;
4438use crate :: servers:: server:: Server ;
@@ -110,50 +104,13 @@ impl MySQLHandler {
110104 tls : Option < Arc < ServerConfig > > ,
111105 ) {
112106 executor. spawn ( async move {
113- match session_manager. create_session ( SessionType :: MySQL ) . await {
114- Err ( error) => {
115- warn ! ( "create session failed, {:?}" , error) ;
116- Self :: reject_session ( socket, error) . await
117- }
118- Ok ( session) => {
119- info ! ( "MySQL connection coming: {:?}" , socket. peer_addr( ) ) ;
120-
121- match session_manager. register_session ( session) {
122- Ok ( session) => {
123- // TcpStream must implement AsFd for socket2 0.5, wait https://github.com/tokio-rs/tokio/pull/5514
124- if let Err ( e) = SockRef :: from ( & socket) . set_tcp_keepalive ( & keepalive) {
125- warn ! ( "failed to set socket option keepalive {}" , e) ;
126- }
127-
128- if let Err ( error) = MySQLConnection :: run_on_stream ( session, socket, tls)
129- {
130- error ! ( "Unexpected error occurred during query: {:?}" , error) ;
131- } ;
132- }
133- Err ( error) => {
134- warn ! ( "fail to register session, {:?}" , error) ;
135- Self :: reject_session ( socket, error) . await
136- }
137- }
138- }
107+ if let Err ( error) =
108+ MySQLConnection :: run_on_stream ( session_manager, socket, keepalive, tls) . await
109+ {
110+ error ! ( "Unexpected error occurred during query: {:?}" , error) ;
139111 }
140112 } ) ;
141113 }
142-
143- #[ async_backtrace:: framed]
144- async fn reject_session ( stream : TcpStream , error : ErrorCode ) {
145- let ( kind, message) = match error. code ( ) {
146- 41 => ( ErrorKind :: ER_TOO_MANY_USER_CONNECTIONS , error. message ( ) ) ,
147- _ => ( ErrorKind :: ER_INTERNAL_ERROR , error. message ( ) ) ,
148- } ;
149-
150- if let Err ( error) = RejectConnection :: reject_mysql_connection ( stream, kind, message) . await {
151- error ! (
152- "Unexpected error occurred during reject connection: {:?}" ,
153- error
154- ) ;
155- }
156- }
157114}
158115
159116#[ async_trait:: async_trait]
0 commit comments