Skip to content

Commit ab376b0

Browse files
authored
Merge pull request jonhoo#8 from datafuse-extras/stats
Query progress stats
2 parents 74473a9 + 60e369b commit ab376b0

File tree

9 files changed

+207
-78
lines changed

9 files changed

+207
-78
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl<W: io::Write> MysqlShim<W> for Backend {
3838
_: ParamParser,
3939
results: QueryResultWriter<W>,
4040
) -> io::Result<()> {
41-
results.completed(0, 0)
41+
results.completed(OkResponse::default())
4242
}
4343
fn on_close(&mut self, _: u32) {}
4444

examples/serve_auth.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ extern crate mysql_common as myc;
1111

1212
use msql_srv::*;
1313
use std::io;
14+
use std::iter;
1415
use std::net;
1516
use std::thread;
1617

@@ -27,13 +28,26 @@ impl<W: io::Write> MysqlShim<W> for Backend {
2728
_: msql_srv::ParamParser,
2829
results: QueryResultWriter<W>,
2930
) -> io::Result<()> {
30-
results.completed(0, 0)
31+
let resp = OkResponse::default();
32+
results.completed(resp)
3133
}
3234
fn on_close(&mut self, _: u32) {}
3335

3436
fn on_query(&mut self, sql: &str, results: QueryResultWriter<W>) -> io::Result<()> {
3537
println!("execute sql {:?}", sql);
36-
results.start(&[])?.finish()
38+
39+
let cols = &[Column {
40+
table: String::new(),
41+
column: "abc".to_string(),
42+
coltype: myc::constants::ColumnType::MYSQL_TYPE_LONG,
43+
colflags: myc::constants::ColumnFlags::UNSIGNED_FLAG,
44+
}];
45+
46+
let mut w = results.start(cols)?;
47+
w.write_row(iter::once(67108864u32))?;
48+
w.write_row(iter::once(167108864u32))?;
49+
50+
w.finish_with_info("ExtraInfo")
3751
}
3852

3953
/// authenticate method for the specified plugin

examples/serve_one.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ impl<W: io::Write> MysqlShim<W> for Backend {
2626
_: msql_srv::ParamParser,
2727
results: QueryResultWriter<W>,
2828
) -> io::Result<()> {
29-
results.completed(0, 0)
29+
results.completed(OkResponse::default())
3030
}
3131
fn on_close(&mut self, _: u32) {}
3232

src/commands.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ use crate::myc::constants::{CapabilityFlags, Command as CommandByte};
22

