Skip to content

Commit b5c2d0c

Browse files
muzarskiLorak-mmk
authored andcommitted
ccm: introduce ip_allocator module
Few things happen in this commit 1. Move NetPrefix from cluster.rs to ip_allocator.rs 2. Introduce IpAllocator (with LocalSubnetIdentifier) LocalSubnetIdentifier is a utility wrapper over two ip octets, by which we identify a local subnet (127.x.y.0/24). It can be converted to Ipv4Addr and vice-versa. For now, we only support ipv4 (even though NetPrefix) supports ipv6 as well. This may change in the future. IpAllocator does exactly what we would expect - it keeps track of the ip address pool and allows to allocate/free addresses (subnet ids) from this pool. Upon construction, it scans the /proc/net/tcp file to find out which subnets are used before the tests start. This way, the pool is initialized. We introduce a global IpAllocator for test purposes. It's fine, because IpAllocator is thread-safe. The only assumption we make, is that the ip pool is exclusive to the tests when they run. This is true for CI, and for most local use cases. There are some niche cases which break this assumption, but we do not care about it now. This solution works just fine for hackathon purposes. I introduced two new unit tests for Ipv4Addr and LocalSubnetIdentifier conversions. 3. Adjust cluster.rs to the allocator. The `alloc_ip_prefix` is called where NetPrefix::sniff_ip_prefix was originally called. The `free_ip_prefix` is called in `Cluster` destructor.
1 parent 15696d4 commit b5c2d0c

File tree

3 files changed

+259
-160
lines changed

3 files changed

+259
-160
lines changed

scylla/tests/ccm_integration/ccm/cluster.rs

Lines changed: 16 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
1-
use crate::ccm::ROOT_CCM_DIR;
1+
use crate::ccm::{IP_ALLOCATOR, ROOT_CCM_DIR};
22

3+
use super::ip_allocator::NetPrefix;
34
use super::logged_cmd::{LoggedCmd, RunOptions};
45
use super::node_config::NodeConfig;
56
use anyhow::{Context, Error};
67
use scylla::client::session_builder::SessionBuilder;
7-
use std::collections::{HashMap, HashSet};
8-
use std::fmt::Display;
9-
use std::net::{AddrParseError, IpAddr, Ipv4Addr, Ipv6Addr};
8+
use std::collections::HashMap;
9+
use std::net::IpAddr;
1010
use std::path::{Path, PathBuf};
1111
use std::process::Command;
1212
use std::str::FromStr;
1313
use std::sync::Arc;
1414
use tempfile::TempDir;
15-
use tokio::fs::{metadata, File};
16-
use tokio::io::{AsyncBufReadExt, BufReader};
15+
use tokio::fs::metadata;
1716
use tokio::sync::RwLock;
1817
use tracing::{debug, info};
1918

