Skip to content

Commit b4bf863

Browse files
authored
Fix invalid state in transaction (#2009)
Fix #2004. The gRPC code gets the `Status` information from the server to know if a transaction was successful. Now, every time we try to run a statement, the `is_autocommit` is run on the remote connection to know if we are in a `State::Txn` or not. After that, the code is basically a copy of `crate::replication::connection::should_execute_local` with the addition that when matching if the code should execute locally, the `state` is set to the `predicted_end_state`. This step was skipped by the old implementation because after a remote statement statement executed, the end state came from the server as mentioned before. `state` is mostly concerned to know if the server is in a transaction and that we should keep sending remote queries until a `COMMIT` or `ROLLBACK`. The match statement _could_ be refactored to reflect this more clearly.
2 parents 0ef879a + 3867a2d commit b4bf863

File tree

1 file changed

+61
-1
lines changed

1 file changed

+61
-1
lines changed

libsql/src/sync/connection.rs

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::{
33
hrana::{connection::HttpConnection, hyper::HttpSender},
44
local::{self, impls::LibsqlStmt},
55
params::Params,
6+
parser,
67
replication::connection::State,
78
sync::SyncContext,
89
BatchRows, Error, Result, Statement, Transaction, TransactionBehavior,
@@ -33,7 +34,66 @@ impl SyncedConnection {
3334

3435
let mut state = self.state.lock().await;
3536

36-
crate::replication::connection::should_execute_local(&mut state, stmts.as_slice())
37+
if !self.remote.is_autocommit() {
38+
*state = State::Txn;
39+
}
40+
41+
{
42+
let predicted_end_state = {
43+
let mut state = state.clone();
44+
45+
stmts.iter().for_each(|parser::Statement { kind, .. }| {
46+
state = state.step(*kind);
47+
});
48+
49+
state
50+
};
51+
52+
let should_execute_local = match (*state, predicted_end_state) {
53+
(State::Init, State::Init) => stmts.iter().all(parser::Statement::is_read_only),
54+
55+
(State::Init, State::TxnReadOnly) | (State::TxnReadOnly, State::TxnReadOnly) => {
56+
let is_read_only = stmts.iter().all(parser::Statement::is_read_only);
57+
58+
if !is_read_only {
59+
return Err(Error::Misuse(
60+
"Invalid write in a readonly transaction".into(),
61+
));
62+
}
63+
64+
*state = State::TxnReadOnly;
65+
true
66+
}
67+
68+
(State::TxnReadOnly, State::Init) => {
69+
let is_read_only = stmts.iter().all(parser::Statement::is_read_only);
70+
71+
if !is_read_only {
72+
return Err(Error::Misuse(
73+
"Invalid write in a readonly transaction".into(),
74+
));
75+
}
76+
77+
*state = State::Init;
78+
true
79+
}
80+
81+
(init, State::Invalid) => {
82+
let err = Err(Error::InvalidParserState(format!("{:?}", init)));
83+
84+
// Reset state always back to init so the user can start over
85+
*state = State::Init;
86+
87+
return err;
88+
}
89+
_ => {
90+
*state = predicted_end_state;
91+
false
92+
},
93+
};
94+
95+
Ok(should_execute_local)
96+
}
3797
}
3898
}
3999

0 commit comments

Comments
 (0)