Skip to content

Commit 9a34fd8

Browse files
authored
Add pipeline support (#10)
* Add pipeline object * Add methods to enable sync/async pipeline exec * Add changelog entry
1 parent 2f67d7c commit 9a34fd8

File tree

5 files changed

+165
-58
lines changed

5 files changed

+165
-58
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,12 @@
22

33
All changes in this project will be noted in this file.
44

5+
## Unreleased
6+
7+
### Additions
8+
9+
- Added support for pipelined queries
10+
511
## Version 0.6.1
612

713
> Breaking changes!

src/aio.rs

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,13 @@
2727
use crate::deserializer::{ParseError, Parser, RawResponse};
2828
use crate::error::Error;
2929
use crate::error::SkyhashError;
30+
use crate::Element;
3031
use crate::IoResult;
32+
use crate::Pipeline;
3133
use crate::Query;
34+
use crate::SkyRawResult;
3235
use crate::SkyResult;
36+
use crate::WriteQueryAsync;
3337
use bytes::{Buf, BytesMut};
3438
use std::io::{Error as IoError, ErrorKind};
3539
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter};
@@ -39,7 +43,7 @@ use tokio::net::TcpStream;
3943
const BUF_CAP: usize = 4096;
4044

4145
macro_rules! impl_async_methods {
42-
($ty:ty) => {
46+
($ty:ty, $inner:ty) => {
4347
impl $ty {
4448
/// This function will write a [`Query`] to the stream and read the response from the
4549
/// server. It will then determine if the returned response is complete or incomplete
@@ -49,8 +53,22 @@ macro_rules! impl_async_methods {
4953
/// ## Panics
5054
/// This method will panic if the [`Query`] supplied is empty (i.e has no arguments)
5155
pub async fn run_simple_query(&mut self, query: &Query) -> SkyResult {
52-
assert!(query.len() != 0, "A `Query` cannot be of zero length!");
53-
query.write_query_to(&mut self.stream).await?;
56+
match self._run_query(query).await? {
57+
RawResponse::SimpleQuery(sq) => Ok(sq),
58+
RawResponse::PipelinedQuery(_) => Err(SkyhashError::InvalidResponse.into()),
59+
}
60+
}
61+
pub async fn run_pipeline(&mut self, pipeline: Pipeline) -> SkyRawResult<Vec<Element>> {
62+
match self._run_query(&pipeline).await? {
63+
RawResponse::PipelinedQuery(pq) => Ok(pq),
64+
RawResponse::SimpleQuery(_) => Err(SkyhashError::InvalidResponse.into()),
65+
}
66+
}
67+
async fn _run_query<Q: WriteQueryAsync<$inner>>(
68+
&mut self,
69+
query: &Q,
70+
) -> SkyRawResult<RawResponse> {
71+
query.write_async(&mut self.stream).await?;
5472
self.stream.flush().await?;
5573
loop {
5674
if 0usize == self.stream.read_buf(&mut self.buffer).await? {
@@ -59,12 +77,7 @@ macro_rules! impl_async_methods {
5977
match self.try_response() {
6078
Ok((query, forward_by)) => {
6179
self.buffer.advance(forward_by);
62-
match query {
63-
RawResponse::SimpleQuery(s) => return Ok(s),
64-
RawResponse::PipelinedQuery(_) => {
65-
unimplemented!("Pipelined queries aren't implemented yet")
66-
}
67-
}
80+
return Ok(query);
6881
}
6982
Err(e) => match e {
7083
ParseError::NotEnough => (),
@@ -119,7 +132,7 @@ cfg_async!(
119132
})
120133
}
121134
}
122-
impl_async_methods!(Connection);
135+
impl_async_methods!(Connection, BufWriter<TcpStream>);
123136
);
124137

125138
cfg_async_ssl_any!(
@@ -148,5 +161,5 @@ cfg_async_ssl_any!(
148161
})
149162
}
150163
}
151-
impl_async_methods!(TlsConnection);
164+
impl_async_methods!(TlsConnection, SslStream<TcpStream>);
152165
);

src/deserializer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,10 @@ pub enum ParseError {
117117
}
118118

119119
#[derive(Debug, PartialEq)]
120-
/// # Types of Response
120+
/// # Response types
121121
///
122-
/// A simple response carries the data for one action while a complex response carries data for
123-
/// multiple actions
122+
/// A simple response carries the response for a simple query while a pipelined response carries the response
123+
/// for pipelined queries
124124
pub enum RawResponse {
125125
/// A simple query will just hold one element
126126
SimpleQuery(Element),

src/lib.rs

Lines changed: 109 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,74 @@ impl ConnectionBuilder {
336336
}
337337
}
338338

339+
cfg_sync! {
340+
trait WriteQuerySync {
341+
fn write_sync(&self, b: &mut impl std::io::Write) -> IoResult<()>;
342+
}
343+
344+
impl WriteQuerySync for Query {
345+
fn write_sync(&self, stream: &mut impl std::io::Write) -> IoResult<()> {
346+
// Write the metaframe
347+
stream.write_all(b"*1\n")?;
348+
// Add the dataframe
349+
let number_of_items_in_datagroup = self.len().to_string().into_bytes();
350+
stream.write_all(&[b'~'])?;
351+
stream.write_all(&number_of_items_in_datagroup)?;
352+
stream.write_all(&[b'\n'])?;
353+
stream.write_all(self.get_holding_buffer())?;
354+
stream.flush()?;
355+
Ok(())
356+
}
357+
}
358+
359+
impl WriteQuerySync for Pipeline {
360+
fn write_sync(&self, stream: &mut impl std::io::Write) -> IoResult<()> {
361+
let len = self.len.to_string().into_bytes();
362+
stream.write_all(b"*")?;
363+
stream.write_all(&len)?;
364+
stream.write_all(b"\n")?;
365+
stream.write_all(&self.chain)
366+
}
367+
}
368+
}
369+
370+
cfg_async! {
371+
use core::pin::Pin;
372+
use core::future::Future;
373+
use tokio::io::AsyncWrite;
374+
type FutureRet<'s> = Pin<Box<dyn Future<Output = IoResult<()>> + Send + Sync + 's>>;
375+
trait WriteQueryAsync<T: AsyncWrite + Unpin + Send + Sync>: Unpin + Sync + Send {
376+
fn write_async<'s>(&'s self, b: &'s mut T) -> FutureRet<'s>;
377+
}
378+
impl<T: AsyncWrite + Unpin + Send + Sync> WriteQueryAsync<T> for Query {
379+
fn write_async<'s>(&'s self, stream: &'s mut T) -> FutureRet {
380+
Box::pin(async move {
381+
// Write the metaframe
382+
stream.write_all(b"*1\n").await?;
383+
// Add the dataframe
384+
let number_of_items_in_datagroup = self.len().to_string().into_bytes();
385+
stream.write_all(&[b'~']).await?;
386+
stream.write_all(&number_of_items_in_datagroup).await?;
387+
stream.write_all(&[b'\n']).await?;
388+
stream.write_all(self.get_holding_buffer()).await?;
389+
stream.flush().await?;
390+
Ok(())
391+
})
392+
}
393+
}
394+
impl<T: AsyncWrite + Unpin + Send + Sync> WriteQueryAsync<T> for Pipeline {
395+
fn write_async<'s>(&'s self, stream: &'s mut T) -> FutureRet {
396+
Box::pin(async move {
397+
let len = self.len.to_string().into_bytes();
398+
stream.write_all(b"*").await?;
399+
stream.write_all(&len).await?;
400+
stream.write_all(b"\n").await?;
401+
stream.write_all(&self.chain).await
402+
})
403+
}
404+
}
405+
}
406+
339407
#[macro_export]
340408
/// A macro that can be used to easily create queries with _almost_ variadic properties.
341409
/// Where you'd normally create queries like this:
@@ -472,42 +540,14 @@ impl Query {
472540
fn get_holding_buffer(&self) -> &[u8] {
473541
&self.data
474542
}
475-
cfg_async!(
476-
/// Write a query to a given stream
477-
async fn write_query_to<T>(&self, stream: &mut T) -> IoResult<()>
478-
where
479-
T: tokio::io::AsyncWrite + Unpin,
480-
{
481-
// Write the metaframe
482-
stream.write_all(b"*1\n").await?;
483-
// Add the dataframe
484-
let number_of_items_in_datagroup = self.len().to_string().into_bytes();
485-
stream.write_all(&[b'~']).await?;
486-
stream.write_all(&number_of_items_in_datagroup).await?;
487-
stream.write_all(&[b'\n']).await?;
488-
stream.write_all(self.get_holding_buffer()).await?;
489-
stream.flush().await?;
490-
Ok(())
491-
}
492-
);
493-
cfg_sync!(
494-
/// Write a query to a given stream
495-
fn write_query_to_sync<T>(&self, stream: &mut T) -> IoResult<()>
496-
where
497-
T: std::io::Write,
498-
{
499-
// Write the metaframe
500-
stream.write_all(b"*1\n")?;
501-
// Add the dataframe
502-
let number_of_items_in_datagroup = self.len().to_string().into_bytes();
503-
stream.write_all(&[b'~'])?;
504-
stream.write_all(&number_of_items_in_datagroup)?;
505-
stream.write_all(&[b'\n'])?;
506-
stream.write_all(self.get_holding_buffer())?;
507-
stream.flush()?;
508-
Ok(())
509-
}
510-
);
543+
fn write_query_to_writable(&self, buffer: &mut Vec<u8>) {
544+
// Add the dataframe element
545+
let number_of_items_in_datagroup = self.len().to_string().into_bytes();
546+
buffer.extend([b'~']);
547+
buffer.extend(&number_of_items_in_datagroup);
548+
buffer.extend([b'\n']);
549+
buffer.extend(self.get_holding_buffer());
550+
}
511551
cfg_dbg!(
512552
/// Get the raw bytes of a query
513553
///
@@ -564,3 +604,36 @@ cfg_dbg!(
564604
assert_eq!(ma_query_len, q_len);
565605
}
566606
);
607+
608+
/// # Pipeline
609+
///
610+
/// A pipeline is a way of queing up multiple queries, sending them to the server at once instead of sending them individually, avoiding
611+
/// round-trip-times while also simplifying usage in several places. Responses are returned in the order they are sent
612+
pub struct Pipeline {
613+
len: usize,
614+
chain: Vec<u8>,
615+
}
616+
617+
impl Pipeline {
618+
/// Initializes a new empty pipeline
619+
pub fn new() -> Self {
620+
Self {
621+
len: 0usize,
622+
chain: Vec::new(),
623+
}
624+
}
625+
/// Append a query (builder pattern)
626+
pub fn add(mut self, query: Query) -> Self {
627+
self.len += 1;
628+
query.write_query_to_writable(&mut self.chain);
629+
self
630+
}
631+
/// Append a query by taking reference
632+
pub fn push(&mut self, query: Query) {
633+
self.len += 1;
634+
query.write_query_to_writable(&mut self.chain);
635+
}
636+
pub fn len(&self) -> usize {
637+
self.len
638+
}
639+
}

src/sync.rs

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,13 @@
2626
2727
use crate::deserializer::{ParseError, Parser, RawResponse};
2828
use crate::error::SkyhashError;
29+
use crate::Element;
2930
use crate::IoResult;
31+
use crate::Pipeline;
3032
use crate::Query;
33+
use crate::SkyRawResult;
3134
use crate::SkyResult;
35+
use crate::WriteQuerySync;
3236
use std::io::{Error as IoError, ErrorKind, Read, Write};
3337
use std::net::TcpStream;
3438

@@ -41,11 +45,27 @@ macro_rules! impl_sync_methods {
4145
/// for any I/O errors that may occur
4246
///
4347
/// ## Panics
44-
/// This method will panic if the [`Query`] supplied is empty (i.e has no arguments)
48+
/// This method will panic:
49+
/// - if the [`Query`] supplied is empty (i.e has no arguments)
4550
/// This function is a subroutine of `run_query` used to parse the response packet
4651
pub fn run_simple_query(&mut self, query: &Query) -> SkyResult {
4752
assert!(query.len() != 0, "A `Query` cannot be of zero length!");
48-
query.write_query_to_sync(&mut self.stream)?;
53+
match self._run_query(query)? {
54+
RawResponse::SimpleQuery(sq) => Ok(sq),
55+
RawResponse::PipelinedQuery(_) => Err(SkyhashError::InvalidResponse.into()),
56+
}
57+
}
58+
/// This function will write a pipelined query to the stream and read the response from the server,
59+
/// returning errors if they do occur.
60+
pub fn run_pipeline(&mut self, pipeline: Pipeline) -> SkyRawResult<Vec<Element>> {
61+
assert!(pipeline.len() != 0, "A `Pipeline` cannot be empty!");
62+
match self._run_query(&pipeline)? {
63+
RawResponse::PipelinedQuery(pq) => Ok(pq),
64+
RawResponse::SimpleQuery(_) => Err(SkyhashError::InvalidResponse.into()),
65+
}
66+
}
67+
fn _run_query<T: WriteQuerySync>(&mut self, query: &T) -> SkyRawResult<RawResponse> {
68+
query.write_sync(&mut self.stream)?;
4969
self.stream.flush()?;
5070
loop {
5171
let mut buffer = [0u8; 1024];
@@ -59,12 +79,7 @@ macro_rules! impl_sync_methods {
5979
match self.try_response() {
6080
Ok((query, forward_by)) => {
6181
self.buffer.drain(..forward_by);
62-
match query {
63-
RawResponse::SimpleQuery(s) => return Ok(s),
64-
RawResponse::PipelinedQuery(_) => {
65-
unimplemented!("Pipelined queries aren't implemented yet")
66-
}
67-
}
82+
return Ok(query);
6883
}
6984
Err(e) => match e {
7085
ParseError::NotEnough => (),

0 commit comments

Comments
 (0)