Skip to content

Commit f81d87f

Browse files
committed
adapt after master merge
1 parent a5b8b87 commit f81d87f

File tree

8 files changed

+186
-116
lines changed

8 files changed

+186
-116
lines changed

scylla-cql/src/types/serialize/row.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,17 @@ pub trait SerializeRow {
8080
/// the bind marker types and names so that the values can be properly
8181
/// type checked and serialized.
8282
fn is_empty(&self) -> bool;
83+
84+
/// Specialization that allows the driver to not re-serialize the row if it's already
85+
/// a `SerializedValues`
86+
///
87+
/// Note that if using this, it's the user's responsibility to ensure that this
88+
/// `SerializedValues` has been generated with the same prepared statement as the query
89+
/// is going to be made with.
90+
#[inline]
91+
fn already_serialized(&self) -> Option<&SerializedValues> {
92+
None
93+
}
8394
}
8495

8596
macro_rules! fallback_impl_contents {
@@ -255,12 +266,35 @@ impl<T: SerializeRow + ?Sized> SerializeRow for &T {
255266
ctx: &RowSerializationContext<'_>,
256267
writer: &mut RowWriter,
257268
) -> Result<(), SerializationError> {
258-
<T as SerializeRow>::serialize(self, ctx, writer)
269+
<T as SerializeRow>::serialize(*self, ctx, writer)
259270
}
260271

261272
#[inline]
262273
fn is_empty(&self) -> bool {
263-
<T as SerializeRow>::is_empty(self)
274+
<T as SerializeRow>::is_empty(*self)
275+
}
276+
277+
#[inline]
278+
fn already_serialized(&self) -> Option<&SerializedValues> {
279+
<T as SerializeRow>::already_serialized(*self)
280+
}
281+
}
282+
283+
impl SerializeRow for SerializedValues {
284+
fn serialize(
285+
&self,
286+
_ctx: &RowSerializationContext<'_>,
287+
writer: &mut RowWriter,
288+
) -> Result<(), SerializationError> {
289+
Ok(writer.append_serialize_row(self))
290+
}
291+
292+
fn is_empty(&self) -> bool {
293+
self.is_empty()
294+
}
295+
296+
fn already_serialized(&self) -> Option<&SerializedValues> {
297+
Some(self)
264298
}
265299
}
266300

scylla/src/statement/batch.rs

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ use std::sync::Arc;
44
use crate::history::HistoryListener;
55
use crate::load_balancing;
66
use crate::retry_policy::RetryPolicy;
7+
use crate::routing::Shard;
78
use crate::statement::{prepared_statement::PreparedStatement, query::Query};
8-
use crate::transport::{execution_profile::ExecutionProfileHandle, Node};
9+
use crate::transport::execution_profile::ExecutionProfileHandle;
10+
use crate::transport::NodeRef;
911
use crate::Session;
1012

1113
use super::StatementConfig;
@@ -145,31 +147,31 @@ impl Batch {
145147
self.config.execution_profile_handle.as_ref()
146148
}
147149

