Skip to content

Commit 286ecdb

Browse files
committed
Start on borrow overhaul
1 parent e0e8c45 commit 286ecdb

File tree

6 files changed

+99
-102
lines changed

6 files changed

+99
-102
lines changed

tokio-postgres/src/client.rs

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -173,50 +173,50 @@ impl Client {
173173
///
174174
/// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
175175
/// which are set when executed. Prepared statements can only be used with the connection that created them.
176-
pub fn prepare(&mut self, query: &str) -> impl Future<Output = Result<Statement, Error>> {
177-
self.prepare_typed(query, &[])
176+
pub async fn prepare(&self, query: &str) -> Result<Statement, Error> {
177+
self.prepare_typed(query, &[]).await
178178
}
179179

180180
/// Like `prepare`, but allows the types of query parameters to be explicitly specified.
181181
///
182182
/// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
183183
/// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
184-
pub fn prepare_typed(
185-
&mut self,
184+
pub async fn prepare_typed(
185+
&self,
186186
query: &str,
187187
parameter_types: &[Type],
188-
) -> impl Future<Output = Result<Statement, Error>> {
189-
prepare::prepare(self.inner(), query, parameter_types)
188+
) -> Result<Statement, Error> {
189+
prepare::prepare(&self.inner, query, parameter_types).await
190190
}
191191

192192
/// Executes a statement, returning a stream of the resulting rows.
193193
///
194194
/// # Panics
195195
///
196196
/// Panics if the number of parameters provided does not match the number expected.
197-
pub fn query(
198-
&mut self,
199-
statement: &Statement,
200-
params: &[&(dyn ToSql + Sync)],
201-
) -> impl Stream<Item = Result<Row, Error>> {
197+
pub fn query<'a>(
198+
&'a self,
199+
statement: &'a Statement,
200+
params: &'a [&'a (dyn ToSql + Sync)],
201+
) -> impl Stream<Item = Result<Row, Error>> + 'a {
202202
let buf = query::encode(statement, params.iter().map(|s| *s as _));
203-
query::query(self.inner(), statement.clone(), buf)
203+
query::query(&self.inner, statement, buf)
204204
}
205205

206206
/// Like [`query`], but takes an iterator of parameters rather than a slice.
207207
///
208208
/// [`query`]: #method.query
209209
pub fn query_iter<'a, I>(
210-
&mut self,
211-
statement: &Statement,
210+
&'a self,
211+
statement: &'a Statement,
212212
params: I,
213-
) -> impl Stream<Item = Result<Row, Error>>
213+
) -> impl Stream<Item = Result<Row, Error>> + 'a
214214
where
215-
I: IntoIterator<Item = &'a dyn ToSql>,
215+
I: IntoIterator<Item = &'a dyn ToSql> + 'a,
216216
I::IntoIter: ExactSizeIterator,
217217
{
218218
let buf = query::encode(statement, params);
219-
query::query(self.inner(), statement.clone(), buf)
219+
query::query(&self.inner, statement, buf)
220220
}
221221

222222
/// Executes a statement, returning the number of rows modified.
@@ -226,29 +226,29 @@ impl Client {
226226
/// # Panics
227227
///
228228
/// Panics if the number of parameters provided does not match the number expected.
229-
pub fn execute(
230-
&mut self,
229+
pub async fn execute(
230+
&self,
231231
statement: &Statement,
232232
params: &[&(dyn ToSql + Sync)],
233-
) -> impl Future<Output = Result<u64, Error>> {
233+
) -> Result<u64, Error> {
234234
let buf = query::encode(statement, params.iter().map(|s| *s as _));
235-
query::execute(self.inner(), buf)
235+
query::execute(&self.inner, buf).await
236236
}
237237

238238
/// Like [`execute`], but takes an iterator of parameters rather than a slice.
239239
///
240240
/// [`execute`]: #method.execute
241-
pub fn execute_iter<'a, I>(
242-
&mut self,
241+
pub async fn execute_iter<'a, I>(
242+
&self,
243243
statement: &Statement,
244244
params: I,
245-
) -> impl Future<Output = Result<u64, Error>>
245+
) -> Result<u64, Error>
246246
where
247247
I: IntoIterator<Item = &'a dyn ToSql>,
248248
I::IntoIter: ExactSizeIterator,
249249
{
250250
let buf = query::encode(statement, params);
251-
query::execute(self.inner(), buf)
251+
query::execute(&self.inner, buf).await
252252
}
253253

