Skip to content

Commit 611cead

Browse files
committed
Add support for multirow
1 parent 62e4f31 commit 611cead

File tree

3 files changed

+123
-2
lines changed

3 files changed

+123
-2
lines changed

src/protocol.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,40 @@ impl RowState {
103103
}
104104
}
105105

106+
#[derive(Debug, PartialEq)]
107+
struct MultiRowState {
108+
c_row: Option<RowState>,
109+
rows: Vec<Row>,
110+
md_state: u8,
111+
md1_target: u64,
112+
md2_col_cnt: u64,
113+
}
114+
115+
impl Default for MultiRowState {
116+
fn default() -> Self {
117+
Self::new(None, vec![], 0, 0, 0)
118+
}
119+
}
120+
121+
impl MultiRowState {
122+
fn new(c_row: Option<RowState>, rows: Vec<Row>, md_s: u8, md_cnt: u64, md_target: u64) -> Self {
123+
Self {
124+
c_row,
125+
rows,
126+
md_state: md_s,
127+
md1_target: md_target,
128+
md2_col_cnt: md_cnt,
129+
}
130+
}
131+
}
132+
106133
#[derive(Debug, PartialEq)]
107134
enum ResponseState {
108135
Initial,
109136
PValue(PendingValue),
110137
PError,
111138
PRow(RowState),
139+
PMultiRow(MultiRowState),
112140
}
113141

114142
#[derive(Debug, PartialEq)]
@@ -147,6 +175,7 @@ impl<'a> Decoder<'a> {
147175
ResponseState::PError => self.resume_error(),
148176
ResponseState::PValue(v) => self.resume_value(v),
149177
ResponseState::PRow(r) => self.resume_row(r),
178+
ResponseState::PMultiRow(mr) => self.resume_rows(mr),
150179
}
151180
}
152181
pub fn position(&self) -> usize {
@@ -159,6 +188,7 @@ impl<'a> Decoder<'a> {
159188
0x10 => self.resume_error(),
160189
0x11 => self.resume_row(RowState::new(ValueStateMeta::zero(), vec![], None)),
161190
0x12 => return DecodeState::Completed(Response::Empty),
191+
0x13 => self.resume_rows(MultiRowState::default()),
162192
code => match self.start_decode(true, code, vec![], None) {
163193
Ok(ValueDecodeStateAny::Decoded(v)) => DecodeState::Completed(Response::Value(v)),
164194
Ok(ValueDecodeStateAny::Pending(pv)) => {
@@ -198,6 +228,9 @@ impl<'a> Decoder<'a> {
198228
Err(e) => return DecodeState::Error(e),
199229
}
200230
}
231+
self._decode_row_core(row_state)
232+
}
233+
fn _decode_row_core(&mut self, mut row_state: RowState) -> DecodeState {
201234
while row_state.row.len() as u64 != row_state.meta.md1 {
202235
let r = match row_state.tmp.take() {
203236
None => {
@@ -228,6 +261,55 @@ impl<'a> Decoder<'a> {
228261
}
229262
DecodeState::Completed(Response::Row(Row::new(row_state.row)))
230263
}
264+
fn resume_rows(&mut self, mut multirow: MultiRowState) -> DecodeState {
265+
if multirow.md_state == 0 {
266+
match self.__resume_decode(multirow.md1_target, ValueStateMeta::zero()) {
267+
Ok(ValueDecodeStateAny::Pending(ValueState { v, .. })) => {
268+
multirow.md1_target = v.u64();
269+
return DecodeState::ChangeState(RState(ResponseState::PMultiRow(multirow)));
270+
}
271+
Ok(ValueDecodeStateAny::Decoded(v)) => {
272+
multirow.md1_target = v.u64();
273+
multirow.md_state += 1;
274+
}
275+
Err(e) => return DecodeState::Error(e),
276+
}
277+
}
278+
if multirow.md_state == 1 {
279+
match self.__resume_decode(multirow.md2_col_cnt, ValueStateMeta::zero()) {
280+
Ok(ValueDecodeStateAny::Pending(ValueState { v, .. })) => {
281+
multirow.md2_col_cnt = v.u64();
282+
return DecodeState::ChangeState(RState(ResponseState::PMultiRow(multirow)));
283+
}
284+
Ok(ValueDecodeStateAny::Decoded(v)) => {
285+
multirow.md2_col_cnt = v.u64();
286+
multirow.md_state += 1;
287+
}
288+
Err(e) => return DecodeState::Error(e),
289+
}
290+
}
291+
while multirow.rows.len() as u64 != multirow.md1_target {
292+
let ret = match multirow.c_row.take() {
293+
Some(r) => self._decode_row_core(r),
294+
None => self._decode_row_core(RowState::new(
295+
ValueStateMeta::new(0, multirow.md2_col_cnt, true),
296+
vec![],
297+
None,
298+
)),
299+
};
300+
match ret {
301+
DecodeState::Completed(Response::Row(r)) => multirow.rows.push(r),
302+
DecodeState::Completed(_) => unreachable!(),
303+
e @ DecodeState::Error(_) => return e,
304+
DecodeState::ChangeState(RState(ResponseState::PRow(pr))) => {
305+
multirow.c_row = Some(pr);
306+
return DecodeState::ChangeState(RState(ResponseState::PMultiRow(multirow)));
307+
}
308+
DecodeState::ChangeState(_) => unreachable!(),
309+
}
310+
}
311+
DecodeState::Completed(Response::Rows(multirow.rows))
312+
}
231313
}
232314

233315
impl<'a> Decoder<'a> {
@@ -634,3 +716,34 @@ fn t_row() {
634716
])))
635717
);
636718
}
719+
720+
#[test]
721+
fn t_mrow() {
722+
let mut decoder = Decoder::new(b"\x133\n5\n\x00\x01\x01\x0D5\nsayan\x0220\n\x0E0\n\x00\x01\x01\x0D5\nelana\x0221\n\x0E0\n\x00\x01\x01\x0D5\nemily\x0222\n\x0E0\n", 0);
723+
assert_eq!(
724+
decoder.validate_response(RState::default()),
725+
DecodeState::Completed(Response::Rows(vec![
726+
Row::new(vec![
727+
Value::Null,
728+
Value::Bool(true),
729+
Value::String("sayan".into()),
730+
Value::UInt8(20),
731+
Value::List(vec![])
732+
]),
733+
Row::new(vec![
734+
Value::Null,
735+
Value::Bool(true),
736+
Value::String("elana".into()),
737+
Value::UInt8(21),
738+
Value::List(vec![])
739+
]),
740+
Row::new(vec![
741+
Value::Null,
742+
Value::Bool(true),
743+
Value::String("emily".into()),
744+
Value::UInt8(22),
745+
Value::List(vec![])
746+
])
747+
]))
748+
);
749+
}

