Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion sqlx-core/src/any/connection/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::sql_str::SqlStr;
use either::Either;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::TryStreamExt;
use std::fmt::Debug;

pub trait AnyConnectionBackend: std::any::Any + Debug + Send + 'static {
Expand Down Expand Up @@ -106,7 +107,19 @@ pub trait AnyConnectionBackend: std::any::Any + Debug + Send + 'static {
query: SqlStr,
persistent: bool,
arguments: Option<AnyArguments<'q>>,
) -> BoxFuture<'q, crate::Result<Option<AnyRow>>>;
) -> BoxFuture<'q, crate::Result<Option<AnyRow>>> {
let mut stream = self.fetch_many(query, persistent, arguments);

Box::pin(async move {
while let Some(result) = stream.try_next().await? {
if let Either::Right(row) = result {
return Ok(Some(row));
}
}

Ok(None)
})
}

fn prepare_with<'c, 'q: 'c>(
&'c mut self,
Expand Down
30 changes: 2 additions & 28 deletions sqlx-mysql/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
use either::Either;
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt};
use sqlx_core::any::{
Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow,
AnyStatement, AnyTypeInfo, AnyTypeInfoKind,
Expand All @@ -17,7 +17,7 @@ use sqlx_core::describe::Describe;
use sqlx_core::executor::Executor;
use sqlx_core::sql_str::SqlStr;
use sqlx_core::transaction::TransactionManager;
use std::{future, pin::pin};
use std::future;

sqlx_core::declare_driver_with_optional_migrate!(DRIVER = MySql);

Expand Down Expand Up @@ -103,32 +103,6 @@ impl AnyConnectionBackend for MySqlConnection {
)
}

fn fetch_optional<'q>(
&'q mut self,
query: SqlStr,
persistent: bool,
arguments: Option<AnyArguments<'q>>,
) -> BoxFuture<'q, sqlx_core::Result<Option<AnyRow>>> {
let persistent = persistent && arguments.is_some();
let arguments = arguments
.map(AnyArguments::convert_into)
.transpose()
.map_err(sqlx_core::Error::Encode);

Box::pin(async move {
let arguments = arguments?;
let mut stream = pin!(self.run(query, arguments, persistent).await?);

while let Some(result) = stream.try_next().await? {
if let Either::Right(row) = result {
return Ok(Some(AnyRow::try_from(&row)?));
}
}

Ok(None)
})
}

fn prepare_with<'c, 'q: 'c>(
&'c mut self,
sql: SqlStr,
Expand Down
28 changes: 2 additions & 26 deletions sqlx-postgres/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use crate::{
};
use futures_core::future::BoxFuture;
use futures_core::stream::BoxStream;
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt};
use sqlx_core::sql_str::SqlStr;
use std::{future, pin::pin};
use std::future;

use sqlx_core::any::{
Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow,
Expand Down Expand Up @@ -105,30 +105,6 @@ impl AnyConnectionBackend for PgConnection {
)
}

fn fetch_optional<'q>(
&'q mut self,
query: SqlStr,
persistent: bool,
arguments: Option<AnyArguments<'q>>,
) -> BoxFuture<'q, sqlx_core::Result<Option<AnyRow>>> {
let persistent = persistent && arguments.is_some();
let arguments = arguments
.map(AnyArguments::convert_into)
.transpose()
.map_err(sqlx_core::Error::Encode);

Box::pin(async move {
let arguments = arguments?;
let mut stream = pin!(self.run(query, arguments, persistent, None).await?);

if let Some(Either::Right(row)) = stream.try_next().await? {
return Ok(Some(AnyRow::try_from(&row)?));
}

Ok(None)
})
}

fn prepare_with<'c, 'q: 'c>(
&'c mut self,
sql: SqlStr,
Expand Down
59 changes: 32 additions & 27 deletions sqlx-sqlite/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use sqlx_core::database::Database;
use sqlx_core::describe::Describe;
use sqlx_core::executor::Executor;
use sqlx_core::transaction::TransactionManager;
use std::pin::pin;
use std::sync::Arc;

sqlx_core::declare_driver_with_optional_migrate!(DRIVER = Sqlite);
Expand Down Expand Up @@ -86,21 +85,7 @@ impl AnyConnectionBackend for SqliteConnection {
persistent: bool,
arguments: Option<AnyArguments<'q>>,
) -> BoxStream<'q, sqlx_core::Result<Either<AnyQueryResult, AnyRow>>> {
let persistent = persistent && arguments.is_some();
let args = arguments.map(map_arguments);

Box::pin(
self.worker
.execute(query, args, self.row_channel_size, persistent, None)
.map_ok(flume::Receiver::into_stream)
.try_flatten_stream()
.map(
move |res: sqlx_core::Result<Either<SqliteQueryResult, SqliteRow>>| match res? {
Either::Left(result) => Ok(Either::Left(map_result(result))),
Either::Right(row) => Ok(Either::Right(AnyRow::try_from(&row)?)),
},
),
)
self.fetch_with_limit(query, persistent, arguments, None)
}

fn fetch_optional<'q>(
Expand All @@ -109,19 +94,13 @@ impl AnyConnectionBackend for SqliteConnection {
persistent: bool,
arguments: Option<AnyArguments<'q>>,
) -> BoxFuture<'q, sqlx_core::Result<Option<AnyRow>>> {
let persistent = persistent && arguments.is_some();
let args = arguments.map(map_arguments);
let mut stream = self.fetch_with_limit(query, persistent, arguments, Some(1));

Box::pin(async move {
let mut stream = pin!(
self.worker
.execute(query, args, self.row_channel_size, persistent, Some(1))
.map_ok(flume::Receiver::into_stream)
.await?
);

if let Some(Either::Right(row)) = stream.try_next().await? {
return Ok(Some(AnyRow::try_from(&row)?));
while let Some(result) = stream.try_next().await? {
if let Either::Right(row) = result {
return Ok(Some(row));
}
}

Ok(None)
Expand All @@ -145,6 +124,32 @@ impl AnyConnectionBackend for SqliteConnection {
}
}

impl SqliteConnection {
fn fetch_with_limit(
&mut self,
query: SqlStr,
persistent: bool,
arguments: Option<AnyArguments>,
limit: Option<usize>,
) -> BoxStream<'_, sqlx_core::Result<Either<AnyQueryResult, AnyRow>>> {
let persistent = persistent && arguments.is_some();
let args = arguments.map(map_arguments);

Box::pin(
self.worker
.execute(query, args, self.row_channel_size, persistent, limit)
.map_ok(flume::Receiver::into_stream)
.try_flatten_stream()
.map(
move |res: sqlx_core::Result<Either<SqliteQueryResult, SqliteRow>>| match res? {
Either::Left(result) => Ok(Either::Left(map_result(result))),
Either::Right(row) => Ok(Either::Right(AnyRow::try_from(&row)?)),
},
),
)
}
}

impl<'a> TryFrom<&'a SqliteTypeInfo> for AnyTypeInfo {
type Error = sqlx_core::Error;

Expand Down
Loading