254254
/// Executes a `COPY FROM STDIN` statement, returning the number of rows created.
@@ -260,7 +260,7 @@ impl Client {
260260
///
261261
/// Panics if the number of parameters provided does not match the number expected.
262262
pub fn copy_in<S>(
263-
&mut self,
263+
&self,
264264
statement: &Statement,
265265
params: &[&(dyn ToSql + Sync)],
266266
stream: S,
@@ -281,7 +281,7 @@ impl Client {
281281
///
282282
/// Panics if the number of parameters provided does not match the number expected.
283283
pub fn copy_out(
284-
&mut self,
284+
&self,
285285
statement: &Statement,
286286
params: &[&(dyn ToSql + Sync)],
287287
) -> impl Stream<Item = Result<Bytes, Error>> {
@@ -303,7 +303,7 @@ impl Client {
303303
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
304304
/// them to this method!
305305
pub fn simple_query(
306-
&mut self,
306+
&self,
307307
query: &str,
308308
) -> impl Stream<Item = Result<SimpleQueryMessage, Error>> {
309309
simple_query::simple_query(self.inner(), query)
@@ -319,7 +319,7 @@ impl Client {
319319
/// Prepared statements should be use for any query which contains user-specified data, as they provided the
320320
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
321321
/// them to this method!
322-
pub fn batch_execute(&mut self, query: &str) -> impl Future<Output = Result<(), Error>> {
322+
pub fn batch_execute(&self, query: &str) -> impl Future<Output = Result<(), Error>> {
323323
simple_query::batch_execute(self.inner(), query)
324324
}
325325

@@ -338,7 +338,7 @@ impl Client {
338338
///
339339
/// Requires the `runtime` Cargo feature (enabled by default).
340340
#[cfg(feature = "runtime")]
341-
pub fn cancel_query<T>(&mut self, tls: T) -> impl Future<Output = Result<(), Error>>
341+
pub fn cancel_query<T>(&self, tls: T) -> impl Future<Output = Result<(), Error>>
342342
where
343343
T: MakeTlsConnect<Socket>,
344344
{
@@ -354,7 +354,7 @@ impl Client {
354354
/// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
355355
/// connection itself.
356356
pub fn cancel_query_raw<S, T>(
357-
&mut self,
357+
&self,
358358
stream: S,
359359
tls: T,
360360
) -> impl Future<Output = Result<(), Error>>

tokio-postgres/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,14 @@
7979
//! use std::future::Future;
8080
//! use tokio_postgres::{Client, Error, Statement};
8181
//!
82-
//! fn pipelined_prepare(
83-
//! client: &mut Client,
84-
//! ) -> impl Future<Output = Result<(Statement, Statement), Error>>
82+
//! async fn pipelined_prepare(
83+
//! client: &Client,
84+
//! ) -> Result<(Statement, Statement), Error>
8585
//! {
8686
//! future::try_join(
8787
//! client.prepare("SELECT * FROM foo"),
8888
//! client.prepare("INSERT INTO bar (id, name) VALUES ($1, $2)")
89-
//! )
89+
//! ).await
9090
//! }
9191
//! ```
9292
//!

tokio-postgres/src/prepare.rs

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,14 @@ ORDER BY attnum
5757

5858
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
5959

60-
pub fn prepare(
61-
client: Arc<InnerClient>,
60+
pub async fn prepare(
61+
client: &Arc<InnerClient>,
6262
query: &str,
6363
types: &[Type],
64-
) -> impl Future<Output = Result<Statement, Error>> + 'static {
64+
) -> Result<Statement, Error> {
6565
let name = format!("s{}", NEXT_ID.fetch_add(1, Ordering::SeqCst));
6666
let buf = encode(&name, query, types);
6767

68-
async move {
6968
let buf = buf?;
7069
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
7170

@@ -103,14 +102,13 @@ pub fn prepare(
103102
}
104103

105104
Ok(Statement::new(&client, name, parameters, columns))
106-
}
107105
}
108106

109-
fn prepare_rec(
110-
client: Arc<InnerClient>,
111-
query: &str,
112-
types: &[Type],
113-
) -> Pin<Box<dyn Future<Output = Result<Statement, Error>> + 'static + Send>> {
107+
fn prepare_rec<'a>(
108+
client: &'a Arc<InnerClient>,
109+
query: &'a str,
110+
types: &'a [Type],
111+
) -> Pin<Box<dyn Future<Output = Result<Statement, Error>> + 'a + Send>> {
114112
Box::pin(prepare(client, query, types))
115113
}
116114

@@ -135,7 +133,7 @@ async fn get_type(client: &Arc<InnerClient>, oid: Oid) -> Result<Type, Error> {
135133
let stmt = typeinfo_statement(client).await?;
136134

137135
let buf = query::encode(&stmt, (&[&oid as &dyn ToSql]).iter().cloned());
138-
let rows = query::query(client.clone(), stmt, buf);
136+
let rows = query::query(client, &stmt, buf);
139137
pin_mut!(rows);
140138

141139
let row = match rows.try_next().await? {
@@ -190,10 +188,10 @@ async fn typeinfo_statement(client: &Arc<InnerClient>) -> Result<Statement, Erro
190188
return Ok(stmt);
191189
}
192190

193-
let stmt = match prepare_rec(client.clone(), TYPEINFO_QUERY, &[]).await {
191+
let stmt = match prepare_rec(client, TYPEINFO_QUERY, &[]).await {
194192
Ok(stmt) => stmt,
195193
Err(ref e) if e.code() == Some(&SqlState::UNDEFINED_TABLE) => {
196-
prepare_rec(client.clone(), TYPEINFO_FALLBACK_QUERY, &[]).await?
194+
prepare_rec(client, TYPEINFO_FALLBACK_QUERY, &[]).await?
197195
}
198196
Err(e) => return Err(e),
199197
};
@@ -206,7 +204,7 @@ async fn get_enum_variants(client: &Arc<InnerClient>, oid: Oid) -> Result<Vec<St
206204
let stmt = typeinfo_enum_statement(client).await?;
207205

208206
let buf = query::encode(&stmt, (&[&oid as &dyn ToSql]).iter().cloned());
209-
query::query(client.clone(), stmt, buf)
207+
query::query(client, &stmt, buf)
210208
.and_then(|row| future::ready(row.try_get(0)))
211209
.try_collect()
212210
.await
@@ -217,10 +215,10 @@ async fn typeinfo_enum_statement(client: &Arc<InnerClient>) -> Result<Statement,
217215
return Ok(stmt);
218216
}
219217

220-
let stmt = match prepare_rec(client.clone(), TYPEINFO_ENUM_QUERY, &[]).await {
218+
let stmt = match prepare_rec(client, TYPEINFO_ENUM_QUERY, &[]).await {
221219
Ok(stmt) => stmt,
222220
Err(ref e) if e.code() == Some(&SqlState::UNDEFINED_COLUMN) => {
223-
prepare_rec(client.clone(), TYPEINFO_ENUM_FALLBACK_QUERY, &[]).await?
221+
prepare_rec(client, TYPEINFO_ENUM_FALLBACK_QUERY, &[]).await?
224222
}
225223
Err(e) => return Err(e),
226224
};
@@ -233,7 +231,7 @@ async fn get_composite_fields(client: &Arc<InnerClient>, oid: Oid) -> Result<Vec
233231
let stmt = typeinfo_composite_statement(client).await?;
234232

235233
let buf = query::encode(&stmt, (&[&oid as &dyn ToSql]).iter().cloned());
236-
let rows = query::query(client.clone(), stmt, buf)
234+
let rows = query::query(client, &stmt, buf)
237235
.try_collect::<Vec<_>>()
238236
.await?;
239237

@@ -253,7 +251,7 @@ async fn typeinfo_composite_statement(client: &Arc<InnerClient>) -> Result<State
253251
return Ok(stmt);
254252
}
255253

256-
let stmt = prepare_rec(client.clone(), TYPEINFO_COMPOSITE_QUERY, &[]).await?;
254+
let stmt = prepare_rec(client, TYPEINFO_COMPOSITE_QUERY, &[]).await?;
257255

258256
client.set_typeinfo_composite(&stmt);
259257
Ok(stmt)

tokio-postgres/src/query.rs

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,16 @@ use std::pin::Pin;
1010
use std::sync::Arc;
1111
use std::task::{Context, Poll};
1212

13-
pub fn query(
14-
client: Arc<InnerClient>,
15-
statement: Statement,
13+
pub fn query<'a>(
14+
client: &'a Arc<InnerClient>,
15+
statement: &'a Statement,
1616
buf: Result<Vec<u8>, Error>,
17-
) -> impl Stream<Item = Result<Row, Error>> {
18-
start(client, buf)
19-
.map_ok(|responses| Query {
20-
statement,
21-
responses,
22-
})
23-
.try_flatten_stream()
17+
) -> impl Stream<Item = Result<Row, Error>> + 'a {
18+
let f = async move {
19+
let responses = start(client, buf).await?;
20+
Ok(Query { statement: statement.clone(), responses })
21+
};
22+
f.try_flatten_stream()
2423
}
2524

2625
pub fn query_portal(
@@ -44,7 +43,7 @@ pub fn query_portal(
4443
start.try_flatten_stream()
4544
}
4645

47-
pub async fn execute(client: Arc<InnerClient>, buf: Result<Vec<u8>, Error>) -> Result<u64, Error> {
46+
pub async fn execute(client: &InnerClient, buf: Result<Vec<u8>, Error>) -> Result<u64, Error> {
4847
let mut responses = start(client, buf).await?;
4948

5049
loop {
@@ -67,7 +66,7 @@ pub async fn execute(client: Arc<InnerClient>, buf: Result<Vec<u8>, Error>) -> R
6766
}
6867
}
6968

70-
async fn start(client: Arc<InnerClient>, buf: Result<Vec<u8>, Error>) -> Result<Responses, Error> {
69+
async fn start(client: &InnerClient, buf: Result<Vec<u8>, Error>) -> Result<Responses, Error> {
7170
let buf = buf?;
7271
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
7372

0 commit comments

Comments
 (0)