33
#[derive(Debug)]
44
pub struct ClientHandshake {
5-
capabilities: CapabilityFlags,
65
maxps: u32,
7-
collation: u16,
6+
pub(crate) capabilities: CapabilityFlags,
7+
pub(crate) collation: u16,
88
pub(crate) db: Option<Vec<u8>>,
99
pub(crate) username: Vec<u8>,
1010
pub(crate) auth_response: Vec<u8>,
@@ -66,7 +66,7 @@ pub fn client_handshake(i: &[u8]) -> nom::IResult<&[u8], ClientHandshake> {
6666
Ok((
6767
i,
6868
ClientHandshake {
69-
capabilities: CapabilityFlags::from_bits_truncate(cap),
69+
capabilities,
7070
maxps,
7171
collation: u16::from(collation[0]),
7272
username: username.to_vec(),
@@ -99,7 +99,7 @@ pub fn client_handshake(i: &[u8]) -> nom::IResult<&[u8], ClientHandshake> {
9999
Ok((
100100
i,
101101
ClientHandshake {
102-
capabilities: CapabilityFlags::from_bits_truncate(cap as u32),
102+
capabilities,
103103
maxps,
104104
collation: 0,
105105
username: username.to_vec(),

src/lib.rs

Lines changed: 72 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
//! _: ParamParser,
3535
//! results: QueryResultWriter<W>,
3636
//! ) -> io::Result<()> {
37-
//! results.completed(0, 0)
37+
//! results.completed(OkResponse::default())
3838
//! }
3939
//! fn on_close(&mut self, _: u32) {}
4040
//!
@@ -135,6 +135,25 @@ pub struct Column {
135135
pub colflags: ColumnFlags,
136136
}
137137

138+
/// QueryStatusInfo represents the status of a query.
139+
#[derive(Debug, Clone, PartialEq, Eq, Default)]
140+
pub struct OkResponse {
141+
/// header
142+
pub header: u8,
143+
/// affected rows in update/insert
144+
pub affected_rows: u64,
145+
/// insert_id in update/insert
146+
pub last_insert_id: u64,
147+
/// StatusFlags associated with this query
148+
pub status_flags: StatusFlags,
149+
/// Warnings
150+
pub warnings: u16,
151+
/// Extra infomation
152+
pub info: String,
153+
/// session state change information
154+
pub session_state_info: String,
155+
}
156+
138157
pub use crate::errorcodes::ErrorKind;
139158
pub use crate::params::{ParamParser, ParamValue, Params};
140159
pub use crate::resultset::{InitWriter, QueryResultWriter, RowWriter, StatementMetaWriter};
@@ -177,7 +196,7 @@ pub trait MysqlShim<W: Write> {
177196
for i in 0..SCRAMBLE_SIZE {
178197
scramble[i] = bs[i];
179198
if scramble[i] == b'\0' || scramble[i] == b'$' {
180-
scramble[i] = scramble[i] + 1;
199+
scramble[i] += 1;
181200
}
182201
}
183202
scramble
@@ -240,6 +259,7 @@ pub trait MysqlShim<W: Write> {
240259
/// A server that speaks the MySQL/MariaDB protocol, and can delegate client commands to a backend
241260
/// that implements [`MysqlShim`](trait.MysqlShim.html).
242261
pub struct MysqlIntermediary<B, R: Read, W: Write> {
262+
pub(crate) client_capabilities: CapabilityFlags,
243263
shim: B,
244264
reader: packet::PacketReader<R>,
245265
writer: packet::PacketWriter<W>,
@@ -279,6 +299,7 @@ impl<B: MysqlShim<W>, R: Read, W: Write> MysqlIntermediary<B, R, W> {
279299
let r = packet::PacketReader::new(reader);
280300
let w = packet::PacketWriter::new(writer);
281301
let mut mi = MysqlIntermediary {
302+
client_capabilities: CapabilityFlags::from_bits_truncate(0),
282303
shim,
283304
reader: r,
284305
writer: w,
@@ -305,6 +326,7 @@ impl<B: MysqlShim<W>, R: Read, W: Write> MysqlIntermediary<B, R, W> {
305326
| CapabilityFlags::CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA
306327
| CapabilityFlags::CLIENT_CONNECT_WITH_DB
307328
| CapabilityFlags::CLIENT_RESERVED
329+
| CapabilityFlags::CLIENT_DEPRECATE_EOF
308330
// | CapabilityFlags::CLIENT_SSL
309331
)
310332
.bits();
@@ -378,6 +400,7 @@ impl<B: MysqlShim<W>, R: Read, W: Write> MysqlIntermediary<B, R, W> {
378400
})?
379401
.1;
380402

403+
self.client_capabilities = handshake.capabilities;
381404
let mut auth_response = handshake.auth_response.clone();
382405
let auth_plugin_expect = self.shim.auth_plugin_for_username(&handshake.username);
383406

@@ -424,11 +447,15 @@ impl<B: MysqlShim<W>, R: Read, W: Write> MysqlIntermediary<B, R, W> {
424447
&mut self.writer,
425448
)?;
426449
self.writer.flush()?;
427-
return Err(io::Error::new(io::ErrorKind::PermissionDenied, err_msg))?;
450+
return Err(io::Error::new(io::ErrorKind::PermissionDenied, err_msg).into());
428451
}
429452
}
430453

431-
writers::write_ok_packet(&mut self.writer, 0, 0, StatusFlags::empty())?;
454+
writers::write_ok_packet(
455+
&mut self.writer,
456+
self.client_capabilities,
457+
OkResponse::default(),
458+
)?;
432459
self.writer.flush()?;
433460

434461
Ok(())
@@ -441,37 +468,51 @@ impl<B: MysqlShim<W>, R: Read, W: Write> MysqlIntermediary<B, R, W> {
441468
while let Some((seq, packet)) = self.reader.next()? {
442469
self.writer.set_seq(seq + 1);
443470
let cmd = commands::parse(&packet).unwrap().1;
471+
444472
match cmd {
445473
Command::Query(q) => {
446474
if q.starts_with(b"SELECT @@") || q.starts_with(b"select @@") {
447-
let w = QueryResultWriter::new(&mut self.writer, false);
475+
let w = QueryResultWriter::new(
476+
&mut self.writer,
477+
false,
478+
self.client_capabilities,
479+
);
448480
let var = &q[b"SELECT @@".len()..];
481+
let var_with_at = &q[b"SELECT ".len()..];
482+
let cols = &[Column {
483+
table: String::new(),
484+
column: String::from_utf8_lossy(var_with_at).to_string(),
485+
coltype: myc::constants::ColumnType::MYSQL_TYPE_LONG,
486+
colflags: myc::constants::ColumnFlags::UNSIGNED_FLAG,
487+
}];
488+
449489
match var {
450490
b"max_allowed_packet" => {
451-
let cols = &[Column {
452-
table: String::new(),
453-
column: "@@max_allowed_packet".to_owned(),
454-
coltype: myc::constants::ColumnType::MYSQL_TYPE_LONG,
455-
colflags: myc::constants::ColumnFlags::UNSIGNED_FLAG,
456-
}];
457491
let mut w = w.start(cols)?;
458492
w.write_row(iter::once(67108864u32))?;
459493
w.finish()?;
460494
}
461495
_ => {
462-
w.completed(0, 0)?;
496+
let mut w = w.start(cols)?;
497+
w.write_row(iter::once(0))?;
498+
w.finish()?;
463499
}
464500
}
465501
} else if q.starts_with(b"USE ") || q.starts_with(b"use ") {
466502
let w = InitWriter {
503+
client_capabilities: self.client_capabilities,
467504
writer: &mut self.writer,
468505
};
469506
let schema = ::std::str::from_utf8(&q[b"USE ".len()..])
470507
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
471508
let schema = schema.trim().trim_end_matches(';').trim_matches('`');
472509
self.shim.on_init(schema, w)?;
473510
} else {
474-
let w = QueryResultWriter::new(&mut self.writer, false);
511+
let w = QueryResultWriter::new(
512+
&mut self.writer,
513+
false,
514+
self.client_capabilities,
515+
);
475516
self.shim.on_query(
476517
::std::str::from_utf8(q)
477518
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
@@ -483,6 +524,7 @@ impl<B: MysqlShim<W>, R: Read, W: Write> MysqlIntermediary<B, R, W> {
483524
let w = StatementMetaWriter {
484525
writer: &mut self.writer,
485526
stmts: &mut stmts,
527+
client_capabilities: self.client_capabilities,
486528
};
487529

488530
self.shim.on_prepare(
@@ -500,7 +542,11 @@ impl<B: MysqlShim<W>, R: Read, W: Write> MysqlIntermediary<B, R, W> {
500542
})?;
501543
{
502544
let params = params::ParamParser::new(params, state);
503-
let w = QueryResultWriter::new(&mut self.writer, true);
545+
let w = QueryResultWriter::new(
546+
&mut self.writer,
547+
true,
548+
self.client_capabilities,
549+
);
504550
self.shim.on_execute(stmt, params, w)?;
505551
}
506552
state.long_data.clear();
@@ -531,10 +577,16 @@ impl<B: MysqlShim<W>, R: Read, W: Write> MysqlIntermediary<B, R, W> {
531577
coltype: myc::constants::ColumnType::MYSQL_TYPE_SHORT,
532578
colflags: myc::constants::ColumnFlags::UNSIGNED_FLAG,
533579
}];
534-
writers::write_column_definitions(cols, &mut self.writer, true, true)?;
580+
writers::write_column_definitions(
581+
cols,
582+
&mut self.writer,
583+
true,
584+
self.client_capabilities,
585+
)?;
535586
}
536587
Command::Init(schema) => {
537588
let w = InitWriter {
589+
client_capabilities: self.client_capabilities,
538590
writer: &mut self.writer,
539591
};
540592
self.shim.on_init(
@@ -544,7 +596,11 @@ impl<B: MysqlShim<W>, R: Read, W: Write> MysqlIntermediary<B, R, W> {
544596
)?;
545597
}
546598
Command::Ping => {
547-
writers::write_ok_packet(&mut self.writer, 0, 0, StatusFlags::empty())?;
599+
writers::write_ok_packet(
600+
&mut self.writer,
601+
self.client_capabilities,
602+
OkResponse::default(),
603+
)?;
548604
}
549605
Command::Quit => {
550606
break;

0 commit comments

Comments
 (0)