@@ -450,6 +449,13 @@ impl Drop for Cluster {
450449
fn drop(&mut self) {
451450
if !self.opts.do_not_remove_on_drop {
452451
self.destroy_sync().ok();
452+
453+
// Return the IP prefix to the pool.
454+
IP_ALLOCATOR
455+
.lock()
456+
.expect("Failed to acquire IP_ALLOCATOR lock")
457+
.free_ip_prefix(&self.opts.ip_prefix)
458+
.expect("Failed to return ip prefix");
453459
}
454460
}
455461
}
@@ -530,7 +536,10 @@ impl Cluster {
530536
pub(crate) async fn new(opts: ClusterOptions) -> Result<Self, Error> {
531537
let mut opts = opts.clone();
532538
if opts.ip_prefix.is_empty() {
533-
opts.ip_prefix = NetPrefix::sniff_ipv4_prefix().await?
539+
opts.ip_prefix = IP_ALLOCATOR
540+
.lock()
541+
.expect("Failed to acquire IP_ALLOCATOR lock")
542+
.alloc_ip_prefix()?
534543
};
535544

536545
let config_dir = TempDir::with_prefix_in(&opts.name, &*ROOT_CCM_DIR)
@@ -758,152 +767,3 @@ impl Cluster {
758767
&self.nodes
759768
}
760769
}
761-
762-
#[derive(Debug, Clone)]
763-
pub(crate) struct NetPrefix(IpAddr);
764-
765-
impl NetPrefix {
766-
/// This method looks at all busy /24 networks in 127.0.0.0/8 and return first available
767-
/// network goes as busy if anything is listening on any port and any address in this network
768-
/// 127.0.0.0/24 is skipped and never returned
769-
async fn sniff_ipv4_prefix() -> Result<NetPrefix, Error> {
770-
let mut used_ips: HashSet<IpAddr> = HashSet::new();
771-
let file = File::open("/proc/net/tcp").await?;
772-
let mut lines = BufReader::new(file).lines();
773-
while let Some(line) = lines.next_line().await? {
774-
let parts: Vec<&str> = line.split_whitespace().collect();
775-
if let Some(ip_hex) = parts.get(1) {
776-
let ip_port: Vec<&str> = ip_hex.split(':').collect();
777-
if let Some(ip_hex) = ip_port.first() {
778-
if let Ok(ip) = u32::from_str_radix(ip_hex, 16) {
779-
used_ips.insert(
780-
Ipv4Addr::new(ip as u8, (ip >> 8) as u8, (ip >> 16) as u8, 0).into(),
781-
);
782-
}
783-
}
784-
}
785-
}
786-
787-
for a in 0..=255 {
788-
for b in 0..=255 {
789-
if a == 0 && b == 0 {
790-
continue;
791-
}
792-
let ip_prefix: IpAddr = Ipv4Addr::new(127, a, b, 0).into();
793-
if !used_ips.contains(&ip_prefix) {
794-
return Ok(ip_prefix.into());
795-
}
796-
}
797-
}
798-
Err(Error::msg("All ip ranges are busy"))
799-
}
800-
801-
fn empty() -> Self {
802-
NetPrefix(IpAddr::V6(Ipv6Addr::UNSPECIFIED))
803-
}
804-
805-
fn is_empty(&self) -> bool {
806-
self.0.is_unspecified()
807-
}
808-
809-
fn from_string(value: String) -> Result<Self, AddrParseError> {
810-
Ok(IpAddr::from_str(&value)?.into())
811-
}
812-
813-
fn to_str(&self) -> String {
814-
match self.0 {
815-
IpAddr::V4(v4) => {
816-
let octets = v4.octets();
817-
format!("{}.{}.{}.", octets[0], octets[1], octets[2])
818-
}
819-
IpAddr::V6(v6) => {
820-
let mut segments = v6.segments();
821-
segments[7] = 0; // Set last segment to 0
822-
let new_ip = Ipv6Addr::from(segments);
823-
let formatted = new_ip.to_string();
824-
formatted.rsplit_once(':').map(|x| x.0).unwrap().to_string() + ":"
825-
}
826-
}
827-
}
828-
829-
fn to_ipaddress(&self, id: u16) -> IpAddr {
830-
match self.0 {
831-
IpAddr::V4(v4) => {
832-
let mut octets = v4.octets();
833-
octets[3] = id as u8;
834-
IpAddr::V4(Ipv4Addr::from(octets))
835-
}
836-
IpAddr::V6(v6) => {
837-
let mut segments = v6.segments();
838-
segments[7] = id;
839-
IpAddr::V6(Ipv6Addr::from(segments))
840-
}
841-
}
842-
}
843-
}
844-
845-
impl Display for NetPrefix {
846-
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
847-
write!(f, "{}", self.0)
848-
}
849-
}
850-
851-
impl From<IpAddr> for NetPrefix {
852-
fn from(ip: IpAddr) -> Self {
853-
NetPrefix(ip)
854-
}
855-
}
856-
857-
impl From<NetPrefix> for String {
858-
fn from(value: NetPrefix) -> Self {
859-
value.0.to_string()
860-
}
861-
}
862-
863-
impl Default for NetPrefix {
864-
fn default() -> Self {
865-
NetPrefix::empty()
866-
}
867-
}
868-
869-
#[cfg(test)]
870-
mod tests {
871-
use super::*;
872-
873-
#[test]
874-
fn test_ipv4_prefix() {
875-
let ip = NetPrefix::from_string("192.168.1.100".to_string()).unwrap();
876-
assert_eq!(ip.to_str(), "192.168.1.");
877-
}
878-
879-
#[test]
880-
fn test_ipv4_loopback() {
881-
let ip = NetPrefix::from_string("127.0.0.1".to_string()).unwrap();
882-
assert_eq!(ip.to_str(), "127.0.0.");
883-
}
884-
885-
#[test]
886-
fn test_ipv4_edge_case() {
887-
let ip = NetPrefix::from_string("0.0.0.0".to_string()).unwrap();
888-
assert_eq!(ip.to_str(), "0.0.0.");
889-
}
890-
891-
#[test]
892-
fn test_ipv6_prefix() {
893-
let ip =
894-
NetPrefix::from_string("2001:0db8:85a3:0000:0000:8a2e:0370:7334".to_string()).unwrap();
895-
assert_eq!(ip.to_str(), "2001:db8:85a3::8a2e:370:");
896-
}
897-
898-
#[test]
899-
fn test_ipv6_loopback() {
900-
let ip = NetPrefix::from_string("::1".to_string()).unwrap();
901-
assert_eq!(ip.to_str(), "::");
902-
}
903-
904-
#[test]
905-
fn test_ipv6_shortened() {
906-
let ip = NetPrefix::from_string("2001:db8::ff00:42:8329".to_string()).unwrap();
907-
assert_eq!(ip.to_str(), "2001:db8::ff00:42:");
908-
}
909-
}

0 commit comments

Comments
 (0)