Skip to content

Commit 3e62caf

Browse files
committed
fix #711
1 parent 51b0f23 commit 3e62caf

File tree

8 files changed

+155
-47
lines changed

8 files changed

+155
-47
lines changed

CHANGELOG.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,25 @@
11
# CHANGELOG.md
22

3+
## 0.32.0 (unreleased)
4+
5+
- Rollback any open transactions when returning a connection to the pool.
6+
- Previously, if an error occurred in the middle of a transaction, the transaction would be left open, and the connection would be returned to the pool. The next request could get a connection with an open half-completed transaction, which could lead to hard to debug issues.
7+
- This allows safely using features that require a transaction, like
8+
- ```sql
9+
BEGIN;
10+
CREATE TEMPORARY TABLE t (x int) ON COMMIT DROP; -- postgres syntax
11+
-- do something with t
12+
-- previously, if an error occurred, the transaction would be left open, and the connection returned to the pool.
13+
-- the next request could get a connection where the table `t` still exists, leading to a new error.
14+
COMMIT;
15+
```
16+
- This will now automatically rollback the transaction, even if an error occurs in the middle of it.
17+
- Fix a bug where one additional SQL statement was executed after an error occurred in a SQL file. This could cause surprising unexpected behavior.
18+
- ```sql
19+
insert into t values ($invalid_value); -- if this statement fails, ...
20+
insert into t values (42); -- this next statement should not be executed
21+
```
22+
323
## 0.31.0 (2024-11-24)
424

525
### 🚀 **New Features**

Cargo.lock

Lines changed: 10 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "sqlpage"
3-
version = "0.31.0"
3+
version = "0.32.0"
44
edition = "2021"
55
description = "Build data user interfaces entirely in SQL. A web server that takes .sql files and formats the query result using pre-made configurable professional-looking components."
66
keywords = ["web", "sql", "framework"]

src/webserver/database/connect.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{mem::take, time::Duration};
33
use super::Database;
44
use crate::{app_config::AppConfig, ON_CONNECT_FILE};
55
use anyhow::Context;
6+
use futures_util::future::BoxFuture;
67
use sqlx::{
78
any::{Any, AnyConnectOptions, AnyKind},
89
pool::PoolOptions,
@@ -93,12 +94,32 @@ impl Database {
9394
)
9495
.acquire_timeout(Duration::from_secs_f64(
9596
config.database_connection_acquire_timeout_seconds,
96-
));
97+
))
98+
.after_release(on_return_to_pool);
9799
pool_options = add_on_connection_handler(config, pool_options);
98100
pool_options
99101
}
100102
}
101103

104+
fn on_return_to_pool(
105+
conn: &mut sqlx::AnyConnection,
106+
meta: sqlx::pool::PoolConnectionMetadata,
107+
) -> BoxFuture<'_, Result<bool, sqlx::Error>> {
108+
Box::pin(async move {
109+
match conn.execute("ROLLBACK").await {
110+
Ok(r) => log::info!(
111+
"Rolled back a transaction that was left open before returning a connection to the pool. Result: {:?}",
112+
r
113+
),
114+
Err(e) => log::trace!(
115+
"Failed to rollback before returning a connection to the pool. There was probably no transaction left open: {e:?}"
116+
),
117+
}
118+
log::trace!("Releasing connection: {meta:#?}");
119+
Ok(true)
120+
})
121+
}
122+
102123
fn add_on_connection_handler(
103124
config: &AppConfig,
104125
pool_options: PoolOptions<Any>,

src/webserver/database/execute_queries.rs

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -89,19 +89,38 @@ pub fn stream_query_results_with_conn<'a>(
8989
.map(|res| res.unwrap_or_else(DbItem::Error))
9090
}
9191

