Skip to content

Commit fadd138

Browse files
committed
feat: add columns information for Hrana Statements
1 parent e3904f9 commit fadd138

File tree

7 files changed

+50
-20
lines changed

7 files changed

+50
-20
lines changed

libsql-server/tests/standalone/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,15 +99,15 @@ fn basic_metrics() {
9999

100100
let snapshot = snapshot_metrics();
101101
snapshot.assert_counter("libsql_server_libsql_execute_program", 3);
102-
snapshot.assert_counter("libsql_server_user_http_response", 3);
102+
snapshot.assert_counter("libsql_server_user_http_response", 4);
103103

104104
for (key, (_, _, val)) in snapshot.snapshot() {
105105
if key.kind() == metrics_util::MetricKind::Counter
106106
&& key.key().name() == "libsql_client_version"
107107
{
108108
let label = key.key().labels().next().unwrap();
109109
assert!(label.value().starts_with("libsql-remote-"));
110-
assert_eq!(val, &metrics_util::debugging::DebugValue::Counter(3));
110+
assert_eq!(val, &metrics_util::debugging::DebugValue::Counter(4));
111111
}
112112
}
113113

libsql/src/hrana/connection.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ where
8080
)
8181
}
8282

83-
pub fn prepare(&self, sql: &str) -> crate::Result<Statement<T>> {
83+
pub async fn prepare(&self, sql: &str) -> crate::Result<Statement<T>> {
8484
let stream = self.current_stream().clone();
85-
Statement::new(stream, sql.to_string(), true)
85+
Statement::new(stream, sql.to_string(), true).await
8686
}
8787
}
8888

libsql/src/hrana/hyper.rs

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ impl Conn for HttpConnection<HttpSender> {
131131

132132
async fn prepare(&self, sql: &str) -> crate::Result<Statement> {
133133
let stream = self.current_stream().clone();
134-
let stmt = crate::hrana::Statement::new(stream, sql.to_string(), true)?;
134+
let stmt = crate::hrana::Statement::new(stream, sql.to_string(), true).await?;
135135
Ok(Statement {
136136
inner: Box::new(stmt),
137137
})
@@ -241,7 +241,16 @@ impl crate::statement::Stmt for crate::hrana::Statement<HttpSender> {
241241
// 2. Even if we do execute query, Hrana doesn't return all info that Column exposes.
242242
// 3. Even if we would like to return some of the column info ie. column [ValueType], this information is not
243243
// present in Hrana [Col] but rather inferred from the row cell type.
244-
vec![]
244+
self.cols
245+
.iter()
246+
.map(|name| crate::Column {
247+
name,
248+
origin_name: None,
249+
table_name: None,
250+
database_name: None,
251+
decl_type: None,
252+
})
253+
.collect()
245254
}
246255
}
247256

@@ -350,7 +359,7 @@ impl Conn for HranaStream<HttpSender> {
350359
}
351360

352361
async fn prepare(&self, sql: &str) -> crate::Result<Statement> {
353-
let stmt = crate::hrana::Statement::new(self.clone(), sql.to_string(), true)?;
362+
let stmt = crate::hrana::Statement::new(self.clone(), sql.to_string(), true).await?;
354363
Ok(Statement {
355364
inner: Box::new(stmt),
356365
})

libsql/src/hrana/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,13 +119,22 @@ where
119119
stream: HranaStream<T>,
120120
close_stream: bool,
121121
inner: Stmt,
122+
cols: Vec<String>,
122123
}
123124

124125
impl<T> Statement<T>
125126
where
126127
T: HttpSend + Send + Sync + 'static,
127128
{
128-
pub(crate) fn new(stream: HranaStream<T>, sql: String, want_rows: bool) -> crate::Result<Self> {
129+
pub(crate) async fn new(
130+
stream: HranaStream<T>,
131+
sql: String,
132+
want_rows: bool,
133+
) -> crate::Result<Self> {
134+
let desc = stream.describe(&sql).await?;
135+
136+
let cols: Vec<_> = desc.cols.into_iter().map(|col| col.name).collect();
137+
129138
// in SQLite when a multiple statements are glued together into one string, only the first one is
130139
// executed and then a handle to continue execution is returned. However Hrana API doesn't allow
131140
// passing multi-statement strings, so we just pick first one.
@@ -147,6 +156,7 @@ where
147156
stream,
148157
close_stream,
149158
inner,
159+
cols,
150160
})
151161
}
152162
}

