Skip to content

Commit ca36bd4

Browse files
authored
Merge pull request #172 from avifenesh/cluster_scan/bunary_support
Add binary support
2 parents 062e727 + 789f324 commit ca36bd4

File tree

3 files changed

+91
-36
lines changed

3 files changed

+91
-36
lines changed

redis/src/cluster_async/mod.rs

Lines changed: 67 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::{
3333
cluster_topology::SLOT_SIZE,
3434
cmd,
3535
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC},
36-
FromRedisValue, InfoDict,
36+
FromRedisValue, InfoDict, ToRedisArgs,
3737
};
3838
#[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
3939
use async_std::task::{spawn, JoinHandle};
@@ -139,16 +139,16 @@ where
139139
})
140140
}
141141

142-
// Special handling for `SCAN` command, using cluster_scan
142+
/// Special handling for `SCAN` command, using `cluster_scan`.
143+
/// If you wish to use a match pattern, use [`cluster_scan_with_pattern`].
143144
/// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology
144145
/// and make sure that all keys that were in the cluster from start to end of the scan are scanned.
145146
/// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance.
146147
///
147148
/// # Arguments
148149
///
149150
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
150-
/// for each subsequent iteration use the returned [`ScanStateRC`].
151-
/// * `match_pattern` - An optional match pattern of requested keys.
151+
/// for each subsequent iteration use the returned [`ScanStateRC`].
152152
/// * `count` - An optional count of keys requested,
153153
/// the amount returned can vary and not obligated to return exactly count.
154154
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
@@ -174,7 +174,7 @@ where
174174
/// let mut keys: Vec<String> = vec![];
175175
/// loop {
176176
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
177-
/// connection.cluster_scan(scan_state_rc, None, None, None).await.unwrap();
177+
/// connection.cluster_scan(scan_state_rc, None, None).await.unwrap();
178178
/// scan_state_rc = next_cursor;
179179
/// let mut scan_keys = scan_keys
180180
/// .into_iter()
@@ -191,13 +191,73 @@ where
191191
pub async fn cluster_scan(
192192
&mut self,
193193
scan_state_rc: ScanStateRC,
194-
match_pattern: Option<&str>,
194+
count: Option<usize>,
195+
object_type: Option<ObjectType>,
196+
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
197+
let cluster_scan_args = ClusterScanArgs::new(scan_state_rc, None, count, object_type);
198+
self.route_cluster_scan(cluster_scan_args).await
199+
}
200+
201+
/// Special handling for `SCAN` command, using `cluster_scan_with_pattern`.
202+
/// It is a special case of [`cluster_scan`], with an additional match pattern.
203+
/// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology
204+
/// and make sure that all keys that were in the cluster from start to end of the scan are scanned.
205+
/// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance.
206+
///
207+
/// # Arguments
208+
///
209+
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
210+
/// for each subsequent iteration use the returned [`ScanStateRC`].
211+
/// * `match_pattern` - A match pattern of requested keys.
212+
/// * `count` - An optional count of keys requested,
213+
/// the amount returned can vary and not obligated to return exactly count.
214+
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
215+
///
216+
/// # Returns
217+
///
218+
/// A [`ScanStateRC`] for the updated state of the scan and the vector of keys that were found in the scan.
219+
/// structure of returned value:
220+
/// `Ok((ScanStateRC, Vec<Value>))`
221+
///
222+
/// When the scan is finished [`ScanStateRC`] will be None, and can be checked by calling `scan_state_wrapper.is_finished()`.
223+
///
224+
/// # Example
225+
/// ```rust,no_run
226+
/// use redis::cluster::ClusterClient;
227+
/// use redis::{ScanStateRC, FromRedisValue, from_redis_value, Value, ObjectType};
228+
///
229+
/// async fn scan_all_cluster() -> Vec<String> {
230+
/// let nodes = vec!["redis://127.0.0.1/"];
231+
/// let client = ClusterClient::new(nodes).unwrap();
232+
/// let mut connection = client.get_async_connection(None).await.unwrap();
233+
/// let mut scan_state_rc = ScanStateRC::new();
234+
/// let mut keys: Vec<String> = vec![];
235+
/// loop {
236+
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
237+
/// connection.cluster_scan_with_pattern(scan_state_rc, b"my_key", None, None).await.unwrap();
238+
/// scan_state_rc = next_cursor;
239+
/// let mut scan_keys = scan_keys
240+
/// .into_iter()
241+
/// .map(|v| from_redis_value(&v).unwrap())
242+
/// .collect::<Vec<String>>(); // Change the type of `keys` to `Vec<String>`
243+
/// keys.append(&mut scan_keys);
244+
/// if scan_state_rc.is_finished() {
245+
/// break;
246+
/// }
247+
/// }
248+
/// keys
249+
/// }
250+
/// ```
251+
pub async fn cluster_scan_with_pattern<K: ToRedisArgs>(
252+
&mut self,
253+
scan_state_rc: ScanStateRC,
254+
match_pattern: K,
195255
count: Option<usize>,
196256
object_type: Option<ObjectType>,
197257
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
198258
let cluster_scan_args = ClusterScanArgs::new(
199259
scan_state_rc,
200-
match_pattern.map(|s| s.to_string()),
260+
Some(match_pattern.to_redis_args().concat()),
201261
count,
202262
object_type,
203263
);

redis/src/commands/cluster_scan.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ const BITS_ARRAY_SIZE: usize = NUM_OF_SLOTS / BITS_PER_U64;
3131
const END_OF_SCAN: u16 = NUM_OF_SLOTS as u16 + 1;
3232
type SlotsBitsArray = [u64; BITS_ARRAY_SIZE];
3333

34-
#[derive(Debug, Clone)]
34+
#[derive(Clone)]
3535
pub(crate) struct ClusterScanArgs {
3636
pub(crate) scan_state_cursor: ScanStateRC,
37-
match_pattern: Option<String>,
37+
match_pattern: Option<Vec<u8>>,
3838
count: Option<usize>,
3939
object_type: Option<ObjectType>,
4040
}
@@ -59,7 +59,7 @@ pub enum ObjectType {
5959
impl ClusterScanArgs {
6060
pub(crate) fn new(
6161
scan_state_cursor: ScanStateRC,
62-
match_pattern: Option<String>,
62+
match_pattern: Option<Vec<u8>>,
6363
count: Option<usize>,
6464
object_type: Option<ObjectType>,
6565
) -> Self {
@@ -487,7 +487,7 @@ where
487487
async fn send_scan<C>(
488488
scan_state: &ScanState,
489489
core: &C,
490-
match_pattern: Option<String>,
490+
match_pattern: Option<Vec<u8>>,
491491
count: Option<usize>,
492492
object_type: Option<ObjectType>,
493493
) -> RedisResult<Value>
@@ -518,7 +518,7 @@ where
518518
async fn retry_scan<C>(
519519
scan_state: &ScanState,
520520
core: &C,
521-
match_pattern: Option<String>,
521+
match_pattern: Option<Vec<u8>>,
522522
count: Option<usize>,
523523
object_type: Option<ObjectType>,
524524
) -> RedisResult<(RedisResult<Value>, ScanState)>

redis/tests/test_cluster_scan.rs

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ mod test_cluster_scan_async {
6666
let mut keys: Vec<String> = vec![];
6767
loop {
6868
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
69-
.cluster_scan(scan_state_rc, None, None, None)
69+
.cluster_scan(scan_state_rc, None, None)
7070
.await
7171
.unwrap();
7272
scan_state_rc = next_cursor;
@@ -112,7 +112,7 @@ mod test_cluster_scan_async {
112112
loop {
113113
count += 1;
114114
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
115-
.cluster_scan(scan_state_rc, None, None, None)
115+
.cluster_scan(scan_state_rc, None, None)
116116
.await
117117
.unwrap();
118118
scan_state_rc = next_cursor;
@@ -183,9 +183,8 @@ mod test_cluster_scan_async {
183183
let mut result: RedisResult<Value> = Ok(Value::Nil);
184184
loop {
185185
count += 1;
186-
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
187-
.cluster_scan(scan_state_rc, None, None, None)
188-
.await;
186+
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> =
187+
connection.cluster_scan(scan_state_rc, None, None).await;
189188
let (next_cursor, scan_keys) = match scan_response {
190189
Ok((cursor, keys)) => (cursor, keys),
191190
Err(e) => {
@@ -256,9 +255,8 @@ mod test_cluster_scan_async {
256255
let mut count = 0;
257256
loop {
258257
count += 1;
259-
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
260-
.cluster_scan(scan_state_rc, None, None, None)
261-
.await;
258+
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> =
259+
connection.cluster_scan(scan_state_rc, None, None).await;
262260
if scan_response.is_err() {
263261
println!("error: {:?}", scan_response);
264262
}
@@ -427,9 +425,8 @@ mod test_cluster_scan_async {
427425
let mut count = 0;
428426
loop {
429427
count += 1;
430-
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
431-
.cluster_scan(scan_state_rc, None, None, None)
432-
.await;
428+
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> =
429+
connection.cluster_scan(scan_state_rc, None, None).await;
433430
if scan_response.is_err() {
434431
println!("error: {:?}", scan_response);
435432
}
@@ -497,7 +494,7 @@ mod test_cluster_scan_async {
497494
let mut keys: Vec<String> = vec![];
498495
loop {
499496
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
500-
.cluster_scan(scan_state_rc, None, None, None)
497+
.cluster_scan(scan_state_rc, None, None)
501498
.await
502499
.unwrap();
503500
scan_state_rc = next_cursor;
@@ -557,7 +554,7 @@ mod test_cluster_scan_async {
557554
let mut keys: Vec<String> = vec![];
558555
loop {
559556
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
560-
.cluster_scan(scan_state_rc, None, None, None)
557+
.cluster_scan(scan_state_rc, None, None)
561558
.await
562559
.unwrap();
563560
scan_state_rc = next_cursor;
@@ -625,7 +622,7 @@ mod test_cluster_scan_async {
625622
let mut keys: Vec<String> = vec![];
626623
loop {
627624
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
628-
.cluster_scan(scan_state_rc, Some("key:pattern:*"), None, None)
625+
.cluster_scan_with_pattern(scan_state_rc, "key:pattern:*", None, None)
629626
.await
630627
.unwrap();
631628
scan_state_rc = next_cursor;
@@ -683,7 +680,7 @@ mod test_cluster_scan_async {
683680
let mut keys: Vec<String> = vec![];
684681
loop {
685682
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
686-
.cluster_scan(scan_state_rc, None, None, Some(ObjectType::Set))
683+
.cluster_scan(scan_state_rc, None, Some(ObjectType::Set))
687684
.await
688685
.unwrap();
689686
scan_state_rc = next_cursor;
@@ -736,11 +733,11 @@ mod test_cluster_scan_async {
736733
let mut comparing_times = 0;
737734
loop {
738735
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
739-
.cluster_scan(scan_state_rc.clone(), None, Some(100), None)
736+
.cluster_scan(scan_state_rc.clone(), Some(100), None)
740737
.await
741738
.unwrap();
742739
let (_, scan_without_count_keys): (ScanStateRC, Vec<Value>) = connection
743-
.cluster_scan(scan_state_rc, None, Some(100), None)
740+
.cluster_scan(scan_state_rc, Some(100), None)
744741
.await
745742
.unwrap();
746743
if !scan_keys.is_empty() && !scan_without_count_keys.is_empty() {
@@ -795,9 +792,8 @@ mod test_cluster_scan_async {
795792
let mut count = 0;
796793
loop {
797794
count += 1;
798-
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
799-
.cluster_scan(scan_state_rc, None, None, None)
800-
.await;
795+
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> =
796+
connection.cluster_scan(scan_state_rc, None, None).await;
801797
if scan_response.is_err() {
802798
println!("error: {:?}", scan_response);
803799
}
@@ -810,7 +806,7 @@ mod test_cluster_scan_async {
810806
if count == 5 {
811807
drop(cluster);
812808
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
813-
.cluster_scan(scan_state_rc.clone(), None, None, None)
809+
.cluster_scan(scan_state_rc.clone(), None, None)
814810
.await;
815811
assert!(scan_response.is_err());
816812
break;
@@ -819,9 +815,8 @@ mod test_cluster_scan_async {
819815
cluster = TestClusterContext::new(3, 0);
820816
connection = cluster.async_connection(None).await;
821817
loop {
822-
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
823-
.cluster_scan(scan_state_rc, None, None, None)
824-
.await;
818+
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> =
819+
connection.cluster_scan(scan_state_rc, None, None).await;
825820
if scan_response.is_err() {
826821
println!("error: {:?}", scan_response);
827822
}

0 commit comments

Comments
 (0)