Skip to content

Commit 96d974d

Browse files
authored
Core + Py: add allow_non_covered_slots to ClusterScan and related commands (valkey-io#2814)
* added to core and python the option to scan uncoverd slots in a cluster Signed-off-by: avifenesh <[email protected]> * addresed comments Signed-off-by: avifenesh <[email protected]> * Refactor slot scanning logic and improve test coverage for cluster scan functionality Signed-off-by: avifenesh <[email protected]> * Refactor cluster readiness check and enhance scan error handling in tests Signed-off-by: avifenesh <[email protected]> --------- Signed-off-by: avifenesh <[email protected]>
1 parent ef18be3 commit 96d974d

File tree

15 files changed

+1407
-726
lines changed

15 files changed

+1407
-726
lines changed

glide-core/redis-rs/redis/src/cluster_async/mod.rs

Lines changed: 37 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,13 @@ pub mod testing {
3232
use crate::{
3333
client::GlideConnectionOptions,
3434
cluster_routing::{Routable, RoutingInfo, ShardUpdateResult},
35-
cluster_slotmap::SlotMap,
3635
cluster_topology::{
3736
calculate_topology, get_slot, SlotRefreshState, DEFAULT_NUMBER_OF_REFRESH_SLOTS_RETRIES,
3837
DEFAULT_REFRESH_SLOTS_RETRY_BASE_DURATION_MILLIS, DEFAULT_REFRESH_SLOTS_RETRY_BASE_FACTOR,
39-
SLOT_SIZE,
4038
},
4139
cmd,
42-
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ObjectType, ScanStateRC},
43-
FromRedisValue, InfoDict, ToRedisArgs,
40+
commands::cluster_scan::{cluster_scan, ClusterScanArgs, ScanStateRC},
41+
FromRedisValue, InfoDict,
4442
};
4543
use dashmap::DashMap;
4644
use std::{
@@ -111,7 +109,7 @@ use crate::types::RetryMethod;
111109

112110
pub(crate) const MUTEX_READ_ERR: &str = "Failed to obtain read lock. Poisoned mutex?";
113111
const MUTEX_WRITE_ERR: &str = "Failed to obtain write lock. Poisoned mutex?";
114-
/// This represents an async Redis Cluster connection. It stores the
112+
/// This represents an async Cluster connection. It stores the
115113
/// underlying connections maintained for each node in the cluster, as well
116114
/// as common parameters for connecting to nodes and executing commands.
117115
#[derive(Clone)]
@@ -142,79 +140,18 @@ where
142140
})
143141
}
144142