libsql/src/replication/connection.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
// TODO(lucio): Move this to `remote/mod.rs`
22

3-
use std::time::Duration;
4-
use std::str::FromStr;
5-
use std::sync::Arc;
6-
use std::sync::atomic::AtomicU64;
73
use libsql_replication::rpc::proxy::{
84
describe_result, query_result::RowResult, Cond, DescribeResult, ExecuteResults, NotCond,
95
OkCond, Positional, Query, ResultRows, State as RemoteState, Step,
106
};
117
use parking_lot::Mutex;
8+
use std::str::FromStr;
9+
use std::sync::atomic::AtomicU64;
10+
use std::sync::Arc;
11+
use std::time::Duration;
1212

1313
use crate::parser;
1414
use crate::parser::StmtKind;
@@ -168,7 +168,11 @@ impl From<RemoteState> for State {
168168
}
169169

170170
impl RemoteConnection {
171-
pub(crate) fn new(local: LibsqlConnection, writer: Option<Writer>, max_write_replication_index: Arc<AtomicU64>) -> Self {
171+
pub(crate) fn new(
172+
local: LibsqlConnection,
173+
writer: Option<Writer>,
174+
max_write_replication_index: Arc<AtomicU64>,
175+
) -> Self {
172176
let state = Arc::new(Mutex::new(Inner::default()));
173177
Self {
174178
local,
@@ -180,9 +184,16 @@ impl RemoteConnection {
180184

181185
fn update_max_write_replication_index(&self, index: Option<u64>) {
182186
if let Some(index) = index {
183-
let mut current = self.max_write_replication_index.load(std::sync::atomic::Ordering::SeqCst);
187+
let mut current = self
188+
.max_write_replication_index
189+
.load(std::sync::atomic::Ordering::SeqCst);
184190
while index > current {
185-
match self.max_write_replication_index.compare_exchange(current, index, std::sync::atomic::Ordering::SeqCst, std::sync::atomic::Ordering::SeqCst) {
191+
match self.max_write_replication_index.compare_exchange(
192+
current,
193+
index,
194+
std::sync::atomic::Ordering::SeqCst,
195+
std::sync::atomic::Ordering::SeqCst,
196+
) {
186197
Ok(_) => break,
187198
Err(new_current) => current = new_current,
188199
}

libsql/src/sync/connection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ impl Conn for SyncedConnection {
128128
})
129129
} else {
130130
let stmt = Statement {
131-
inner: Box::new(self.remote.prepare(sql)?),
131+
inner: Box::new(self.remote.prepare(sql).await?),
132132
};
133133

134134
if self.read_your_writes {

libsql/src/wasm/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ where
7474
self.conn.current_stream().clone(),
7575
sql.to_string(),
7676
true,
77-
)?;
77+
).await?;
7878
let rows = stmt.execute(&params.into_params()?).await?;
7979
Ok(rows as u64)
8080
}
@@ -104,7 +104,7 @@ where
104104
self.conn.current_stream().clone(),
105105
sql.to_string(),
106106
true,
107-
)?;
107+
).await?;
108108
let rows = stmt.query_raw(&params.into_params()?).await?;
109109
Ok(Rows {
110110
inner: Box::new(rows),
@@ -139,7 +139,7 @@ where
139139
pub async fn query(&self, sql: &str, params: impl IntoParams) -> crate::Result<Rows> {
140140
tracing::trace!("querying `{}`", sql);
141141
let stream = self.inner.stream().clone();
142-
let mut stmt = crate::hrana::Statement::new(stream, sql.to_string(), true)?;
142+
let mut stmt = crate::hrana::Statement::new(stream, sql.to_string(), true).await?;
143143
let rows = stmt.query_raw(&params.into_params()?).await?;
144144
Ok(Rows {
145145
inner: Box::new(rows),
@@ -149,7 +149,7 @@ where
149149
pub async fn execute(&self, sql: &str, params: impl IntoParams) -> crate::Result<u64> {
150150
tracing::trace!("executing `{}`", sql);
151151
let stream = self.inner.stream().clone();
152-
let mut stmt = crate::hrana::Statement::new(stream, sql.to_string(), true)?;
152+
let mut stmt = crate::hrana::Statement::new(stream, sql.to_string(), true).await?;
153153
let rows = stmt.execute(&params.into_params()?).await?;
154154
Ok(rows as u64)
155155
}

0 commit comments

Comments
 (0)