148-
/// Associates the batch with a new execution profile that will have a load balancing policy
149-
/// that will enforce the use of the provided [`Node`] to the extent possible.
150+
/// Associates the batch with a new execution profile that will have a load
151+
/// balancing policy that will enforce the use of the provided [`Node`]
152+
/// to the extent possible.
150153
///
151-
/// This should typically be used in conjunction with [`Session::shard_for_statement`], where
152-
/// you would constitute a batch by assigning to the same batch all the statements that would be executed in
153-
/// the same shard.
154+
/// This should typically be used in conjunction with
155+
/// [`Session::shard_for_statement`], where you would constitute a batch
156+
/// by assigning to the same batch all the statements that would be executed
157+
/// in the same shard.
154158
///
155-
/// Since it is not guaranteed that subsequent calls to the load balancer would re-assign the statement
156-
/// to the same node, you should use this method to enforce the use of the original node that was envisioned by
159+
/// Since it is not guaranteed that subsequent calls to the load balancer
160+
/// would re-assign the statement to the same node, you should use this
161+
/// method to enforce the use of the original node that was envisioned by
157162
/// `shard_for_statement` for the batch:
158163
///
159164
/// ```rust
160165
/// # use scylla::Session;
161166
/// # use std::error::Error;
162167
/// # async fn check_only_compiles(session: &Session) -> Result<(), Box<dyn Error>> {
163-
/// use scylla::{
164-
/// batch::Batch,
165-
/// frame::value::{SerializedValues, ValueList},
166-
/// };
168+
/// use scylla::{batch::Batch, serialize::row::SerializedValues};
167169
///
168170
/// let prepared_statement = session
169171
/// .prepare("INSERT INTO ks.tab(a, b) VALUES(?, ?)")
170172
/// .await?;
171173
///
172-
/// let serialized_values: SerializedValues = (1, 2).serialized()?.into_owned();
174+
/// let serialized_values: SerializedValues = prepared_statement.serialize_values(&(1, 2))?;
173175
/// let shard = session.shard_for_statement(&prepared_statement, &serialized_values)?;
174176
///
175177
/// // Send that to a task that will handle statements targeted to the same shard
@@ -178,8 +180,8 @@ impl Batch {
178180
/// // Constitute a batch with all the statements that would be executed in the same shard
179181
///
180182
/// let mut batch: Batch = Default::default();
181-
/// if let Some((node, _shard_idx)) = shard {
182-
/// batch.enforce_target_node(&node, &session);
183+
/// if let Some((node, shard_idx)) = shard {
184+
/// batch.enforce_target_node(&node, shard_idx, &session);
183185
/// }
184186
/// let mut batch_values = Vec::new();
185187
///
@@ -195,13 +197,14 @@ impl Batch {
195197
/// ```
196198
///
197199
///
198-
/// If the target node is not available anymore at the time of executing the statement, it will fallback to the
199-
/// original load balancing policy:
200+
/// If the target node is not available anymore at the time of executing the
201+
/// statement, it will fallback to the original load balancing policy:
200202
/// - Either that currently set on the [`Batch`], if any
201203
/// - Or that of the [`Session`] if there isn't one on the `Batch`
202204
pub fn enforce_target_node(
203205
&mut self,
204-
node: &Arc<Node>,
206+
node: NodeRef<'_>,
207+
shard: Shard,
205208
base_execution_profile_from_session: &Session,
206209
) {
207210
let execution_profile_handle = self.get_execution_profile_handle().unwrap_or_else(|| {
@@ -210,8 +213,9 @@ impl Batch {
210213
self.set_execution_profile_handle(Some(
211214
execution_profile_handle
212215
.pointee_to_builder()
213-
.load_balancing_policy(Arc::new(load_balancing::EnforceTargetNodePolicy::new(
216+
.load_balancing_policy(Arc::new(load_balancing::EnforceTargetShardPolicy::new(
214217
node,
218+
shard,
215219
execution_profile_handle.load_balancing_policy(),
216220
)))
217221
.build()

scylla/src/statement/prepared_statement.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ impl PreparedStatement {
459459
self.config.execution_profile_handle.as_ref()
460460
}
461461

462-
pub(crate) fn serialize_values(
462+
pub fn serialize_values(
463463
&self,
464464
values: &impl SerializeRow,
465465
) -> Result<SerializedValues, SerializationError> {

scylla/src/transport/load_balancing/default.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -703,7 +703,7 @@ impl DefaultPolicy {
703703
vec.into_iter()
704704
}
705705

706-
fn is_alive(node: NodeRef, _shard: Option<Shard>) -> bool {
706+
pub(crate) fn is_alive(node: NodeRef, _shard: Option<Shard>) -> bool {
707707
// For now, we leave this as stub, until we have time to improve node events.
708708
// node.is_enabled() && !node.is_down()
709709
node.is_enabled()

scylla/src/transport/load_balancing/enforce_node.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use super::{DefaultPolicy, FallbackPlan, LoadBalancingPolicy, NodeRef, RoutingInfo};
2-
use crate::transport::{cluster::ClusterData, Node};
2+
use crate::{
3+
routing::Shard,
4+
transport::{cluster::ClusterData, Node},
5+
};
36
use std::sync::Arc;
47
use uuid::Uuid;
58

@@ -8,25 +11,36 @@ use uuid::Uuid;
811
///
912
/// This is meant to be used for shard-aware batching.
1013
#[derive(Debug)]
11-
pub struct EnforceTargetNodePolicy {
14+
pub struct EnforceTargetShardPolicy {
1215
target_node: Uuid,
16+
shard: Shard,
1317
fallback: Arc<dyn LoadBalancingPolicy>,
1418
}
1519

16-
impl EnforceTargetNodePolicy {
17-
pub fn new(target_node: &Arc<Node>, fallback: Arc<dyn LoadBalancingPolicy>) -> Self {
20+
impl EnforceTargetShardPolicy {
21+
pub fn new(
22+
target_node: &Arc<Node>,
23+
shard: Shard,
24+
fallback: Arc<dyn LoadBalancingPolicy>,
25+
) -> Self {
1826
Self {
1927
target_node: target_node.host_id,
28+
shard,
2029
fallback,
2130
}
2231
}
2332
}
24-
impl LoadBalancingPolicy for EnforceTargetNodePolicy {
25-
fn pick<'a>(&'a self, query: &'a RoutingInfo, cluster: &'a ClusterData) -> Option<NodeRef<'a>> {
33+
impl LoadBalancingPolicy for EnforceTargetShardPolicy {
34+
fn pick<'a>(
35+
&'a self,
36+
query: &'a RoutingInfo,
37+
cluster: &'a ClusterData,
38+
) -> Option<(NodeRef<'a>, Option<Shard>)> {
2639
cluster
2740
.known_peers
2841
.get(&self.target_node)
29-
.filter(DefaultPolicy::is_alive)
42+
.map(|node| (node, Some(self.shard)))
43+
.filter(|&(node, shard)| DefaultPolicy::is_alive(node, shard))
3044
.or_else(|| self.fallback.pick(query, cluster))
3145
}
3246

@@ -40,7 +54,7 @@ impl LoadBalancingPolicy for EnforceTargetNodePolicy {
4054

4155
fn name(&self) -> String {
4256
format!(
43-
"Enforce target node Load balancing policy - Node: {} - fallback: {}",
57+
"Enforce target shard Load balancing policy - Node: {} - fallback: {}",
4458
self.target_node,
4559
self.fallback.name()
4660
)

scylla/src/transport/load_balancing/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ mod plan;
1717
pub use plan::Plan;
1818
pub use {
1919
default::{DefaultPolicy, DefaultPolicyBuilder, LatencyAwarenessBuilder},
20-
enforce_node::EnforceTargetNodePolicy,
20+
enforce_node::EnforceTargetShardPolicy,
2121
};
2222

2323
/// Represents info about statement that can be used by load balancing policies.

0 commit comments

Comments
 (0)