Skip to content

Commit 0b3b0ff

Browse files
authored
Add blocking client (#77)
1 parent c3bc304 commit 0b3b0ff

File tree

2 files changed

+265
-0
lines changed

2 files changed

+265
-0
lines changed

src/blocking_client.rs

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
//! Minimal blocking Redis client implementation
2+
//!
3+
//! Provides a blocking connect and methods for issuing the supported commands.
4+
5+
use bytes::Bytes;
6+
use std::time::Duration;
7+
use tokio::net::ToSocketAddrs;
8+
use tokio::runtime::Runtime;
9+
10+
pub use crate::client::Message;
11+
12+
/// Established connection with a Redis server.
13+
///
14+
/// Backed by a single `TcpStream`, `BlockingClient` provides basic network
15+
/// client functionality (no pooling, retrying, ...). Connections are
16+
/// established using the [`connect`](fn@connect) function.
17+
///
18+
/// Requests are issued using the various methods of `Client`.
19+
pub struct BlockingClient {
20+
/// The asynchronous `Client`.
21+
inner: crate::client::Client,
22+
23+
/// A `current_thread` runtime for executing operations on the asynchronous
24+
/// client in a blocking manner.
25+
rt: Runtime,
26+
}
27+
28+
/// A client that has entered pub/sub mode.
29+
///
30+
/// Once clients subscribe to a channel, they may only perform pub/sub related
31+
/// commands. The `BlockingClient` type is transitioned to a
32+
/// `BlockingSubscriber` type in order to prevent non-pub/sub methods from being
33+
/// called.
34+
pub struct BlockingSubscriber {
35+
/// The asynchronous `Subscriber`.
36+
inner: crate::client::Subscriber,
37+
38+
/// A `current_thread` runtime for executing operations on the asynchronous
39+
/// `Subscriber` in a blocking manner.
40+
rt: Runtime,
41+
}
42+
43+
/// The iterator returned by `Subscriber::into_iter`.
44+
struct SubscriberIterator {
45+
/// The asynchronous `Subscriber`.
46+
inner: crate::client::Subscriber,
47+
48+
/// A `current_thread` runtime for executing operations on the asynchronous
49+
/// `Subscriber` in a blocking manner.
50+
rt: Runtime,
51+
}
52+
53+
/// Establish a connection with the Redis server located at `addr`.
54+
///
55+
/// `addr` may be any type that can be asynchronously converted to a
56+
/// `SocketAddr`. This includes `SocketAddr` and strings. The `ToSocketAddrs`
57+
/// trait is the Tokio version and not the `std` version.
58+
///
59+
/// # Examples
60+
///
61+
/// ```no_run
62+
/// use mini_redis::blocking_client;
63+
///
64+
/// fn main() {
65+
/// let client = match blocking_client::connect("localhost:6379") {
66+
/// Ok(client) => client,
67+
/// Err(_) => panic!("failed to establish connection"),
68+
/// };
69+
/// # drop(client);
70+
/// }
71+
/// ```
72+
pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> {
73+
let rt = tokio::runtime::Builder::new_current_thread()
74+
.enable_all()
75+
.build()?;
76+
77+
let inner = rt.block_on(crate::client::connect(addr))?;
78+
79+
Ok(BlockingClient { inner, rt })
80+
}
81+
82+
impl BlockingClient {
83+
/// Get the value of key.
84+
///
85+
/// If the key does not exist the special value `None` is returned.
86+
///
87+
/// # Examples
88+
///
89+
/// Demonstrates basic usage.
90+
///
91+
/// ```no_run
92+
/// use mini_redis::blocking_client;
93+
///
94+
/// fn main() {
95+
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
96+
///
97+
/// let val = client.get("foo").unwrap();
98+
/// println!("Got = {:?}", val);
99+
/// }
100+
/// ```
101+
pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> {
102+
self.rt.block_on(self.inner.get(key))
103+
}
104+
105+
/// Set `key` to hold the given `value`.
106+
///
107+
/// The `value` is associated with `key` until it is overwritten by the next
108+
/// call to `set` or it is removed.
109+
///
110+
/// If key already holds a value, it is overwritten. Any previous time to
111+
/// live associated with the key is discarded on successful SET operation.
112+
///
113+
/// # Examples
114+
///
115+
/// Demonstrates basic usage.
116+
///
117+
/// ```no_run
118+
/// use mini_redis::blocking_client;
119+
///
120+
/// fn main() {
121+
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
122+
///
123+
/// client.set("foo", "bar".into()).unwrap();
124+
///
125+
/// // Getting the value immediately works
126+
/// let val = client.get("foo").unwrap().unwrap();
127+
/// assert_eq!(val, "bar");
128+
/// }
129+
/// ```
130+
pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> {
131+
self.rt.block_on(self.inner.set(key, value))
132+
}
133+
134+
/// Set `key` to hold the given `value`. The value expires after `expiration`
135+
///
136+
/// The `value` is associated with `key` until one of the following:
137+
/// - it expires.
138+
/// - it is overwritten by the next call to `set`.
139+
/// - it is removed.
140+
///
141+
/// If key already holds a value, it is overwritten. Any previous time to
142+
/// live associated with the key is discarded on a successful SET operation.
143+
///
144+
/// # Examples
145+
///
146+
/// Demonstrates basic usage. This example is not **guaranteed** to always
147+
/// work as it relies on time based logic and assumes the client and server
148+
/// stay relatively synchronized in time. The real world tends to not be so
149+
/// favorable.
150+
///
151+
/// ```no_run
152+
/// use mini_redis::blocking_client;
153+
/// use std::thread;
154+
/// use std::time::Duration;
155+
///
156+
/// fn main() {
157+
/// let ttl = Duration::from_millis(500);
158+
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
159+
///
160+
/// client.set_expires("foo", "bar".into(), ttl).unwrap();
161+
///
162+
/// // Getting the value immediately works
163+
/// let val = client.get("foo").unwrap().unwrap();
164+
/// assert_eq!(val, "bar");
165+
///
166+
/// // Wait for the TTL to expire
167+
/// thread::sleep(ttl);
168+
///
169+
/// let val = client.get("foo").unwrap();
170+
/// assert!(val.is_some());
171+
/// }
172+
/// ```
173+
pub fn set_expires(
174+
&mut self,
175+
key: &str,
176+
value: Bytes,
177+
expiration: Duration,
178+
) -> crate::Result<()> {
179+
self.rt
180+
.block_on(self.inner.set_expires(key, value, expiration))
181+
}
182+
183+
/// Posts `message` to the given `channel`.
184+
///
185+
/// Returns the number of subscribers currently listening on the channel.
186+
/// There is no guarantee that these subscribers receive the message as they
187+
/// may disconnect at any time.
188+
///
189+
/// # Examples
190+
///
191+
/// Demonstrates basic usage.
192+
///
193+
/// ```no_run
194+
/// use mini_redis::blocking_client;
195+
///
196+
/// fn main() {
197+
/// let mut client = blocking_client::connect("localhost:6379").unwrap();
198+
///
199+
/// let val = client.publish("foo", "bar".into()).unwrap();
200+
/// println!("Got = {:?}", val);
201+
/// }
202+
/// ```
203+
pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> {
204+
self.rt.block_on(self.inner.publish(channel, message))
205+
}
206+
207+
/// Subscribes the client to the specified channels.
208+
///
209+
/// Once a client issues a subscribe command, it may no longer issue any
210+
/// non-pub/sub commands. The function consumes `self` and returns a
211+
/// `BlockingSubscriber`.
212+
///
213+
/// The `BlockingSubscriber` value is used to receive messages as well as
214+
/// manage the list of channels the client is subscribed to.
215+
pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> {
216+
let subscriber = self.rt.block_on(self.inner.subscribe(channels))?;
217+
Ok(BlockingSubscriber {
218+
inner: subscriber,
219+
rt: self.rt,
220+
})
221+
}
222+
}
223+
224+
impl BlockingSubscriber {
225+
/// Returns the set of channels currently subscribed to.
226+
pub fn get_subscribed(&self) -> &[String] {
227+
self.inner.get_subscribed()
228+
}
229+
230+
/// Receive the next message published on a subscribed channel, waiting if
231+
/// necessary.
232+
///
233+
/// `None` indicates the subscription has been terminated.
234+
pub fn next_message(&mut self) -> crate::Result<Option<Message>> {
235+
self.rt.block_on(self.inner.next_message())
236+
}
237+
238+
/// Convert the subscriber into an `Iterator` yielding new messages published
239+
/// on subscribed channels.
240+
pub fn into_iter(self) -> impl Iterator<Item = crate::Result<Message>> {
241+
SubscriberIterator {
242+
inner: self.inner,
243+
rt: self.rt,
244+
}
245+
}
246+
247+
/// Subscribe to a list of new channels
248+
pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> {
249+
self.rt.block_on(self.inner.subscribe(channels))
250+
}
251+
252+
/// Unsubscribe to a list of new channels
253+
pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> {
254+
self.rt.block_on(self.inner.unsubscribe(channels))
255+
}
256+
}
257+
258+
impl Iterator for SubscriberIterator {
259+
type Item = crate::Result<Message>;
260+
261+
fn next(&mut self) -> Option<crate::Result<Message>> {
262+
self.rt.block_on(self.inner.next_message()).transpose()
263+
}
264+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
//! intermediate representation between a "command" and the byte
2626
//! representation.
2727
28+
pub mod blocking_client;
2829
pub mod client;
2930

3031
pub mod cmd;

0 commit comments

Comments
 (0)