Skip to content

Commit 1ff0423

Browse files
committed
Merge branch 'master' of https://github.com/datafuse-extras/msql-srv into bump-deps
2 parents 6effef6 + 077fbca commit 1ff0423

File tree

3 files changed

+48
-3
lines changed

3 files changed

+48
-3
lines changed

src/commands.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@ use crate::myc::constants::{CapabilityFlags, Command as CommandByte};
22

33
#[derive(Debug)]
44
pub struct ClientHandshake {
5+
#[allow(dead_code)]
56
maxps: u32,
67
pub(crate) capabilities: CapabilityFlags,
8+
#[allow(dead_code)]
79
pub(crate) collation: u16,
10+
#[allow(dead_code)]
811
pub(crate) db: Option<Vec<u8>>,
912
pub(crate) username: Vec<u8>,
1013
pub(crate) auth_response: Vec<u8>,

src/lib.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,10 +256,17 @@ pub trait MysqlShim<W: Write> {
256256
}
257257
}
258258

259+
/// The options which passed to MysqlIntermediary struct
260+
#[derive(Debug, Clone, PartialEq, Eq, Default)]
261+
pub struct MysqlIntermediaryOptions {
262+
process_use_statement_on_query: bool,
263+
}
264+
259265
/// A server that speaks the MySQL/MariaDB protocol, and can delegate client commands to a backend
260266
/// that implements [`MysqlShim`](trait.MysqlShim.html).
261267
pub struct MysqlIntermediary<B, R: Read, W: Write> {
262268
pub(crate) client_capabilities: CapabilityFlags,
269+
process_use_statement_on_query: bool,
263270
shim: B,
264271
reader: packet::PacketReader<R>,
265272
writer: packet::PacketWriter<W>,
@@ -273,6 +280,17 @@ impl<B: MysqlShim<net::TcpStream>> MysqlIntermediary<B, net::TcpStream, net::Tcp
273280
let w = stream.try_clone()?;
274281
MysqlIntermediary::run_on(shim, stream, w)
275282
}
283+
284+
/// Create a new server over a TCP stream and process client commands until the client
285+
/// disconnects or an error occurs. See also
286+
pub fn run_on_tcp_with_options(
287+
shim: B,
288+
stream: net::TcpStream,
289+
opts: &MysqlIntermediaryOptions,
290+
) -> Result<(), B::Error> {
291+
let w = stream.try_clone()?;
292+
MysqlIntermediary::run_with_options(shim, stream, w, opts)
293+
}
276294
}
277295

278296
impl<B: MysqlShim<S>, S: Read + Write + Clone> MysqlIntermediary<B, S, S> {
@@ -282,6 +300,16 @@ impl<B: MysqlShim<S>, S: Read + Write + Clone> MysqlIntermediary<B, S, S> {
282300
pub fn run_on_stream(shim: B, stream: S) -> Result<(), B::Error> {
283301
MysqlIntermediary::run_on(shim, stream.clone(), stream)
284302
}
303+
304+
/// Create a new server over a two-way stream and process client commands until the client
305+
/// disconnects or an error ocurrs.
306+
pub fn run_on_stream_with_options(
307+
shim: B,
308+
stream: S,
309+
opts: &MysqlIntermediaryOptions,
310+
) -> Result<(), B::Error> {
311+
MysqlIntermediary::run_with_options(shim, stream.clone(), stream, opts)
312+
}
285313
}
286314

287315
#[derive(Default)]
@@ -296,10 +324,22 @@ impl<B: MysqlShim<W>, R: Read, W: Write> MysqlIntermediary<B, R, W> {
296324
/// Create a new server over two one-way channels and process client commands until the client
297325
/// disconnects or an error occurs.
298326
pub fn run_on(shim: B, reader: R, writer: W) -> Result<(), B::Error> {
327+
Self::run_with_options(shim, reader, writer, &Default::default())
328+
}
329+
330+
/// Create a new server over two one-way channels and process client commands until the client
331+
/// disconnects or an error occurs, with config options.
332+
pub fn run_with_options(
333+
shim: B,
334+
reader: R,
335+
writer: W,
336+
opts: &MysqlIntermediaryOptions,
337+
) -> Result<(), B::Error> {
299338
let r = packet::PacketReader::new(reader);
300339
let w = packet::PacketWriter::new(writer);
301340
let mut mi = MysqlIntermediary {
302341
client_capabilities: CapabilityFlags::from_bits_truncate(0),
342+
process_use_statement_on_query: opts.process_use_statement_on_query,
303343
shim,
304344
reader: r,
305345
writer: w,
@@ -510,7 +550,9 @@ impl<B: MysqlShim<W>, R: Read, W: Write> MysqlIntermediary<B, R, W> {
510550
w.finish()?;
511551
}
512552
}
513-
} else if q.starts_with(b"USE ") || q.starts_with(b"use ") {
553+
} else if !self.process_use_statement_on_query
554+
&& (q.starts_with(b"USE ") || q.starts_with(b"use "))
555+
{
514556
let w = InitWriter {
515557
client_capabilities: self.client_capabilities,
516558
writer: &mut self.writer,

src/packet.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ impl<R: Read> PacketReader<R> {
115115
let end = self.bytes.len();
116116
self.bytes.resize(std::cmp::max(4096, end * 2), 0);
117117
let read = {
118-
let mut buf = &mut self.bytes[end..];
119-
self.r.read(&mut buf)?
118+
let buf = &mut self.bytes[end..];
119+
self.r.read(buf)?
120120
};
121121
self.bytes.truncate(end + read);
122122
self.remaining = self.bytes.len();

0 commit comments

Comments
 (0)