Skip to content

Commit ac3aca3

Browse files
wprzytulamuzarski
authored andcommitted
connection[_pool]: propagate UntranslatedEndpoint borrowing
As address translation no longer needs owned Strings, it's enough to pass references to UntranslatedEndpoint to `open_connection` & friends. Note that, unfortunately, we can't fully elide the clone in `start_opening_connection()`, because the endpoint may be mutated (in the shard-aware case) for the purpose of opening a connection to the shard-aware port. Nonetheless, now the limitation that requires us to clone is in internal code, not in the user-facing API, which is better for us.
1 parent 7b5d3ac commit ac3aca3

File tree

2 files changed

+27
-28
lines changed

2 files changed

+27
-28
lines changed

scylla/src/network/connection.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2045,16 +2045,16 @@ impl Connection {
20452045
}
20462046

20472047
async fn maybe_translated_addr(
2048-
endpoint: UntranslatedEndpoint,
2048+
endpoint: &UntranslatedEndpoint,
20492049
address_translator: Option<&dyn AddressTranslator>,
20502050
) -> Result<SocketAddr, TranslationError> {
2051-
match endpoint {
2052-
UntranslatedEndpoint::ContactPoint(addr) => Ok(addr.address),
2051+
match *endpoint {
2052+
UntranslatedEndpoint::ContactPoint(ref addr) => Ok(addr.address),
20532053
UntranslatedEndpoint::Peer(PeerEndpoint {
20542054
host_id,
20552055
address,
2056-
datacenter,
2057-
rack,
2056+
ref datacenter,
2057+
ref rack,
20582058
}) => match address {
20592059
NodeAddr::Translatable(addr) => {
20602060
// In this case, addr is subject to AddressTranslator.
@@ -2089,7 +2089,7 @@ async fn maybe_translated_addr(
20892089
///
20902090
/// At the beginning, translates node's address, if it is subject to address translation.
20912091
pub(super) async fn open_connection(
2092-
endpoint: UntranslatedEndpoint,
2092+
endpoint: &UntranslatedEndpoint,
20932093
source_port: Option<u16>,
20942094
config: &HostConnectionConfig,
20952095
) -> Result<(Connection, ErrorReceiver), ConnectionError> {
@@ -2207,7 +2207,7 @@ pub(super) async fn open_connection(
22072207
}
22082208

22092209
pub(super) async fn open_connection_to_shard_aware_port(
2210-
endpoint: UntranslatedEndpoint,
2210+
endpoint: &UntranslatedEndpoint,
22112211
shard: Shard,
22122212
sharder: Sharder,
22132213
config: &HostConnectionConfig,
@@ -2216,7 +2216,7 @@ pub(super) async fn open_connection_to_shard_aware_port(
22162216
let source_port_iter = sharder.iter_source_ports_for_shard(shard);
22172217

22182218
for port in source_port_iter {
2219-
let connect_result = open_connection(endpoint.clone(), Some(port), config).await;
2219+
let connect_result = open_connection(endpoint, Some(port), config).await;
22202220

22212221
match connect_result {
22222222
Err(err) if err.is_address_unavailable_for_use() => continue, // If we can't use this port, try the next one
@@ -2525,7 +2525,7 @@ mod tests {
25252525
let addr: SocketAddr = resolve_hostname(&uri).await;
25262526

25272527
let (connection, _) = super::open_connection(
2528-
UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
2528+
&UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
25292529
address: addr,
25302530
datacenter: None,
25312531
}),
@@ -2650,7 +2650,7 @@ mod tests {
26502650

26512651
let subtest = |enable_coalescing: bool, ks: String| async move {
26522652
let (connection, _) = super::open_connection(
2653-
UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
2653+
&UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
26542654
address: addr,
26552655
datacenter: None,
26562656
}),
@@ -2782,16 +2782,20 @@ mod tests {
27822782
.unwrap();
27832783

27842784
// We must interrupt the driver's full connection opening, because our proxy does not interact further after Startup.
2785+
let endpoint = UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
2786+
address: proxy_addr,
2787+
datacenter: None,
2788+
});
27852789
let (startup_without_lwt_optimisation, _shard) = select! {
2786-
_ = open_connection(UntranslatedEndpoint::ContactPoint(ResolvedContactPoint{address: proxy_addr, datacenter: None}), None, &config) => unreachable!(),
2790+
_ = open_connection(&endpoint, None, &config) => unreachable!(),
27872791
startup = startup_rx.recv() => startup.unwrap(),
27882792
};
27892793

27902794
proxy.running_nodes[0]
27912795
.change_request_rules(Some(make_rules(options_with_lwt_optimisation_support)));
27922796

27932797
let (startup_with_lwt_optimisation, _shard) = select! {
2794-
_ = open_connection(UntranslatedEndpoint::ContactPoint(ResolvedContactPoint{address: proxy_addr, datacenter: None}), None, &config) => unreachable!(),
2798+
_ = open_connection(&endpoint, None, &config) => unreachable!(),
27952799
startup = startup_rx.recv() => startup.unwrap(),
27962800
};
27972801

@@ -2850,7 +2854,7 @@ mod tests {
28502854

28512855
// Setup connection normally, without obstruction
28522856
let (conn, mut error_receiver) = open_connection(
2853-
UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
2857+
&UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
28542858
address: proxy_addr,
28552859
datacenter: None,
28562860
}),

scylla/src/network/connection_pool.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -871,7 +871,7 @@ impl PoolRefiller {
871871
endpoint
872872
};
873873
let result = open_connection_to_shard_aware_port(
874-
shard_aware_endpoint,
874+
&shard_aware_endpoint,
875875
shard,
876876
sharder.clone(),
877877
&cfg,
@@ -886,7 +886,7 @@ impl PoolRefiller {
886886
.boxed(),
887887
_ => async move {
888888
let non_shard_aware_endpoint = endpoint;
889-
let result = open_connection(non_shard_aware_endpoint, None, &cfg).await;
889+
let result = open_connection(&non_shard_aware_endpoint, None, &cfg).await;
890890
OpenedConnectionEvent {
891891
result,
892892
requested_shard: None,
@@ -1170,20 +1170,15 @@ mod tests {
11701170
// to the right shard
11711171
let sharder = Sharder::new(ShardCount::new(3).unwrap(), 12);
11721172

1173+
let endpoint = UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
1174+
address: connect_address,
1175+
datacenter: None,
1176+
});
1177+
11731178
// Open the connections
1174-
let mut conns = Vec::new();
1175-
1176-
for _ in 0..connections_number {
1177-
conns.push(open_connection_to_shard_aware_port(
1178-
UntranslatedEndpoint::ContactPoint(ResolvedContactPoint {
1179-
address: connect_address,
1180-
datacenter: None,
1181-
}),
1182-
0,
1183-
sharder.clone(),
1184-
&connection_config,
1185-
));
1186-
}
1179+
let conns = (0..connections_number).map(|_| {
1180+
open_connection_to_shard_aware_port(&endpoint, 0, sharder.clone(), &connection_config)
1181+
});
11871182

11881183
let joined = futures::future::join_all(conns).await;
11891184

0 commit comments

Comments
 (0)