Skip to content

Commit 37e11d0

Browse files
authored
Merge pull request #1446 from Lorak-mmk/refactor-prepared-sharing
Centralize prepared statement data sharing
2 parents add9b22 + d100f9d commit 37e11d0

File tree

6 files changed

+131
-111
lines changed

6 files changed

+131
-111
lines changed

scylla/src/client/caching_session.rs

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,11 @@
44
use crate::errors::{ExecutionError, PagerExecutionError, PrepareError};
55
use crate::response::query_result::QueryResult;
66
use crate::response::{PagingState, PagingStateResponse};
7-
use crate::routing::partitioner::PartitionerName;
87
use crate::statement::batch::{Batch, BatchStatement};
9-
use crate::statement::prepared::PreparedStatement;
8+
use crate::statement::prepared::{PreparedStatement, UnconfiguredPreparedStatement};
109
use crate::statement::unprepared::Statement;
11-
use bytes::Bytes;
1210
use dashmap::DashMap;
1311
use futures::future::try_join_all;
14-
use scylla_cql::frame::response::result::{PreparedMetadata, ResultMetadata};
1512
use scylla_cql::serialize::batch::BatchValues;
1613
use scylla_cql::serialize::row::SerializeRow;
1714
use std::collections::hash_map::RandomState;
@@ -22,19 +19,6 @@ use std::sync::Arc;
2219
use crate::client::pager::QueryPager;
2320
use crate::client::session::Session;
2421

25-
/// Contains just the parts of a prepared statement that were returned
26-
/// from the database. All remaining parts (query string, page size,
27-
/// consistency, etc.) are taken from the Query passed
28-
/// to the `CachingSession::execute` family of methods.
29-
#[derive(Debug)]
30-
struct RawPreparedStatementData {
31-
id: Bytes,
32-
is_confirmed_lwt: bool,
33-
metadata: PreparedMetadata,
34-
result_metadata: Arc<ResultMetadata<'static>>,
35-
partitioner_name: PartitionerName,
36-
}
37-
3822
/// Provides auto caching while executing queries
3923
pub struct CachingSession<S = RandomState>
4024
where
@@ -45,7 +29,7 @@ where
4529
/// If a prepared statement is added while the limit is reached, the oldest prepared statement
4630
/// is removed from the cache
4731
max_capacity: usize,
48-
cache: DashMap<String, RawPreparedStatementData, S>,
32+
cache: DashMap<String, UnconfiguredPreparedStatement, S>,
4933
use_cached_metadata: bool,
5034
}
5135

@@ -204,16 +188,7 @@ where
204188

205189
if let Some(raw) = self.cache.get(&query.contents) {
206190
let page_size = query.get_validated_page_size();
207-
let mut stmt = PreparedStatement::new(
208-
raw.id.clone(),
209-
raw.is_confirmed_lwt,
210-
raw.metadata.clone(),
211-
raw.result_metadata.clone(),
212-
query.contents,
213-
page_size,
214-
query.config,
215-
);
216-
stmt.set_partitioner_name(raw.partitioner_name.clone());
191+
let mut stmt = raw.make_configured_handle(query.config, page_size);
217192
stmt.set_use_cached_result_metadata(self.use_cached_metadata);
218193
Ok(stmt)
219194
} else {
@@ -245,13 +220,7 @@ where
245220
}
246221
}
247222

248-
let raw = RawPreparedStatementData {
249-
id: prepared.get_id().clone(),
250-
is_confirmed_lwt: prepared.is_confirmed_lwt(),
251-
metadata: prepared.get_prepared_metadata().clone(),
252-
result_metadata: prepared.get_result_metadata().clone(),
253-
partitioner_name: prepared.get_partitioner_name().clone(),
254-
};
223+
let raw = prepared.make_unconfigured_handle();
255224
self.cache.insert(query_contents, raw);
256225

257226
Ok(prepared)