145-
/// Special handling for `SCAN` command, using `cluster_scan`.
146-
/// If you wish to use a match pattern, use [`cluster_scan_with_pattern`].
147-
/// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology
148-
/// and make sure that all keys that were in the cluster from start to end of the scan are scanned.
149-
/// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance.
150-
///
151-
/// # Arguments
152-
///
153-
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
154-
/// for each subsequent iteration use the returned [`ScanStateRC`].
155-
/// * `count` - An optional count of keys requested,
156-
/// the amount returned can vary and not obligated to return exactly count.
157-
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
158-
///
159-
/// # Returns
160-
///
161-
/// A [`ScanStateRC`] for the updated state of the scan and the vector of keys that were found in the scan.
162-
/// structure of returned value:
163-
/// `Ok((ScanStateRC, Vec<Value>))`
164-
///
165-
/// When the scan is finished [`ScanStateRC`] will be None, and can be checked by calling `scan_state_wrapper.is_finished()`.
166-
///
167-
/// # Example
168-
/// ```rust,no_run
169-
/// use redis::cluster::ClusterClient;
170-
/// use redis::{ScanStateRC, FromRedisValue, from_redis_value, Value, ObjectType};
171-
///
172-
/// async fn scan_all_cluster() -> Vec<String> {
173-
/// let nodes = vec!["redis://127.0.0.1/"];
174-
/// let client = ClusterClient::new(nodes).unwrap();
175-
/// let mut connection = client.get_async_connection(None).await.unwrap();
176-
/// let mut scan_state_rc = ScanStateRC::new();
177-
/// let mut keys: Vec<String> = vec![];
178-
/// loop {
179-
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
180-
/// connection.cluster_scan(scan_state_rc, None, None).await.unwrap();
181-
/// scan_state_rc = next_cursor;
182-
/// let mut scan_keys = scan_keys
183-
/// .into_iter()
184-
/// .map(|v| from_redis_value(&v).unwrap())
185-
/// .collect::<Vec<String>>(); // Change the type of `keys` to `Vec<String>`
186-
/// keys.append(&mut scan_keys);
187-
/// if scan_state_rc.is_finished() {
188-
/// break;
189-
/// }
190-
/// }
191-
/// keys
192-
/// }
193-
/// ```
194-
pub async fn cluster_scan(
195-
&mut self,
196-
scan_state_rc: ScanStateRC,
197-
count: Option<usize>,
198-
object_type: Option<ObjectType>,
199-
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
200-
let cluster_scan_args = ClusterScanArgs::new(scan_state_rc, None, count, object_type);
201-
self.route_cluster_scan(cluster_scan_args).await
202-
}
203-
204143
/// Special handling for `SCAN` command, using `cluster_scan_with_pattern`.
205144
/// It is a special case of [`cluster_scan`], with an additional match pattern.
206-
/// Perform a `SCAN` command on a Redis cluster, using scan state object in order to handle changes in topology
145+
/// Perform a `SCAN` command on a cluster, using scan state object in order to handle changes in topology
207146
/// and make sure that all keys that were in the cluster from start to end of the scan are scanned.
208147
/// In order to make sure all keys in the cluster scanned, topology refresh occurs more frequently and may affect performance.
209148
///
210149
/// # Arguments
211150
///
212151
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
213152
/// for each subsequent iteration use the returned [`ScanStateRC`].
214-
/// * `match_pattern` - A match pattern of requested keys.
215-
/// * `count` - An optional count of keys requested,
216-
/// the amount returned can vary and not obligated to return exactly count.
217-
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
153+
/// * `cluster_scan_args` - A [`ClusterScanArgs`] struct containing the arguments for the cluster scan command - match pattern, count,
154+
/// object type and the allow_non_covered_slots flag.
218155
///
219156
/// # Returns
220157
///
@@ -227,17 +164,18 @@ where
227164
/// # Example
228165
/// ```rust,no_run
229166
/// use redis::cluster::ClusterClient;
230-
/// use redis::{ScanStateRC, FromRedisValue, from_redis_value, Value, ObjectType};
167+
/// use redis::{ScanStateRC, from_redis_value, Value, ObjectType, ClusterScanArgs};
231168
///
232169
/// async fn scan_all_cluster() -> Vec<String> {
233170
/// let nodes = vec!["redis://127.0.0.1/"];
234171
/// let client = ClusterClient::new(nodes).unwrap();
235172
/// let mut connection = client.get_async_connection(None).await.unwrap();
236173
/// let mut scan_state_rc = ScanStateRC::new();
237174
/// let mut keys: Vec<String> = vec![];
175+
/// let cluster_scan_args = ClusterScanArgs::builder().with_count(1000).with_object_type(ObjectType::String).build();
238176
/// loop {
239177
/// let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) =
240-
/// connection.cluster_scan_with_pattern(scan_state_rc, b"my_key", None, None).await.unwrap();
178+
/// connection.cluster_scan(scan_state_rc, cluster_scan_args.clone()).await.unwrap();
241179
/// scan_state_rc = next_cursor;
242180
/// let mut scan_keys = scan_keys
243181
/// .into_iter()
@@ -251,19 +189,12 @@ where
251189
/// keys
252190
/// }
253191
/// ```
254-
pub async fn cluster_scan_with_pattern<K: ToRedisArgs>(
192+
pub async fn cluster_scan(
255193
&mut self,
256194
scan_state_rc: ScanStateRC,
257-
match_pattern: K,
258-
count: Option<usize>,
259-
object_type: Option<ObjectType>,
195+
mut cluster_scan_args: ClusterScanArgs,
260196
) -> RedisResult<(ScanStateRC, Vec<Value>)> {
261-
let cluster_scan_args = ClusterScanArgs::new(
262-
scan_state_rc,
263-
Some(match_pattern.to_redis_args().concat()),
264-
count,
265-
object_type,
266-
);
197+
cluster_scan_args.set_scan_state_cursor(scan_state_rc);
267198
self.route_cluster_scan(cluster_scan_args).await
268199
}
269200

@@ -279,18 +210,18 @@ where
279210
sender,
280211
})
281212
.await
282-
.map_err(|_| {
213+
.map_err(|e| {
283214
RedisError::from(io::Error::new(
284215
io::ErrorKind::BrokenPipe,
285-
"redis_cluster: Unable to send command",
216+
format!("Cluster: Error occurred while trying to send SCAN command to internal send task. {e:?}"),
286217
))
287218
})?;
288219
receiver
289220
.await
290-
.unwrap_or_else(|_| {
221+
.unwrap_or_else(|e| {
291222
Err(RedisError::from(io::Error::new(
292223
io::ErrorKind::BrokenPipe,
293-
"redis_cluster: Unable to receive command",
224+
format!("Cluster: Failed to receive SCAN command response from internal send task. {e:?}"),
294225
)))
295226
})
296227
.map(|response| match response {
@@ -316,18 +247,20 @@ where
316247
sender,
317248
})
318249
.await
319-
.map_err(|_| {
250+
.map_err(|e| {
320251
RedisError::from(io::Error::new(
321252
io::ErrorKind::BrokenPipe,
322-
"redis_cluster: Unable to send command",
253+
format!("Cluster: Error occurred while trying to send command to internal sender. {e:?}"),
323254
))
324255
})?;
325256
receiver
326257
.await
327-
.unwrap_or_else(|_| {
258+
.unwrap_or_else(|e| {
328259
Err(RedisError::from(io::Error::new(
329260
io::ErrorKind::BrokenPipe,
330-
"redis_cluster: Unable to receive command",
261+
format!(
262+
"Cluster: Failed to receive command response from internal sender. {e:?}"
263+
),
331264
)))
332265
})
333266
.map(|response| match response {
@@ -489,21 +422,8 @@ where
489422
.map_err(|_| RedisError::from((ErrorKind::ClientError, MUTEX_WRITE_ERR)))
490423
}
491424

492-
// return address of node for slot
493-
pub(crate) async fn get_address_from_slot(
494-
&self,
495-
slot: u16,
496-
slot_addr: SlotAddr,
497-
) -> Option<Arc<String>> {
498-
self.conn_lock
499-
.read()
500-
.expect(MUTEX_READ_ERR)
501-
.slot_map
502-
.get_node_address_for_slot(slot, slot_addr)
503-
}
504-
505425
// return epoch of node
506-
pub(crate) async fn get_address_epoch(&self, node_address: &str) -> Result<u64, RedisError> {
426+
pub(crate) async fn address_epoch(&self, node_address: &str) -> Result<u64, RedisError> {
507427
let command = cmd("CLUSTER").arg("INFO").to_owned();
508428
let node_conn = self
509429
.conn_lock
@@ -541,14 +461,26 @@ where
541461
}
542462
}
543463

544-
// return slots of node
545-
pub(crate) async fn get_slots_of_address(&self, node_address: Arc<String>) -> Vec<u16> {
464+
/// return slots of node
465+
pub(crate) async fn slots_of_address(&self, node_address: Arc<String>) -> Vec<u16> {
546466
self.conn_lock
547467
.read()
548468
.expect(MUTEX_READ_ERR)
549469
.slot_map
550470
.get_slots_of_node(node_address)
551471
}
472+
473+
/// Get connection for address
474+
pub(crate) async fn connection_for_address(
475+
&self,
476+
address: &str,
477+
) -> Option<ConnectionFuture<C>> {
478+
self.conn_lock
479+
.read()
480+
.expect(MUTEX_READ_ERR)
481+
.connection_for_address(address)
482+
.map(|(_, conn)| conn)
483+
}
552484
}
553485

554486
pub(crate) struct ClusterConnInner<C> {
@@ -1884,14 +1816,6 @@ where
18841816
Self::refresh_slots_inner(inner, curr_retry).await
18851817
}
18861818

1887-
pub(crate) fn check_if_all_slots_covered(slot_map: &SlotMap) -> bool {
1888-
let mut slots_covered = 0;
1889-
for (end, slots) in slot_map.slots.iter() {
1890-
slots_covered += end.saturating_sub(slots.start).saturating_add(1);
1891-
}
1892-
slots_covered == SLOT_SIZE
1893-
}
1894-
18951819
// Query a node to discover slot-> master mappings
18961820
async fn refresh_slots_inner(inner: Arc<InnerCore<C>>, curr_retry: usize) -> RedisResult<()> {
18971821
let num_of_nodes = inner.conn_lock.read().expect(MUTEX_READ_ERR).len();

glide-core/redis-rs/redis/src/cluster_slotmap.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ impl SlotMap {
202202
.collect()
203203
}
204204

205-
pub(crate) fn get_node_address_for_slot(
205+
pub(crate) fn node_address_for_slot(
206206
&self,
207207
slot: u16,
208208
slot_addr: SlotAddr,

0 commit comments

Comments
 (0)