Skip to content

Commit 738b1e1

Browse files
committed
Add Session::first_shard_for_statement
Resolves #468. This is a follow-up on #508 and #658: - To minimize the CPU usage related to network operations when inserting a very large number of lines, it helps to batch the statements. - To batch in the most efficient manner, these batches have to be shard-aware. Since #508, `batch` picks the shard of the first statement and sends the query there. However, it is left to the user to build the batches in such a way that the target shard is the same for all the elements of the batch. - This was made *possible* by #658, but it still required a lot of boilerplate. I was waiting for #612 to be merged (amazing work btw! 😃) to implement a more direct and factored API (as that would use it). - This new method, roughly `Session::first_shard_for_statement(self, &PreparedStatement, &SerializedValues) -> Option<(Node, Option<Shard>)>`, makes shard-aware batching easy for users by providing access to the first node and shard of the query plan.
1 parent ad0d2f5 commit 738b1e1

File tree

1 file changed

+59
-16
lines changed

1 file changed

+59
-16
lines changed

scylla/src/transport/session.rs

Lines changed: 59 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::frame::types::LegacyConsistency;
88
use crate::history;
99
use crate::history::HistoryListener;
1010
use crate::retry_policy::RetryPolicy;
11+
use crate::routing;
1112
use arc_swap::ArcSwapOption;
1213
use async_trait::async_trait;
1314
use bytes::Bytes;
@@ -898,15 +899,7 @@ impl Session {
898899
.as_ref()
899900
.map(|pk| prepared.get_partitioner_name().hash(pk));
900901

901-
let statement_info = RoutingInfo {
902-
consistency: prepared
903-
.get_consistency()
904-
.unwrap_or(self.default_execution_profile_handle.access().consistency),
905-
serial_consistency: prepared.get_serial_consistency(),
906-
token,
907-
keyspace: prepared.get_keyspace_name(),
908-
is_confirmed_lwt: prepared.is_confirmed_lwt(),
909-
};
902+
let statement_info = self.routing_info(prepared, token);
910903

911904
let span =
912905
RequestSpan::new_prepared(partition_key.as_ref(), token, serialized_values.size());
@@ -1814,13 +1807,63 @@ impl Session {
18141807
prepared: &PreparedStatement,
18151808
serialized_values: &SerializedValues,
18161809
) -> Result<Option<Token>, QueryError> {
1817-
match self.calculate_partition_key(prepared, serialized_values) {
1818-
Ok(Some(partition_key)) => {
1819-
let partitioner_name = prepared.get_partitioner_name();
1820-
Ok(Some(partitioner_name.hash(&partition_key)))
1821-
}
1822-
Ok(None) => Ok(None),
1823-
Err(err) => Err(err),
1810+
Ok(self
1811+
.calculate_partition_key(prepared, serialized_values)?
1812+
.map(|partition_key| prepared.get_partitioner_name().hash(&partition_key)))
1813+
}
1814+
1815+
/// Get the first node/shard that the load balancer would target if running this query.
1816+
///
1817+
/// This may help with building shard-aware batches.
///
/// Returns `Ok(None)` when no token can be calculated for the statement and values,
/// or when the query plan yields no node. The shard part of the tuple is `None`
/// when the selected node exposes no sharder.
///
/// # Errors
///
/// Propagates any error returned by `calculate_token`.
1818+
pub fn first_shard_for_statement(
1819+
&self,
1820+
prepared: &PreparedStatement,
1821+
serialized_values: &SerializedValues,
1822+
) -> Result<Option<(Arc<Node>, Option<routing::Shard>)>, QueryError> {
1823+
// Without a token there is nothing to route on, so bail out early with `Ok(None)`.
let token = match self.calculate_token(prepared, serialized_values)? {
1824+
Some(token) => token,
1825+
None => return Ok(None),
1826+
};
1827+
let routing_info = self.routing_info(prepared, Some(token));
1828+
let cluster_data = self.cluster.get_data();
1829+
// Use the statement's own execution profile if it has one, else the session default.
let execution_profile = prepared
1830+
.config
1831+
.execution_profile_handle
1832+
.as_ref()
1833+
.unwrap_or_else(|| self.get_default_execution_profile_handle())
1834+
.access();
1835+
let mut query_plan = load_balancing::Plan::new(
1836+
&*execution_profile.load_balancing_policy,
1837+
&routing_info,
1838+
&cluster_data,
1839+
);
1840+
// We can't return the full iterator here because it borrows from local variables.
1841+
// To return it anyway, two designs would be possible:
1842+
// - Construct a self-referential struct and implement `Iterator` on it, e.g. via Ouroboros
1843+
// - Take a closure as a parameter that receives the local iterator and returns anything;
1844+
// this function would then directly return what the closure returns
1845+
// Most likely though, callers use this for some kind of shard-awareness optimization for batching,
1846+
// and are consequently not interested in subsequent nodes.
1847+
// Until such a design is needed, just expose the first entry, as it is simpler.
1848+
// Only the first node of the plan is consumed; an empty plan results in `Ok(None)`.
Ok(query_plan.next().map(move |node| {
1849+
// NOTE(review): this binding shadows `token` but holds the shard computed for that
// token (or `None` if the node has no sharder); consider renaming it to `shard`.
let token = node.sharder().map(|sharder| sharder.shard_of(token));
1850+
(node.clone(), token)
1851+
}))
1852+
}
1853+
1854+
/// Build the `RoutingInfo` describing `prepared` for the given optional `token`.
///
/// The consistency is the one set on the statement, falling back to the consistency
/// of the session's default execution profile when the statement has none. The
/// remaining fields are read directly from `prepared`; `token` is stored as passed.
fn routing_info<'p>(
1855+
&self,
1856+
prepared: &'p PreparedStatement,
1857+
token: Option<Token>,
1858+
) -> RoutingInfo<'p> {
1859+
RoutingInfo {
1860+
// Statement-level consistency wins over the default execution profile's.
consistency: prepared
1861+
.get_consistency()
1862+
.unwrap_or(self.default_execution_profile_handle.access().consistency),
1863+
serial_consistency: prepared.get_serial_consistency(),
1864+
token,
1865+
keyspace: prepared.get_keyspace_name(),
1866+
is_confirmed_lwt: prepared.is_confirmed_lwt(),
18241867
}
18251868
}
18261869

0 commit comments

Comments
 (0)