scylla/src/client/session.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1341,20 +1341,20 @@ impl Session {
13411341

13421342
// Validate prepared ids equality.
13431343
for another_raw_prepared in raw_prepared_statements_results_iter.flatten() {
1344-
if prepared.get_id() != &another_raw_prepared.prepared_response.id {
1344+
if prepared.get_id() != another_raw_prepared.get_id() {
13451345
tracing::error!(
13461346
"Got differing ids upon statement preparation: statement \"{}\", id1: {:?}, id2: {:?}",
13471347
prepared.get_statement(),
13481348
prepared.get_id(),
1349-
another_raw_prepared.prepared_response.id
1349+
another_raw_prepared.get_id()
13501350
);
13511351
return Err(PrepareError::PreparedStatementIdsMismatch);
13521352
}
13531353

13541354
// Collect all tracing ids from prepare() queries in the final result
13551355
prepared
13561356
.prepare_tracing_ids
1357-
.extend(another_raw_prepared.tracing_id);
1357+
.extend(another_raw_prepared.tracing_id());
13581358
}
13591359

13601360
// This is the first preparation that succeeded.

scylla/src/network/connection.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,11 @@ use crate::frame::{
2121
use crate::policies::address_translator::{AddressTranslator, UntranslatedPeer};
2222
use crate::policies::timestamp_generator::TimestampGenerator;
2323
use crate::response::query_result::QueryResult;
24-
use crate::response::{
25-
NonErrorAuthResponse, NonErrorStartupResponse, PagingState, QueryResponse, RawPreparedStatement,
26-
};
24+
use crate::response::{NonErrorAuthResponse, NonErrorStartupResponse, PagingState, QueryResponse};
2725
use crate::routing::locator::tablets::{RawTablet, TabletParsingError};
2826
use crate::routing::{Shard, ShardAwarePortRange, ShardInfo, Sharder, ShardingError};
2927
use crate::statement::batch::{Batch, BatchStatement};
30-
use crate::statement::prepared::PreparedStatement;
28+
use crate::statement::prepared::{PreparedStatement, RawPreparedStatement};
3129
use crate::statement::unprepared::Statement;
3230
use crate::statement::{Consistency, PageSize};
3331
use bytes::Bytes;
@@ -688,13 +686,13 @@ impl Connection {
688686
previous_prepared: &PreparedStatement,
689687
) -> Result<(), RequestAttemptError> {
690688
let reprepare_query: Statement = query.into();
691-
let prepared_response = self.prepare_raw(&reprepare_query).await?.prepared_response;
689+
let raw_prepared = self.prepare_raw(&reprepare_query).await?;
692690

693691
// Reprepared statement should keep its id - it's the md5 sum
694692
// of statement contents
695-
if prepared_response.id != previous_prepared.get_id() {
693+
if raw_prepared.get_id() != previous_prepared.get_id() {
696694
Err(RequestAttemptError::RepreparedIdChanged {
697-
reprepared_id: prepared_response.id.into(),
695+
reprepared_id: raw_prepared.get_id().clone().into(),
698696
statement: reprepare_query.contents,
699697
expected_id: previous_prepared.get_id().clone().into(),
700698
})

scylla/src/response/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,5 @@ mod request_response;
1414
pub use coordinator::Coordinator;
1515
pub(crate) use request_response::{
1616
NonErrorAuthResponse, NonErrorQueryResponse, NonErrorStartupResponse, QueryResponse,
17-
RawPreparedStatement,
1817
};
1918
pub use scylla_cql::frame::request::query::{PagingState, PagingStateResponse};

scylla/src/response/request_response.rs

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::collections::HashMap;
2-
use std::sync::Arc;
32

43
use bytes::Bytes;
54
use scylla_cql::frame::request::query::PagingStateResponse;
@@ -11,8 +10,6 @@ use crate::errors::RequestAttemptError;
1110
use crate::frame::response::{self, result};
1211
use crate::response::Coordinator;
1312
use crate::response::query_result::QueryResult;
14-
use crate::statement::Statement;
15-
use crate::statement::prepared::PreparedStatement;
1613

1714
pub(crate) struct QueryResponse {
1815
pub(crate) response: Response,
@@ -145,59 +142,3 @@ pub(crate) enum NonErrorAuthResponse {
145142
AuthChallenge(response::authenticate::AuthChallenge),
146143
AuthSuccess(response::authenticate::AuthSuccess),
147144
}
148-
149-
/// Parts which are needed to construct [PreparedStatement].
150-
///
151-
/// Kept separate for performance reasons, because constructing
152-
/// [PreparedStatement] involves allocations.
153-
pub(crate) struct RawPreparedStatement<'statement> {
154-
pub(crate) statement: &'statement Statement,
155-
pub(crate) prepared_response: result::Prepared,
156-
pub(crate) is_lwt: bool,
157-
pub(crate) tracing_id: Option<Uuid>,
158-
}
159-
160-
impl<'statement> RawPreparedStatement<'statement> {
161-
pub(crate) fn new(
162-
statement: &'statement Statement,
163-
prepared_response: result::Prepared,
164-
is_lwt: bool,
165-
tracing_id: Option<Uuid>,
166-
) -> Self {
167-
Self {
168-
statement,
169-
prepared_response,
170-
is_lwt,
171-
tracing_id,
172-
}
173-
}
174-
}
175-
176-
/// Constructs the fully-fledged [PreparedStatement].
177-
///
178-
/// This involves allocations.
179-
impl RawPreparedStatement<'_> {
180-
pub(crate) fn into_prepared_statement(self) -> PreparedStatement {
181-
let Self {
182-
statement,
183-
prepared_response,
184-
is_lwt,
185-
tracing_id,
186-
} = self;
187-
let mut prepared_statement = PreparedStatement::new(
188-
prepared_response.id,
189-
is_lwt,
190-
prepared_response.prepared_metadata,
191-
Arc::new(prepared_response.result_metadata),
192-
statement.contents.clone(),
193-
statement.get_validated_page_size(),
194-
statement.config.clone(),
195-
);
196-
197-
if let Some(tracing_id) = tracing_id {
198-
prepared_statement.prepare_tracing_ids.push(tracing_id);
199-
}
200-
201-
prepared_statement
202-
}
203-
}

scylla/src/statement/prepared.rs

Lines changed: 119 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,92 @@ use uuid::Uuid;
1818
use super::{PageSize, StatementConfig};
1919
use crate::client::execution_profile::ExecutionProfileHandle;
2020
use crate::errors::{BadQuery, ExecutionError};
21-
use crate::frame::response::result::PreparedMetadata;
21+
use crate::frame::response::result::{self, PreparedMetadata};
2222
use crate::frame::types::{Consistency, SerialConsistency};
2323
use crate::observability::history::HistoryListener;
2424
use crate::policies::load_balancing::LoadBalancingPolicy;
2525
use crate::policies::retry::RetryPolicy;
2626
use crate::response::query_result::ColumnSpecs;
2727
use crate::routing::Token;
2828
use crate::routing::partitioner::{Partitioner, PartitionerHasher, PartitionerName};
29+
use crate::statement::Statement;
30+
31+
/// Parts which are needed to construct [PreparedStatement].
32+
///
33+
/// Kept separate for performance reasons, because constructing
34+
/// [PreparedStatement] involves allocations.
35+
///
36+
/// # Lifecycle of prepared statement
37+
///
38+
/// When PREPARE request is issued, RawPreparedStatement is returned,
39+
/// and later converted to PreparedStatement.
40+
/// PreparedStatement can be cloned. Clone is a new handle to the same
41+
/// underlying shared data - if the result metadata is updated, it is
42+
/// updated for all clones.
43+
/// PreparedStatement can be turned into UnconfiguredPreparedStatement.
44+
/// Similarly to clone, unconfigured statement is also a handle to the
45+
/// same shared data.
46+
/// UnconfiguredPreparedStatement can be used to create new PreparedStatement
47+
/// objects. Those are also handles to the same shared data.
48+
pub(crate) struct RawPreparedStatement<'statement> {
49+
statement: &'statement Statement,
50+
prepared_response: result::Prepared,
51+
is_lwt: bool,
52+
tracing_id: Option<Uuid>,
53+
}
54+
55+
impl<'statement> RawPreparedStatement<'statement> {
56+
pub(crate) fn new(
57+
statement: &'statement Statement,
58+
prepared_response: result::Prepared,
59+
is_lwt: bool,
60+
tracing_id: Option<Uuid>,
61+
) -> Self {
62+
Self {
63+
statement,
64+
prepared_response,
65+
is_lwt,
66+
tracing_id,
67+
}
68+
}
69+
70+
pub(crate) fn get_id(&self) -> &Bytes {
71+
&self.prepared_response.id
72+
}
73+
74+
pub(crate) fn tracing_id(&self) -> Option<Uuid> {
75+
self.tracing_id
76+
}
77+
}
78+
79+
/// Constructs the fully-fledged [PreparedStatement].
80+
///
81+
/// This involves allocations.
82+
impl RawPreparedStatement<'_> {
83+
pub(crate) fn into_prepared_statement(self) -> PreparedStatement {
84+
let Self {
85+
statement,
86+
prepared_response,
87+
is_lwt,
88+
tracing_id,
89+
} = self;
90+
let mut prepared_statement = PreparedStatement::new(
91+
prepared_response.id,
92+
is_lwt,
93+
prepared_response.prepared_metadata,
94+
Arc::new(prepared_response.result_metadata),
95+
statement.contents.clone(),
96+
statement.get_validated_page_size(),
97+
statement.config.clone(),
98+
);
99+
100+
if let Some(tracing_id) = tracing_id {
101+
prepared_statement.prepare_tracing_ids.push(tracing_id);
102+
}
103+
104+
prepared_statement
105+
}
106+
}
29107

30108
/// Represents a statement prepared on the server.
31109
///
@@ -108,7 +186,7 @@ pub struct PreparedStatement {
108186
struct PreparedStatementSharedData {
109187
id: Bytes,
110188
metadata: PreparedMetadata,
111-
result_metadata: Arc<ResultMetadata<'static>>,
189+
initial_result_metadata: Arc<ResultMetadata<'static>>,
112190
statement: String,
113191
is_confirmed_lwt: bool,
114192
}
@@ -126,7 +204,7 @@ impl Clone for PreparedStatement {
126204
}
127205

128206
impl PreparedStatement {
129-
pub(crate) fn new(
207+
fn new(
130208
id: Bytes,
131209
is_lwt: bool,
132210
metadata: PreparedMetadata,
@@ -139,7 +217,7 @@ impl PreparedStatement {
139217
shared: Arc::new(PreparedStatementSharedData {
140218
id,
141219
metadata,
142-
result_metadata,
220+
initial_result_metadata: result_metadata,
143221
statement,
144222
is_confirmed_lwt: is_lwt,
145223
}),
@@ -430,12 +508,12 @@ impl PreparedStatement {
430508

431509
/// Access metadata about the result of prepared statement returned by the database
432510
pub(crate) fn get_result_metadata(&self) -> &Arc<ResultMetadata<'static>> {
433-
&self.shared.result_metadata
511+
&self.shared.initial_result_metadata
434512
}
435513

436514
/// Access column specifications of the result set returned after the execution of this statement
437515
pub fn get_result_set_col_specs(&self) -> ColumnSpecs<'_, 'static> {
438-
ColumnSpecs::new(self.shared.result_metadata.col_specs())
516+
ColumnSpecs::new(self.shared.initial_result_metadata.col_specs())
439517
}
440518

441519
/// Get the name of the partitioner used for this statement.
@@ -506,6 +584,41 @@ impl PreparedStatement {
506584
let ctx = RowSerializationContext::from_prepared(self.get_prepared_metadata());
507585
SerializedValues::from_serializable(&ctx, values)
508586
}
587+
588+
pub(crate) fn make_unconfigured_handle(&self) -> UnconfiguredPreparedStatement {
589+
UnconfiguredPreparedStatement {
590+
shared: Arc::clone(&self.shared),
591+
partitioner_name: self.get_partitioner_name().clone(),
592+
}
593+
}
594+
}
595+
596+
/// This is a [PreparedStatement] without parts that are available to be configured
597+
/// on an unprepared [Statement]. It is intended to be used in [CachingSession] cache,
598+
/// as a type safety measue: it first needs to be configured with config taken from
599+
/// statement provided by the user.
600+
/// It contains partitioner_name field. It is configurable on prepared statement,
601+
/// but not on unprepared statement, so we need to keep it.
602+
#[derive(Debug)]
603+
pub(crate) struct UnconfiguredPreparedStatement {
604+
shared: Arc<PreparedStatementSharedData>,
605+
partitioner_name: PartitionerName,
606+
}
607+
608+
impl UnconfiguredPreparedStatement {
609+
pub(crate) fn make_configured_handle(
610+
&self,
611+
config: StatementConfig,
612+
page_size: PageSize,
613+
) -> PreparedStatement {
614+
PreparedStatement {
615+
shared: Arc::clone(&self.shared),
616+
prepare_tracing_ids: Vec::new(),
617+
page_size,
618+
partitioner_name: self.partitioner_name.clone(),
619+
config,
620+
}
621+
}
509622
}
510623

511624
/// Error when extracting partition key from bound values.

0 commit comments

Comments
 (0)