diff --git a/flow-filter/src/lib.rs b/flow-filter/src/lib.rs index d70c0b647..930f0edbe 100644 --- a/flow-filter/src/lib.rs +++ b/flow-filter/src/lib.rs @@ -77,16 +77,16 @@ impl FlowFilter { }); let log_str = format_packet_addrs_ports(&src_ip, &dst_ip, ports); - let Some(VpcdLookupResult::Single(dst_vpcd)) = - tablesr.lookup(src_vpcd, &src_ip, &dst_ip, ports) - else { - debug!("{nfi}: Flow not allowed, dropping packet: {log_str}"); - packet.done(DoneReason::Filtered); - return; - }; - - debug!("{nfi}: Flow allowed: {log_str}, setting packet dst_vpcd to {dst_vpcd}"); - packet.meta.dst_vpcd = Some(dst_vpcd); + match tablesr.lookup(src_vpcd, &src_ip, &dst_ip, ports) { + Some(VpcdLookupResult::Single(dst_vpcd) | VpcdLookupResult::Default(dst_vpcd)) => { + debug!("{nfi}: Flow allowed: {log_str}, setting packet dst_vpcd to {dst_vpcd}"); + packet.meta.dst_vpcd = Some(dst_vpcd); + } + None => { + debug!("{nfi}: Flow not allowed, dropping packet: {log_str}"); + packet.done(DoneReason::Filtered); + } + } } } @@ -188,7 +188,7 @@ mod tests { table .insert( src_vpcd, - VpcdLookupResult::Single(dst_vpcd), + dst_vpcd, Prefix::from("10.0.0.0/24"), OptionalPortRange::NoPortRangeMeansAllPorts, Prefix::from("20.0.0.0/24"), @@ -227,7 +227,7 @@ mod tests { table .insert( src_vpcd, - VpcdLookupResult::Single(dst_vpcd), + dst_vpcd, Prefix::from("10.0.0.0/24"), OptionalPortRange::NoPortRangeMeansAllPorts, Prefix::from("20.0.0.0/24"), @@ -288,7 +288,7 @@ mod tests { table .insert( src_vpcd, - VpcdLookupResult::Single(dst_vpcd), + dst_vpcd, Prefix::from("10.0.0.0/24"), OptionalPortRange::NoPortRangeMeansAllPorts, Prefix::from("20.0.0.0/24"), @@ -326,7 +326,7 @@ mod tests { table .insert( src_vpcd, - VpcdLookupResult::Single(dst_vpcd), + dst_vpcd, Prefix::from("2001:db8::/32"), OptionalPortRange::NoPortRangeMeansAllPorts, Prefix::from("2001:db9::/32"), @@ -365,7 +365,7 @@ mod tests { table .insert( src_vpcd, - VpcdLookupResult::Single(dst_vpcd), + dst_vpcd, Prefix::from("10.0.0.0/24"), OptionalPortRange::NoPortRangeMeansAllPorts, Prefix::from("20.0.0.0/24"), diff --git a/flow-filter/src/setup.rs b/flow-filter/src/setup.rs index 8b277fcf8..a2e15214a 100644 --- a/flow-filter/src/setup.rs +++ b/flow-filter/src/setup.rs @@ -2,7 +2,6 @@ // Copyright Open Network Fabric Authors use crate::FlowFilterTable; -use crate::tables::VpcdLookupResult; use config::ConfigError; use config::external::overlay::Overlay; use config::external::overlay::vpc::{Peering, Vpc}; @@ -42,20 +41,46 @@ impl FlowFilterTable { .remote .exposes .iter() - .flat_map(|expose| expose.public_ips()); - - // For each local prefix, add one entry for each associated remote prefix - for local_prefix in local_prefixes { - for remote_prefix in remote_prefixes.clone() { - self.insert( + .flat_map(|expose| expose.public_ips()) + .collect::>(); + // We support one default at most for now + let remote_has_default = peering.remote.exposes.iter().any(|expose| expose.default); + + if remote_prefixes.is_empty() && !remote_has_default { + return Err(ConfigError::FailureApply( + "No remote prefixes found".to_string(), + )); + } else if remote_prefixes.is_empty() && remote_has_default { + // Corner case: all prefixes go to the default remote. In this case we need to build + // entries for the source prefixes, so that we can validate that packets come from + // legitimate source prefixes, but we do not associate any destination (we'll fall back + // to the default destination) + for local_prefix in local_prefixes { + self.insert_default_only( local_vpcd, - VpcdLookupResult::Single(dst_vpcd), local_prefix.prefix(), local_prefix.ports().into(), - remote_prefix.prefix(), - remote_prefix.ports().into(), )?; } + } else { + // remote_prefixes is not empty: for each local prefix, add one entry for each + // associated remote prefix + for local_prefix in local_prefixes { + for remote_prefix in &remote_prefixes { + self.insert( + local_vpcd, + dst_vpcd, + local_prefix.prefix(), + local_prefix.ports().into(), + remote_prefix.prefix(), + remote_prefix.ports().into(), + )?; + } + } + } + + if remote_has_default { + self.add_default_remote(local_vpcd, dst_vpcd)?; } Ok(()) } diff --git a/flow-filter/src/tables.rs b/flow-filter/src/tables.rs index 6f90a9026..8e840ac35 100644 --- a/flow-filter/src/tables.rs +++ b/flow-filter/src/tables.rs @@ -22,6 +22,8 @@ trace_target!("flow-filter-tables", LevelFilter::INFO, &[]); pub(crate) enum VpcdLookupResult { /// A single VPC discriminant was found. Single(VpcDiscriminant), + /// Fallback to the Default VPC discriminant for this peering. + Default(VpcDiscriminant), // Note: This enum briefly supported another "MultipleMatches" variant, introduced in commit // 2461c5e90579 ("feat(flow-filter): Add new flow-filter stage to enforce peering rules") for // supporting overlapping exposed prefixes, but this is now rejected at validation time. @@ -119,7 +121,13 @@ impl FlowFilterTable { // We have a dst_connection_data object for our source VPC, IP, port. From this object, we // need to retrieve the prefix information associated to our destination IP and port. - let remote_prefix_data = dst_connection_data.lookup(dst_addr, dst_port)?; + let Some(remote_prefix_data) = dst_connection_data.lookup(dst_addr, dst_port) else { + let default_remote_opt = table.default_remote(); + debug!( + "No remote prefix information found, looking for default remote: {default_remote_opt:?}" + ); + return default_remote_opt.map(VpcdLookupResult::Default); + }; debug!("Found remote_prefix_data: {remote_prefix_data:?}"); // We have a remote_prefix_data object for our destination address, and the port ranges @@ -131,7 +139,7 @@ impl FlowFilterTable { pub(crate) fn insert( &mut self, src_vpcd: VpcDiscriminant, - dst_vpcd: VpcdLookupResult, + dst_vpcd: VpcDiscriminant, src_prefix: Prefix, src_port_range: OptionalPortRange, dst_prefix: Prefix, @@ -139,7 +147,7 @@ impl FlowFilterTable { ) -> Result<(), ConfigError> { if let Some(table) = self.get_table_mut(src_vpcd) { table.insert( - dst_vpcd, + VpcdLookupResult::Single(dst_vpcd), src_prefix, src_port_range, dst_prefix, @@ -148,7 +156,7 @@ impl FlowFilterTable { } else { let mut table = VpcConnectionsTable::new(); table.insert( - dst_vpcd, + VpcdLookupResult::Single(dst_vpcd), src_prefix, src_port_range, dst_prefix, @@ -158,18 +166,54 @@ impl FlowFilterTable { } Ok(()) } + + pub(crate) fn insert_default_only( + &mut self, + src_vpcd: VpcDiscriminant, + src_prefix: Prefix, + src_port_range: OptionalPortRange, + ) -> Result<(), ConfigError> { + if self.get_table_mut(src_vpcd).is_some() { + Err(ConfigError::InternalFailure(format!( + "Trying to add default remote for VPC discriminant {src_vpcd:?} that already has a table" + ))) + } else { + let mut table = VpcConnectionsTable::new(); + table.insert_empty(src_prefix, src_port_range)?; + Ok(()) + } + } + + pub(crate) fn add_default_remote( + &mut self, + local_vpcd: VpcDiscriminant, + dst_vpcd: VpcDiscriminant, + ) -> Result<(), ConfigError> { + let Some(table) = self.get_table_mut(local_vpcd) else { + return Err(ConfigError::InternalFailure(format!( + "No table found for VPC discriminant {local_vpcd:?}" + ))); + }; + table.add_default_remote(dst_vpcd) + } } #[derive(Debug, Clone)] -struct VpcConnectionsTable(IpPortPrefixTrie); +struct VpcConnectionsTable { + trie: IpPortPrefixTrie, + default_remote: Option, +} impl VpcConnectionsTable { fn new() -> Self { - Self(IpPortPrefixTrie::new()) + Self { + trie: IpPortPrefixTrie::new(), + default_remote: None, + } } fn lookup(&self, addr: &IpAddr, port: Option) -> Option<(Prefix, &SrcConnectionData)> { - self.0.lookup(addr, port) + self.trie.lookup(addr, port) } fn insert( @@ -180,15 +224,47 @@ impl VpcConnectionsTable { dst_prefix: Prefix, dst_port_range: OptionalPortRange, ) -> Result<(), ConfigError> { - if let Some(value) = self.0.get_mut(src_prefix) { + if let Some(value) = self.trie.get_mut(src_prefix) { value.update(src_port_range, dst_vpcd, dst_prefix, dst_port_range)?; } else { let value = SrcConnectionData::new(src_port_range, dst_vpcd, dst_prefix, dst_port_range); - self.0.insert(src_prefix, value); + self.trie.insert(src_prefix, value); } Ok(()) } + + fn insert_empty( + &mut self, + src_prefix: Prefix, + src_port_range: OptionalPortRange, + ) -> Result<(), ConfigError> { + if self.trie.contains_key(src_prefix) { + Err(ConfigError::InternalFailure(format!( + "Trying to insert empty connection data for prefix {src_prefix:?} that already has a connection data" + ))) + } else { + self.trie.insert( + src_prefix, + SrcConnectionData::new_empty_dst_data(src_port_range), + ); + Ok(()) + } + } + + fn default_remote(&self) -> Option { + self.default_remote + } + + fn add_default_remote(&mut self, dst_vpcd: VpcDiscriminant) -> Result<(), ConfigError> { + if let Some(current) = self.default_remote { + return Err(ConfigError::InternalFailure(format!( + "Trying to overwrite default VPC discriminant {current:?} with {dst_vpcd:?}" + ))); + } + self.default_remote = Some(dst_vpcd); + Ok(()) + } } #[derive(Debug, Clone)] @@ -207,6 +283,18 @@ impl SrcConnectionData { dst_port_range: OptionalPortRange, ) -> Self { let connection_data = DstConnectionData::new(dst_vpcd, dst_prefix, dst_port_range); + Self::new_from_dst_connection_data(src_port_range, connection_data) + } + + fn new_empty_dst_data(src_port_range: OptionalPortRange) -> Self { + let connection_data = DstConnectionData::new_for_default_remote(); + Self::new_from_dst_connection_data(src_port_range, connection_data) + } + + fn new_from_dst_connection_data( + src_port_range: OptionalPortRange, + connection_data: DstConnectionData, + ) -> Self { match src_port_range { OptionalPortRange::NoPortRangeMeansAllPorts => { SrcConnectionData::AllPorts(connection_data) @@ -217,6 +305,7 @@ impl SrcConnectionData { } } } + fn get_remote_prefixes_data(&self, src_port: Option) -> Option<&DstConnectionData> { match self { SrcConnectionData::AllPorts(remote_prefixes_data) => Some(remote_prefixes_data), @@ -334,7 +423,7 @@ impl ValueWithAssociatedRanges for RemotePrefixPortData { } #[derive(Debug, Clone)] -pub(crate) struct DstConnectionData(IpPortPrefixTrie); +pub(crate) struct DstConnectionData(Option>); impl DstConnectionData { fn new(vpcd: VpcdLookupResult, prefix: Prefix, port_range: OptionalPortRange) -> Self { @@ -344,11 +433,17 @@ impl DstConnectionData { RemotePrefixPortData::Ranges(DisjointRangesBTreeMap::from_iter([(range, vpcd)])) } }; - DstConnectionData(IpPortPrefixTrie::from(prefix, remote_data)) + DstConnectionData(Some(IpPortPrefixTrie::from(prefix, remote_data))) + } + + fn new_for_default_remote() -> Self { + DstConnectionData(None) } fn lookup(&self, addr: &IpAddr, port: Option) -> Option<&RemotePrefixPortData> { - self.0.lookup(addr, port).map(|(_, data)| data) + self.0 + .as_ref() + .and_then(|trie| trie.lookup(addr, port).map(|(_, data)| data)) } fn update( @@ -357,7 +452,12 @@ impl DstConnectionData { prefix: Prefix, port_range: OptionalPortRange, ) -> Result<(), ConfigError> { - match (self.0.get_mut(prefix), port_range) { + let Some(trie) = &mut self.0 else { + return Err(ConfigError::InternalFailure( + "Trying to update default remote information".to_string(), + )); + }; + match (trie.get_mut(prefix), port_range) { ( Some(RemotePrefixPortData::Ranges(existing_range_map)), OptionalPortRange::Some(range), @@ -373,7 +473,7 @@ impl DstConnectionData { } (None, range) => { let prefix_data = RemotePrefixPortData::new(range, vpcd); - self.0.insert(prefix, prefix_data); + trie.insert(prefix, prefix_data); } } Ok(()) @@ -420,7 +520,7 @@ mod tests { fn test_flow_filter_table_insert_and_contains_simple() { let mut table = FlowFilterTable::new(); let src_vpcd = vpcd(100); - let dst_vpcd = VpcdLookupResult::Single(vpcd(200)); + let dst_vpcd = vpcd(200); let src_prefix = Prefix::from("10.0.0.0/24"); let dst_prefix = Prefix::from("20.0.0.0/24"); @@ -428,7 +528,7 @@ mod tests { table .insert( src_vpcd, - dst_vpcd.clone(), + dst_vpcd, src_prefix, OptionalPortRange::NoPortRangeMeansAllPorts, dst_prefix, @@ -440,7 +540,7 @@ mod tests { let src_addr = "10.0.0.5".parse().unwrap(); let dst_addr = "20.0.0.10".parse().unwrap(); let vpcd_result = table.lookup(src_vpcd, &src_addr, &dst_addr, None); - assert_eq!(vpcd_result, Some(dst_vpcd)); + assert_eq!(vpcd_result, Some(VpcdLookupResult::Single(dst_vpcd))); // Should not allow traffic from different src let wrong_src_addr = "10.1.0.5".parse().unwrap(); @@ -457,7 +557,7 @@ mod tests { fn test_flow_filter_table_with_port_ranges() { let mut table = FlowFilterTable::new(); let src_vpcd = vpcd(100); - let dst_vpcd = VpcdLookupResult::Single(vpcd(200)); + let dst_vpcd = vpcd(200); let src_prefix = Prefix::from("10.0.0.0/24"); let dst_prefix = Prefix::from("20.0.0.0/24"); @@ -467,7 +567,7 @@ mod tests { table .insert( src_vpcd, - dst_vpcd.clone(), + dst_vpcd, src_prefix, src_port_range, dst_prefix, @@ -480,7 +580,7 @@ mod tests { // Should allow with matching ports let vpcd_result = table.lookup(src_vpcd, &src_addr, &dst_addr, Some((1500, 80))); - assert_eq!(vpcd_result, Some(dst_vpcd)); + assert_eq!(vpcd_result, Some(VpcdLookupResult::Single(dst_vpcd))); // Should not allow with non-matching src port let vpcd_result = table.lookup(src_vpcd, &src_addr, &dst_addr, Some((500, 80))); @@ -499,14 +599,14 @@ mod tests { fn test_flow_filter_table_multiple_entries() { let mut table = FlowFilterTable::new(); let src_vpcd = vpcd(100); - let dst_vpcd1 = VpcdLookupResult::Single(vpcd(200)); - let dst_vpcd2 = VpcdLookupResult::Single(vpcd(300)); + let dst_vpcd1 = vpcd(200); + let dst_vpcd2 = vpcd(300); // Add two entries for different destination prefixes table .insert( src_vpcd, - dst_vpcd1.clone(), + dst_vpcd1, Prefix::from("10.0.0.0/24"), OptionalPortRange::NoPortRangeMeansAllPorts, Prefix::from("20.0.0.0/24"), @@ -517,7 +617,7 @@ mod tests { table .insert( src_vpcd, - dst_vpcd2.clone(), + dst_vpcd2, Prefix::from("10.0.0.0/24"), OptionalPortRange::NoPortRangeMeansAllPorts, Prefix::from("30.0.0.0/24"), @@ -529,24 +629,24 @@ mod tests { // Should route to dst_vpcd1 let vpcd_result = table.lookup(src_vpcd, &src_addr, &"20.0.0.10".parse().unwrap(), None); - assert_eq!(vpcd_result, Some(dst_vpcd1)); + assert_eq!(vpcd_result, Some(VpcdLookupResult::Single(dst_vpcd1))); // Should route to dst_vpcd2 let vpcd_result = table.lookup(src_vpcd, &src_addr, &"30.0.0.10".parse().unwrap(), None); - assert_eq!(vpcd_result, Some(dst_vpcd2)); + assert_eq!(vpcd_result, Some(VpcdLookupResult::Single(dst_vpcd2))); } #[test] fn test_vpc_connections_table_lookup() { let mut table = VpcConnectionsTable::new(); - let dst_vpcd = VpcdLookupResult::Single(vpcd(200)); + let dst_vpcd = vpcd(200); let src_prefix = Prefix::from("10.0.0.0/24"); let dst_prefix = Prefix::from("20.0.0.0/24"); table .insert( - dst_vpcd, + VpcdLookupResult::Single(dst_vpcd), src_prefix, OptionalPortRange::NoPortRangeMeansAllPorts, dst_prefix, @@ -568,7 +668,7 @@ mod tests { #[test] fn test_vpc_connections_table_with_ports() { let mut table = VpcConnectionsTable::new(); - let dst_vpcd = VpcdLookupResult::Single(vpcd(200)); + let dst_vpcd = vpcd(200); let src_prefix = Prefix::from("10.0.0.0/24"); let dst_prefix = Prefix::from("20.0.0.0/24"); @@ -577,7 +677,7 @@ mod tests { table .insert( - dst_vpcd, + VpcdLookupResult::Single(dst_vpcd), src_prefix, src_port_range, dst_prefix, @@ -610,7 +710,7 @@ mod tests { fn test_flow_filter_table_ipv6() { let mut table = FlowFilterTable::new(); let src_vpcd = vpcd(100); - let dst_vpcd = VpcdLookupResult::Single(vpcd(200)); + let dst_vpcd = vpcd(200); let src_prefix = Prefix::from("2001:db8::/32"); let dst_prefix = Prefix::from("2001:db9::/32"); @@ -618,7 +718,7 @@ mod tests { table .insert( src_vpcd, - dst_vpcd.clone(), + dst_vpcd, src_prefix, OptionalPortRange::NoPortRangeMeansAllPorts, dst_prefix, @@ -629,21 +729,21 @@ mod tests { let src_addr = "2001:db8::1".parse().unwrap(); let dst_addr = "2001:db9::1".parse().unwrap(); let vpcd_result = table.lookup(src_vpcd, &src_addr, &dst_addr, None); - assert_eq!(vpcd_result, Some(dst_vpcd)); + assert_eq!(vpcd_result, Some(VpcdLookupResult::Single(dst_vpcd))); } #[test] fn test_flow_filter_table_longest_prefix_match() { let mut table = FlowFilterTable::new(); let src_vpcd = vpcd(100); - let dst_vpcd1 = VpcdLookupResult::Single(vpcd(200)); - let dst_vpcd2 = VpcdLookupResult::Single(vpcd(300)); + let dst_vpcd1 = vpcd(200); + let dst_vpcd2 = vpcd(300); // Insert broader prefix table .insert( src_vpcd, - dst_vpcd1.clone(), + dst_vpcd1, Prefix::from("10.0.0.0/16"), OptionalPortRange::NoPortRangeMeansAllPorts, Prefix::from("20.0.0.0/16"), @@ -655,7 +755,7 @@ mod tests { table .insert( src_vpcd, - dst_vpcd2.clone(), + dst_vpcd2, Prefix::from("10.0.1.0/24"), OptionalPortRange::NoPortRangeMeansAllPorts, Prefix::from("20.0.1.0/24"), @@ -670,7 +770,7 @@ mod tests { &"20.0.1.10".parse().unwrap(), None, ); - assert_eq!(vpcd_result, Some(dst_vpcd2)); + assert_eq!(vpcd_result, Some(VpcdLookupResult::Single(dst_vpcd2))); // Should match the broader prefix for source let vpcd_result = table.lookup( @@ -679,7 +779,7 @@ mod tests { &"20.0.2.10".parse().unwrap(), None, ); - assert_eq!(vpcd_result, Some(dst_vpcd1)); + assert_eq!(vpcd_result, Some(VpcdLookupResult::Single(dst_vpcd1))); } #[test] diff --git a/lpm/src/trie/ip_port_prefix_trie.rs b/lpm/src/trie/ip_port_prefix_trie.rs index 405119cd0..50eec8ccf 100644 --- a/lpm/src/trie/ip_port_prefix_trie.rs +++ b/lpm/src/trie/ip_port_prefix_trie.rs @@ -72,6 +72,11 @@ where self.0.get_mut(prefix) } + /// Check if the trie contains a prefix. + pub fn contains_key(&self, prefix: Prefix) -> bool { + self.0.contains_key(prefix) + } + /// Look up an IP address and optional port in the trie. /// /// Returns the longest matching prefix and its associated value, if any. diff --git a/lpm/src/trie/mod.rs b/lpm/src/trie/mod.rs index bfe3767ce..5ff58fe74 100644 --- a/lpm/src/trie/mod.rs +++ b/lpm/src/trie/mod.rs @@ -43,6 +43,10 @@ pub trait TrieMap { where B: Borrow; + fn contains_key(&self, prefix: B) -> bool + where + B: Borrow; + fn iter(&self) -> impl Iterator; fn iter_mut(&mut self) -> impl Iterator; fn is_empty(&self) -> bool; @@ -104,6 +108,16 @@ impl IpPrefixTrie { } } + pub fn contains_key(&self, prefix: Q) -> bool + where + Q: Into, + { + match prefix.into() { + Prefix::IPV4(prefix) => self.ipv4.contains_key(prefix), + Prefix::IPV6(prefix) => self.ipv6.contains_key(prefix), + } + } + pub fn get_mut(&mut self, prefix: Q) -> Option<&mut V> where Q: Into, diff --git a/lpm/src/trie/prefix_map_impl.rs b/lpm/src/trie/prefix_map_impl.rs index ad3c48edf..9d8ec9433 100644 --- a/lpm/src/trie/prefix_map_impl.rs +++ b/lpm/src/trie/prefix_map_impl.rs @@ -105,6 +105,13 @@ where self.0.get_mut(&IpPrefixW(prefix.borrow().clone())) } + fn contains_key(&self, prefix: B) -> bool + where + B: Borrow, + { + self.0.contains_key(&IpPrefixW(prefix.borrow().clone())) + } + fn len(&self) -> usize { self.0.len() } diff --git a/lpm/src/trie/trie_with_default.rs b/lpm/src/trie/trie_with_default.rs index ed06647a3..c9cffb8eb 100644 --- a/lpm/src/trie/trie_with_default.rs +++ b/lpm/src/trie/trie_with_default.rs @@ -98,6 +98,13 @@ impl + TrieMapFactory> TrieMap for TrieMapWithDefa self.0.get_mut(prefix) } + fn contains_key(&self, prefix: B) -> bool + where + B: Borrow, + { + self.0.contains_key(prefix) + } + fn is_empty(&self) -> bool { self.0.is_empty() }