92+
/// Transforms a stream of database items to stop processing after encountering the first error.
93+
/// The error item itself is still emitted before stopping.
9294
pub fn stop_at_first_error(
9395
results_stream: impl Stream<Item = DbItem>,
9496
) -> impl Stream<Item = DbItem> {
95-
let mut has_error = false;
96-
results_stream.take_while(move |item| {
97-
// We stop the stream AFTER the first error, so that the error is still returned to the client, but the rest of the queries are not executed.
98-
let should_continue = !has_error;
99-
if let DbItem::Error(err) = item {
100-
log::error!("{err:?}");
101-
has_error = true;
97+
// We need a oneshot channel rather than a simple boolean flag because
98+
// take_while would poll the stream one extra time after the error,
99+
// while take_until stops immediately when the future completes
100+
let (error_tx, error_rx) = tokio::sync::oneshot::channel();
101+
let mut error_tx = Some(error_tx);
102+
103+
results_stream
104+
.inspect(move |item| {
105+
if let DbItem::Error(err) = item {
106+
log::error!("{err:?}");
107+
if let Some(tx) = error_tx.take() {
108+
let _ = tx.send(());
109+
}
110+
}
111+
})
112+
.take_until(error_rx)
113+
}
114+
115+
pub(crate) async fn rollback_transaction(db_connection: &mut DbConn) {
116+
if let Some(conn) = db_connection.as_mut() {
117+
match conn.execute("ROLLBACK").await {
118+
Ok(r) => log::debug!("Rolled back transaction with result: {:?}", r),
119+
Err(e) => log::debug!("Failed to rollback transaction: {e:?}"),
102120
}
103-
futures_util::future::ready(should_continue)
104-
})
121+
} else {
122+
log::debug!("No connection to rollback a transaction");
123+
}
105124
}
106125

107126
/// Executes the sqlpage pseudo-functions contained in a static simple select

src/webserver/http.rs

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::render::{AnyRenderBodyContext, HeaderContext, PageContext};
22
use crate::webserver::content_security_policy::ContentSecurityPolicy;
3-
use crate::webserver::database::execute_queries::stop_at_first_error;
3+
use crate::webserver::database::execute_queries::{rollback_transaction, stop_at_first_error};
44
use crate::webserver::database::{execute_queries::stream_query_results_with_conn, DbItem};
55
use crate::webserver::http_request_info::extract_request_info;
66
use crate::webserver::ErrorWithStatus;
@@ -173,32 +173,35 @@ async fn render_sql(
173173
let database_entries_stream =
174174
stream_query_results_with_conn(&sql_file, &mut req_param, &mut conn);
175175
let database_entries_stream = stop_at_first_error(database_entries_stream);
176-
let response_with_writer = build_response_header_and_stream(
177-
Arc::clone(&app_state),
178-
database_entries_stream,
179-
request_context,
180-
)
181-
.await;
182-
match response_with_writer {
183-
Ok(ResponseWithWriter::RenderStream {
184-
http_response,
185-
renderer,
176+
{
177+
let response_with_writer = build_response_header_and_stream(
178+
Arc::clone(&app_state),
186179
database_entries_stream,
187-
}) => {
188-
resp_send
189-
.send(http_response)
190-
.unwrap_or_else(|e| log::error!("could not send headers {e:?}"));
191-
stream_response(database_entries_stream, renderer).await;
192-
}
193-
Ok(ResponseWithWriter::FinishedResponse { http_response }) => {
194-
resp_send
195-
.send(http_response)
196-
.unwrap_or_else(|e| log::error!("could not send headers {e:?}"));
197-
}
198-
Err(err) => {
199-
send_anyhow_error(&err, resp_send, app_state.config.environment);
180+
request_context,
181+
)
182+
.await;
183+
match response_with_writer {
184+
Ok(ResponseWithWriter::RenderStream {
185+
http_response,
186+
renderer,
187+
database_entries_stream,
188+
}) => {
189+
resp_send
190+
.send(http_response)
191+
.unwrap_or_else(|e| log::error!("could not send headers {e:?}"));
192+
stream_response(database_entries_stream, renderer).await;
193+
}
194+
Ok(ResponseWithWriter::FinishedResponse { http_response }) => {
195+
resp_send
196+
.send(http_response)
197+
.unwrap_or_else(|e| log::error!("could not send headers {e:?}"));
198+
}
199+
Err(err) => {
200+
send_anyhow_error(&err, resp_send, app_state.config.environment);
201+
}
200202
}
201203
}
204+
rollback_transaction(&mut conn).await;
202205
});
203206
resp_recv.await.map_err(ErrorInternalServerError)
204207
}

tests/failed_transaction.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
BEGIN;
2+
CREATE TEMPORARY TABLE t(f INTEGER NOT NULL);
3+
INSERT INTO t(f) VALUES ($x);
4+
COMMIT;
5+
6+
select 'text' as component, f as contents from t;

tests/index.rs

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,37 @@ async fn test_csv_upload() -> actix_web::Result<()> {
469469
Ok(())
470470
}
471471

472+
#[actix_web::test]
473+
async fn test_transaction_error() -> actix_web::Result<()> {
474+
// First, request the page without any parameter. It should fail because
475+
// of the not null constraint.
476+
// But then, when we request again with a parameter, we should not see any side
477+
// effect coming from the first transaction, and it should succeed
478+
let data = make_app_data().await;
479+
let req = get_request_to_with_data("/tests/failed_transaction.sql", data.clone())
480+
.await?
481+
.to_srv_request();
482+
let resp = main_handler(req).await?;
483+
let body = test::read_body(resp).await;
484+
let body_str = String::from_utf8(body.to_vec()).unwrap();
485+
assert!(
486+
body_str.contains("constraint failed"),
487+
"{body_str}\nexpected to contain: constraint failed"
488+
);
489+
// Now query again, with ?x=1447
490+
let req = get_request_to_with_data("/tests/failed_transaction.sql?x=1447", data)
491+
.await?
492+
.to_srv_request();
493+
let resp = main_handler(req).await?;
494+
let body = test::read_body(resp).await;
495+
let body_str = String::from_utf8(body.to_vec()).unwrap();
496+
assert!(
497+
body_str.contains("1447"),
498+
"{body_str}\nexpected to contain: 1447"
499+
);
500+
Ok(())
501+
}
502+
472503
#[actix_web::test]
473504
/// `/sqlpage/migrations/0001_init.sql` should return a 403 Forbidden
474505
async fn privileged_paths_are_not_accessible() {
@@ -561,8 +592,10 @@ async fn test_official_website_basic_auth_example() {
561592
);
562593
}
563594

564-
async fn get_request_to(path: &str) -> actix_web::Result<TestRequest> {
565-
let data = make_app_data().await;
595+
async fn get_request_to_with_data(
596+
path: &str,
597+
data: actix_web::web::Data<AppState>,
598+
) -> actix_web::Result<TestRequest> {
566599
Ok(test::TestRequest::get()
567600
.uri(path)
568601
.insert_header(ContentType::plaintext())
@@ -571,6 +604,11 @@ async fn get_request_to(path: &str) -> actix_web::Result<TestRequest> {
571604
.app_data(data))
572605
}
573606

607+
async fn get_request_to(path: &str) -> actix_web::Result<TestRequest> {
608+
let data = make_app_data().await;
609+
get_request_to_with_data(path, data).await
610+
}
611+
574612
async fn make_app_data_from_config(config: AppConfig) -> actix_web::web::Data<AppState> {
575613
let state = AppState::init(&config).await.unwrap();
576614
actix_web::web::Data::new(state)
@@ -614,6 +652,7 @@ pub fn test_config() -> AppConfig {
614652
serde_json::from_str::<AppConfig>(&format!(
615653
r#"{{
616654
"database_url": "{}",
655+
"max_database_pool_connections": 1,
617656
"database_connection_retries": 3,
618657
"database_connection_acquire_timeout_seconds": 15,
619658
"allow_exec": true,

0 commit comments

Comments
 (0)