Skip to content

Commit 1dda8ef

Browse files
committed
fix: store affected rows for HranaRows
1 parent 8145962 commit 1dda8ef

File tree

3 files changed

+31
-16
lines changed

3 files changed

+31
-16
lines changed

libsql/src/hrana/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ where
2828

2929
impl<T> HttpConnection<T>
3030
where
31-
T: HttpSend,
31+
T: HttpSend + Send + Sync + 'static,
3232
{
3333
pub fn new(url: String, token: String, inner: T) -> Self {
3434
// The `libsql://` protocol is an alias for `https://`.

libsql/src/hrana/mod.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use libsql_hrana::proto::{Batch, BatchResult, Col, Stmt, StmtResult};
2121
use std::collections::VecDeque;
2222
use std::future::Future;
2323
use std::pin::Pin;
24+
use std::sync::atomic::Ordering;
2425
use std::sync::Arc;
2526
use std::task::{Context, Poll};
2627

@@ -122,7 +123,7 @@ where
122123

123124
impl<T> Statement<T>
124125
where
125-
T: HttpSend,
126+
T: HttpSend + Send + Sync + 'static,
126127
{
127128
pub(crate) fn new(stream: HranaStream<T>, sql: String, want_rows: bool) -> crate::Result<Self> {
128129
// in SQLite when a multiple statements are glued together into one string, only the first one is
@@ -170,20 +171,20 @@ where
170171
pub(crate) async fn query_raw(
171172
&mut self,
172173
params: &Params,
173-
) -> crate::Result<HranaRows<T::Stream>> {
174+
) -> crate::Result<HranaRows<T::Stream, T>> {
174175
let mut stmt = self.inner.clone();
175176
bind_params(params.clone(), &mut stmt);
176177

177178
let cursor = self.stream.cursor(Batch::single(stmt)).await?;
178-
let rows = HranaRows::from_cursor(cursor).await?;
179+
let rows = HranaRows::from_cursor(cursor, self.stream.clone()).await?;
179180

180181
Ok(rows)
181182
}
182183
}
183184

184185
impl<T> Statement<T>
185186
where
186-
T: HttpSend,
187+
T: HttpSend + Send + Sync + 'static,
187188
<T as HttpSend>::Stream: Send + Sync + 'static,
188189
{
189190
pub async fn query(&mut self, params: &Params) -> crate::Result<super::Rows> {
@@ -192,28 +193,37 @@ where
192193
}
193194
}
194195

195-
pub struct HranaRows<S> {
196+
pub struct HranaRows<S, T: HttpSend> {
196197
cursor_step: OwnedCursorStep<S>,
197198
column_types: Option<Vec<ValueType>>,
199+
stream: HranaStream<T>,
198200
}
199201

200-
impl<S> HranaRows<S>
202+
impl<S, T> HranaRows<S, T>
201203
where
204+
T: HttpSend + Send + Sync + 'static,
202205
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
203206
{
204-
async fn from_cursor(cursor: Cursor<S>) -> Result<Self> {
207+
async fn from_cursor(cursor: Cursor<S>, stream: HranaStream<T>) -> Result<Self> {
205208
let cursor_step = cursor.next_step_owned().await?;
206209
Ok(HranaRows {
207210
cursor_step,
208211
column_types: None,
212+
stream,
209213
})
210214
}
211215

212216
pub async fn next(&mut self) -> crate::Result<Option<super::Row>> {
213217
let row = match self.cursor_step.next().await {
214218
Some(Ok(row)) => row,
215219
Some(Err(e)) => return Err(crate::Error::Hrana(Box::new(e))),
216-
None => return Ok(None),
220+
None => {
221+
self.stream
222+
.inner
223+
.affected_row_count
224+
.store(self.cursor_step.affected_rows().into(), Ordering::SeqCst);
225+
return Ok(None);
226+
}
217227
};
218228

219229
if self.column_types.is_none() {
@@ -254,17 +264,19 @@ where
254264
}
255265

256266
#[async_trait::async_trait]
257-
impl<S> RowsInner for HranaRows<S>
267+
impl<S, T> RowsInner for HranaRows<S, T>
258268
where
269+
T: HttpSend + Send + Sync + 'static,
259270
S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + Unpin,
260271
{
261272
async fn next(&mut self) -> crate::Result<Option<super::Row>> {
262273
self.next().await
263274
}
264275
}
265276

266-
impl<S> ColumnsInner for HranaRows<S>
277+
impl<S, T> ColumnsInner for HranaRows<S, T>
267278
where
279+
T: HttpSend + Send + Sync + 'static,
268280
S: Stream<Item = std::io::Result<Bytes>> + Send + Sync + Unpin,
269281
{
270282
fn column_count(&self) -> i32 {

libsql/src/hrana/stream.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ use crate::hrana::cursor::{Cursor, CursorReq};
22
use crate::hrana::proto::{Batch, BatchResult, DescribeResult, Stmt, StmtResult};
33
use crate::hrana::{CursorResponseError, HranaError, HttpSend, Result};
44
use bytes::{Bytes, BytesMut};
5-
use futures::Stream;
5+
use futures::{Stream, StreamExt};
66
use libsql_hrana::proto::{
77
BatchStreamReq, CloseSqlStreamReq, CloseStreamReq, CloseStreamResp, DescribeStreamReq,
88
GetAutocommitStreamReq, PipelineReqBody, PipelineRespBody, SequenceStreamReq,
99
StoreSqlStreamReq, StreamRequest, StreamResponse, StreamResult,
1010
};
11+
use std::pin::pin;
1112
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
1213
use std::sync::Arc;
1314
use tokio::sync::Mutex;
@@ -30,7 +31,7 @@ pub struct HranaStream<T>
3031
where
3132
T: HttpSend,
3233
{
33-
inner: Arc<Inner<T>>,
34+
pub inner: Arc<Inner<T>>,
3435
}
3536

3637
impl<T> Clone for HranaStream<T>
@@ -94,7 +95,9 @@ where
9495
(0, 0)
9596
};
9697

97-
self.inner.total_changes.fetch_add(affected_row_count, Ordering::SeqCst);
98+
self.inner
99+
.total_changes
100+
.fetch_add(affected_row_count, Ordering::SeqCst);
98101
self.inner
99102
.affected_row_count
100103
.store(affected_row_count, Ordering::SeqCst);
@@ -279,11 +282,11 @@ where
279282
}
280283

281284
#[derive(Debug)]
282-
struct Inner<T>
285+
pub struct Inner<T>
283286
where
284287
T: HttpSend,
285288
{
286-
affected_row_count: AtomicU64,
289+
pub affected_row_count: AtomicU64,
287290
total_changes: AtomicU64,
288291
last_insert_rowid: AtomicI64,
289292
is_autocommit: AtomicBool,

0 commit comments

Comments
 (0)