-
Notifications
You must be signed in to change notification settings - Fork 136
Add Bound{Statement,Batch} #1321
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Your concern is justified. The scenario indeed may surprise the users with unexpected runtime behaviour. There exists a general solution to this problem: all affected items from If you say that it's a lot of duplicated code, then I agree. If you say it's a huge effort to duplicate everything, I also agree. Therefore, we likely prefer a non-general solution in this case, but rather one that is suited for this particular case. @Lorak-mmk has some idea IIUC. |
156aab0
to
ee9b930
Compare
I've began using this branch at my job and we've noticed a very clear drop in how long it takes us to process our data and reduced our number of scaled instances as well. For context, the specific workload is: we get a giant file with lots of different types of events that we want to save onto different tables, but there is often multiple events that go into the same table and partition. To do this we make batches that we keep in essentially a Our average processing time for a file went from ~150ms to ~50ms, so a drop in 66%. Of course this is anecdotal and specific to my use case of batching so YMMV. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I stopped the review on the third commit for now.
scylla/src/client/session.rs
Outdated
values: impl SerializeRow, | ||
paging_state: PagingState, | ||
) -> Result<(QueryResult, PagingStateResponse), ExecutionError> { | ||
self.do_execute_single_page(prepared, values, paging_state) | ||
.await | ||
let statement = prepared.clone().bind(&values)?; | ||
self.do_execute_single_page(&statement, paging_state).await | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💭 I think we'd prefer not to require cloning PreparedStatement
(even it should be quite cheap) in execute_{unpaged,single_page}()
. Could we instead introduce (for internal, but perhaps also external use) borrowed BoundStatement
s? Such that would borrow PreparedStatements
, but own SerializedValues
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
User might like to do something like this:
let prepared = session.prepare(...).await?;
let mut bound = Vec::new();
bound.push(prepared.bind_by_ref(...));
bound.push(prepared.bind_by_ref(...));
...
let results = futures::future::try_join_all(
bound.into_iter()
.map(|bound| session.execute_bound(bound))
)?;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that should be very doable. I was assuming that cloning a PreparedStatement
was cheap enough as to not matter but I can add a BoundStatementRef
or something like that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have modified the existing BoundStatement
to hold a Cow<PreparedStatement>
such that it allows a borrowed or an owned version of PreparedStatement
. I then modified the existing PreparedStatement::bind
to borrow the prepared statement by reference and added a new method that moves it instead.
scylla/src/statement/bound.rs
Outdated
/// Represents a statement that already had all its values bound | ||
#[derive(Debug, Clone)] | ||
pub struct BoundStatement { | ||
pub(crate) prepared: PreparedStatement, | ||
pub(crate) values: SerializedValues, | ||
} | ||
|
||
impl BoundStatement { | ||
pub(crate) fn new( | ||
prepared: PreparedStatement, | ||
values: &impl SerializeRow, | ||
) -> Result<BoundStatement, SerializationError> { | ||
let values = prepared.serialize_values(values)?; | ||
Ok(Self { prepared, values }) | ||
} | ||
|
||
/// Determines which values constitute the partition key and puts them in order. | ||
/// | ||
/// This is a preparation step necessary for calculating token based on a prepared statement. | ||
pub(crate) fn pk(&self) -> Result<PartitionKey<'_>, PartitionKeyExtractionError> { | ||
PartitionKey::new(self.prepared.get_prepared_metadata(), &self.values) | ||
} | ||
|
||
pub(crate) fn pk_and_token( | ||
&self, | ||
) -> Result<Option<(PartitionKey<'_>, Token)>, PartitionKeyError> { | ||
if !self.prepared.is_token_aware() { | ||
return Ok(None); | ||
} | ||
|
||
let partition_key = self.pk()?; | ||
let token = partition_key.calculate_token(self.prepared.get_partitioner_name())?; | ||
Ok(Some((partition_key, token))) | ||
} | ||
|
||
/// Calculates the token for the prepared statement and its bound values | ||
/// | ||
/// Returns the token that would be computed for executing the provided prepared statement with | ||
/// the provided values. | ||
pub fn token(&self) -> Result<Option<Token>, PartitionKeyError> { | ||
self.pk_and_token().map(|p| p.map(|(_, t)| t)) | ||
} | ||
|
||
/// Returns the prepared statement behind the `BoundStatement` | ||
pub fn prepared(&self) -> &PreparedStatement { | ||
&self.prepared | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❓ The commit message says that the BoundStatement
is internal-only. At the same time, it's pub
and it has pub
methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no public way to create it, the only method PreparedStatement::bind
is pub(crate)
. But I am fine making the struct itself pub(crate)
as well for this commit and then undoing it in the commit that makes it public.
scylla/src/statement/bound.rs
Outdated
/// Determines which values constitute the partition key and puts them in order. | ||
/// | ||
/// This is a preparation step necessary for calculating token based on a prepared statement. | ||
pub(crate) fn pk(&self) -> Result<PartitionKey<'_>, PartitionKeyExtractionError> { | ||
PartitionKey::new(self.prepared.get_prepared_metadata(), &self.values) | ||
} | ||
|
||
pub(crate) fn pk_and_token( | ||
&self, | ||
) -> Result<Option<(PartitionKey<'_>, Token)>, PartitionKeyError> { | ||
if !self.prepared.is_token_aware() { | ||
return Ok(None); | ||
} | ||
|
||
let partition_key = self.pk()?; | ||
let token = partition_key.calculate_token(self.prepared.get_partitioner_name())?; | ||
Ok(Some((partition_key, token))) | ||
} | ||
|
||
/// Calculates the token for the prepared statement and its bound values | ||
/// | ||
/// Returns the token that would be computed for executing the provided prepared statement with | ||
/// the provided values. | ||
pub fn token(&self) -> Result<Option<Token>, PartitionKeyError> { | ||
self.pk_and_token().map(|p| p.map(|(_, t)| t)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔧 The previous method names (e.g., extract_partition_key_and_calculate_token()
), were verbose but informative. Their names warned that they required computations; the new names suggest getter-like cheap operation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair enough, and I am okay changing it back but if I could push back just slightly, the previous one was more expensive because it also serialized the values as part of that calculation. For this method the values have already been serialized so it is a cheaper calculation. Additionally based on the types it is already implied that some sort of calculation needs to happen at runtime since the operation can fail. Although... looking at the code it still a bit complex so I am fine adding calculate
back in there or something like that.
async fn last_minute_prepare_batch<'b>( | ||
&self, | ||
init_batch: &'b Batch, | ||
values: impl BatchValues, | ||
) -> Result<Cow<'b, Batch>, PrepareError> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❓ What does the name of this method mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I couldn't think of a good name for it and I wanted to get the idea out there rather than bikeshed on it, my bad.
Basically this is the preparing of the batch that occurs at the moment a batch gets executed if any of its statements with values weren't already prepared. This logic already existed but it was done in a latter step and I had to do move it here since I wanted the execution of Batch
to be built upon BoundBatch
to make it easier to catch bugs instead of duplicating logic. BoundBatch
does not allow for unprepared statements with values so this "last minute prepare" was moved here.
General thought: |
ee9b930
to
8481e96
Compare
That doesn't seem to match the existing implementation (in Prepared statement unpaged: scylla-rust-driver/scylla/src/client/session.rs Line 1299 in 0085612
Prepared statement single page: scylla-rust-driver/scylla/src/client/session.rs Line 1318 in 0085612
Prepared statement iterating: scylla-rust-driver/scylla/src/client/session.rs Line 1438 in 0085612
In fact, even for unprepared statements, as long as there are values, they eventually get prepared and serialized, instead of serializing straight into the networking buffer: scylla-rust-driver/scylla/src/client/session.rs Lines 1085 to 1086 in 0085612
The only case I see where we don't pre-serialize values is unprepared statements without values. But That all being said, you all do in fact do that for |
8481e96
to
f9ec0a2
Compare
f9ec0a2
to
fe53d08
Compare
49d7424
to
1a2e7f3
Compare
this struct keeps track of a PreparedStatement and SerializedValues
this is the version that the top crate (scylla) will use to send batches
it implements Default same as `Batch`, and it also allows for override of the batch_type same as `Batch`
1a2e7f3
to
bafcdf9
Compare
allows users to transform an existing bound statement into one that doesn't borrow the prepared statement (by cloning it)
bafcdf9
to
2b5f6c6
Compare
This PR adds a version of
BoundStatement
andBoundBatch
to hopefully help inspire your future design of these structs.As discussed in: #941, you all wish to design these features yourselves (for totally understandable reasons) so this PR is here just to help inspire the future implementation, perhaps discuss decisions/API, or whatever else I may be able to help with. I will likely use this branch in my own work to solve my need for this API temporarily until an official version is up.
BoundStatement
This is probably the easiest to understand. It is just a
PreparedStatement
with a boundSerializedValues
. The only public way to create this struct is throughPreparedStatement::bind(self, impl: SerializeRow)
, thus making it externally type safe at creation, while internally it erases the type and just keeps the byte buffer (SerializedValues
).This struct is useful to:
Token
related to the prepared statement + values combo without needing to double-serialize (once to get the token and then again when executing the statement).All existing internal APIs where a
PreparedStatement
and its relevant values where passed in together are rewritten to useBoundStatement
instead while all external APIs remain the same so as to not introduce breaking changes.BoundBatch
This is where things begin to get interesting. While a naive implementation of
BoundBatch
that simply keeps aVec<PreparedStatement>
could work, that'd take away some of the current niceties of howBatch
is serialized, namely when serializing a batch and its values it doesn't create a small buffer for each serialized row but instead serializes every statement + value into one joined buffer thus avoiding multiple small allocations. To let this current optimization live-on while still allowing each statement + value to be passed in sequentially theBoundBatch
instead keeps a single buffer where it serializes each statement + value as they are passed in. When the bound batch is executed this large buffer is copied into the request buffer in one go. Since we are serializing as we go, we need to keep track a few extra details:This struct is useful to:
BoundStatement
to avoid re-serializing values . This is useful because it is common for batches to be more efficient when they are all for the same token. So now a user can make aBoundStatement
, use it to calculate its token, and based on that decide whatBoundBatch
instance to put theBoundStatement
into.BoundBatch
can easily copy the serialized bytes out ofBoundStatement
thus saving CPU resources.Note that this implementation of
BoundBatch
allows for unprepared statements to be added if and only if there are no values associated for the unprepared statement. This is done to follow the existing logic ofBatch
where any unprepared statement that had values would be prepared prior to doing the request, now we are just making it explicit for the user to do have to prepared it if they want to useBoundBatch
. The internal implementation of executing aBatch
has been rewritten to first prepare the statements in theBatch
(just as it used to do but earlier now) and then create aBoundBatch
. This allows the logic of executing a batch to be all in one place so that all the existing tests ofBatch
end up testingBoundBatch
execution as well.Execute
traitThis is a sealed trait (users of the crate can call its methods but not implement the trait) for types that can be executed on a
Session
without any additional values. For now this is directly implemented only onBoundBatch
. Existing methods have been rewritten to use this trait but externally nothing has changed other than this trait being usable publicly. (so someone can callmy_bound_batch.execute(&session)
.ExecutePageable
traitAnother sealed trait for types that be executed on a
Session
but are aware of pagiination. This is implemented directly onBoundStatement
and(Statement, impl SerializeValeus)
. Any type that implementsExecutePageable
will also auto-implementExecute
with the implementation calling for the pageable methods with no page limit, and no initial paging state. This allows forBoundStatement
and(Statement, impl SerializeValeus)
to have the same three ways to be called now: no pagination from the start, no pagination from a saved point, and pagination from a saved point. This is done via a single method that has a const generic but could easily be refactored to two methods instead (that call for the generic method under the hood).Other random notes
Because the
scylla-cql
crate is already in v1.x, and theBatch
struct in that crate has its definition fully public, I couldn't change it at all so instead I had to create a newBatchV2
. I left the current implementation ofBatch
(de)serialization and instead made requests with the batch opcode (de)serialize usingBatchV2
. This allows code to compile but it is technically perhaps a breaking change in that if a foreign crate relies onscylla-cql
deserializing toBatch
instead of the newBatchV2
then their code will break at runtime. I can't think of a reason why someone would do that but I figured it was worth bringing up anyway.Fixes: #941
Pre-review checklist
./docs/source/
.Fixes:
annotations to PR description.