Skip to content

Commit cdf9060

Browse files
authored
Fix Connection::changes() for remote queries (#2044)
This implementation is very much not ideal and should be reworked. Its exposing the `Inne` type and `inner` parameter of `HranaStream`. I can't think of a way to structure the code in a better way, but later we could extend the protocol to include the changes made by the query as a initial response, preventing having to hold a reference the the stream all together.
2 parents 8145962 + e9d6191 commit cdf9060

File tree

5 files changed

+36
-20
lines changed

5 files changed

+36
-20
lines changed

libsql/src/hrana/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ where
2828

2929
impl<T> HttpConnection<T>
3030
where
31-
T: HttpSend,
31+
T: HttpSend + Send + Sync + 'static,
3232
{
3333
pub fn new(url: String, token: String, inner: T) -> Self {
3434
// The `libsql://` protocol is an alias for `https://`.

libsql/src/hrana/mod.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use libsql_hrana::proto::{Batch, BatchResult, Col, Stmt, StmtResult};
2121
use std::collections::VecDeque;
2222
use std::future::Future;
2323
use std::pin::Pin;
24+
use std::sync::atomic::Ordering;
2425
use std::sync::Arc;
2526
use std::task::{Context, Poll};
2627

@@ -122,7 +123,7 @@ where
122123

123124
impl<T> Statement<T>
124125
where
125-
T: HttpSend,
126+
T: HttpSend + Send + Sync + 'static,
126127
{
127128
pub(crate) fn new(stream: HranaStream<T>, sql: String, want_rows: bool) -> crate::Result<Self> {
128129
// in SQLite when a multiple statements are glued together into one string, only the first one is
@@ -170,20 +171,20 @@ where
170171
pub(crate) async fn query_raw(
171172
&mut self,
172173
params: &Params,
173-
) -> crate::Result<HranaRows<T::Stream>> {
174+
) -> crate::Result<HranaRows<T::Stream, T>> {
174175
let mut stmt = self.inner.clone();
175176
bind_params(params.clone(), &mut stmt);
176177

177178
let cursor = self.stream.cursor(Batch::single(stmt)).await?;
178-
let rows = HranaRows::from_cursor(cursor).await?;
179+
let rows = HranaRows::from_cursor(cursor, self.stream.clone()).await?;
179180

180181
Ok(rows)
181182
}
182183
}
183184

184185
impl<T> Statement<T>
185186
where
186-
T: HttpSend,
187+
T: HttpSend + Send + Sync + 'static,
187188
<T as HttpSend>::Stream: Send + Sync + 'static,
188189
{
189190
pub async fn query(&mut self, params: &Params) -> crate::Result<super::Rows> {
@@ -192,28 +193,37 @@ where
192193
}
193194
}
194195

195-
pub struct HranaRows<S> {
196+
pub struct HranaRows<S, T: HttpSend> {
196197
cursor_step: OwnedCursorStep<S>,
197198
column_types: Option<Vec<ValueType>>,
199+
stream: HranaStream<T>,
198200
}
199201

200-
impl<S> HranaRows<S>
202+
impl<S, T> HranaRows<S, T>
201203
where
204+
T: HttpSend + Send + Sync + 'static,
202205
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
203206
{
204-
async fn from_cursor(cursor: Cursor<S>) -> Result<Self> {
207+
async fn from_cursor(cursor: Cursor<S>, stream: HranaStream<T>) -> Result<Self> {
205208
let cursor_step = cursor.next_step_owned().await?;
206209
Ok(HranaRows {
207210
cursor_step,
208211
column_types: None,
212+
stream,
209213
})
210214
}
211215

212216
pub async fn next(&mut self) -> crate::Result<Option<super::Row>> {
213217
let row = match self.cursor_step.next().await {
214218
Some(Ok(row)) => row,
215219
Some(Err(e)) => return Err(crate::Error::Hrana(Box::new(e))),
216-
None => return Ok(None),
220+
None => {
221+
self.stream
222+
.inner
223+
.affected_row_count
224+
.store(self.cursor_step.affected_rows().into(), Ordering::SeqCst);
225+
return Ok(None);
226+
}
217227
};
218228

219229
if self.column_types.is_none() {
@@ -254,17 +264,19 @@ where
254264
}
255265

256266
#[async_trait::async_trait]
257-
impl<S> RowsInner for HranaRows<S>
267+
impl<S, T> RowsInner for HranaRows<S, T>
258268
where
269+
T: HttpSend + Send + Sync + 'static,
259270
S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + Unpin,
260271
{
261272
async fn next(&mut self) -> crate::Result<Option<super::Row>> {
262273
self.next().await
263274
}
264275
}
265276

266-
impl<S> ColumnsInner for HranaRows<S>
277+
impl<S, T> ColumnsInner for HranaRows<S, T>
267278
where
279+
T: HttpSend + Send + Sync + 'static,
268280
S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + Unpin,
269281
{
270282
fn column_count(&self) -> i32 {

libsql/src/hrana/stream.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub struct HranaStream<T>
3030
where
3131
T: HttpSend,
3232
{
33-
inner: Arc<Inner<T>>,
33+
pub inner: Arc<Inner<T>>,
3434
}
3535

3636
impl<T> Clone for HranaStream<T>
@@ -94,7 +94,9 @@ where
9494
(0, 0)
9595
};
9696

97-
self.inner.total_changes.fetch_add(affected_row_count, Ordering::SeqCst);
97+
self.inner
98+
.total_changes
99+
.fetch_add(affected_row_count, Ordering::SeqCst);
98100
self.inner
99101
.affected_row_count
100102
.store(affected_row_count, Ordering::SeqCst);
@@ -279,11 +281,11 @@ where
279281
}
280282

281283
#[derive(Debug)]
282-
struct Inner<T>
284+
pub struct Inner<T>
283285
where
284286
T: HttpSend,
285287
{
286-
affected_row_count: AtomicU64,
288+
pub affected_row_count: AtomicU64,
287289
total_changes: AtomicU64,
288290
last_insert_rowid: AtomicI64,
289291
is_autocommit: AtomicBool,

libsql/src/wasm/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ cfg_cloudflare! {
4848
#[derive(Debug, Clone)]
4949
pub struct Connection<T>
5050
where
51-
T: HttpSend,
51+
T: HttpSend + Sync + Send + 'static,
5252
{
5353
conn: HttpConnection<T>,
5454
}
@@ -65,7 +65,7 @@ cfg_cloudflare! {
6565

6666
impl<T> Connection<T>
6767
where
68-
T: HttpSend,
68+
T: HttpSend + Sync + Send + 'static,
6969
<T as HttpSend>::Stream: 'static,
7070
{
7171
pub async fn execute(&self, sql: &str, params: impl IntoParams) -> crate::Result<u64> {
@@ -126,14 +126,14 @@ where
126126
#[derive(Debug, Clone)]
127127
pub struct Transaction<T>
128128
where
129-
T: HttpSend,
129+
T: HttpSend + Sync + Send + 'static,
130130
{
131131
inner: HttpTransaction<T>,
132132
}
133133

134134
impl<T> Transaction<T>
135135
where
136-
T: HttpSend,
136+
T: HttpSend + Sync + Send + 'static,
137137
<T as HttpSend>::Stream: 'static,
138138
{
139139
pub async fn query(&self, sql: &str, params: impl IntoParams) -> crate::Result<Rows> {

libsql/src/wasm/rows.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::hrana::HranaRows;
2+
use crate::hrana::HttpSend;
23
use crate::Row;
34
use bytes::Bytes;
45
use futures::Stream;
@@ -42,8 +43,9 @@ pub(super) trait RowsInner {
4243
}
4344

4445
#[async_trait::async_trait(?Send)]
45-
impl<S> RowsInner for HranaRows<S>
46+
impl<S, T> RowsInner for HranaRows<S, T>
4647
where
48+
T: HttpSend + Sync + Send + 'static,
4749
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
4850
{
4951
async fn next(&mut self) -> crate::Result<Option<Row>> {

0 commit comments

Comments
 (0)