src/query.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,12 @@ where
129129
}
130130
}
131131
}
132+
pub struct Null;
133+
impl SQParam for Null {
134+
fn append_param(self, buf: &mut Vec<u8>) {
135+
buf.push(0);
136+
}
137+
}
132138
// bool
133139
impl SQParam for bool {
134140
fn append_param(self, buf: &mut Vec<u8>) {

src/response.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ pub enum Response {
6767
Value(Value),
6868
/// The server returned a row
6969
Row(Row),
70+
/// A list of rows
71+
Rows(Vec<Row>),
7072
/// The server returned an error code
7173
Error(u16),
7274
}
@@ -97,7 +99,7 @@ impl<V: FromValue> FromResponse for V {
9799
fn from_response(resp: Response) -> ClientResult<Self> {
98100
match resp {
99101
Response::Value(v) => V::from_value(v),
100-
Response::Row(_) | Response::Empty => {
102+
Response::Row(_) | Response::Empty | Response::Rows(_) => {
101103
Err(Error::ParseError(ParseError::ResponseMismatch))
102104
}
103105
Response::Error(e) => Err(Error::ServerError(e)),
@@ -144,7 +146,7 @@ macro_rules! from_response_row {
144146
fn from_response(resp: Response) -> ClientResult<Self> {
145147
let row = match resp {
146148
Response::Row(r) => r.into_values(),
147-
Response::Empty | Response::Value(_) => return Err(Error::ParseError(ParseError::ResponseMismatch)),
149+
Response::Empty | Response::Value(_) | Response::Rows(_) => return Err(Error::ParseError(ParseError::ResponseMismatch)),
148150
Response::Error(e) => return Err(Error::ServerError(e)),
149151
};
150152
if row.len() != $size {

0 commit comments

Comments
 (0)