Skip to content

Commit ebf9e0f

Browse files
authored
fix bad autocommit (#1542)
* add test * fix bug when updating autocommit status in write proxy * fmt
1 parent aaf42e3 commit ebf9e0f

File tree

2 files changed

+116
-18
lines changed

2 files changed

+116
-18
lines changed

libsql-server/src/connection/write_proxy.rs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -370,27 +370,30 @@ where
370370
program: Program,
371371
builder: B,
372372
) -> crate::Result<(B, TxnStatus, Option<FrameNo>)> {
373-
let mut txn_status = TxnStatus::Invalid;
373+
let txn_status = Arc::new(parking_lot::Mutex::new(TxnStatus::Invalid));
374374
let mut new_frame_no = None;
375375
let builder_config = self.builder_config.clone();
376-
let cb = move |response: exec_resp::Response, builder: &mut B| match response {
377-
exec_resp::Response::ProgramResp(resp) => {
378-
crate::rpc::streaming_exec::apply_program_resp_to_builder(
379-
&builder_config,
380-
builder,
381-
resp,
382-
|last_frame_no, is_autocommit| {
383-
txn_status = if is_autocommit {
384-
TxnStatus::Init
385-
} else {
386-
TxnStatus::Txn
387-
};
388-
new_frame_no = last_frame_no;
389-
},
390-
)
376+
let cb = {
377+
let txn_status = txn_status.clone();
378+
move |response: exec_resp::Response, builder: &mut B| match response {
379+
exec_resp::Response::ProgramResp(resp) => {
380+
crate::rpc::streaming_exec::apply_program_resp_to_builder(
381+
&builder_config,
382+
builder,
383+
resp,
384+
|last_frame_no, is_autocommit| {
385+
*txn_status.lock() = if is_autocommit {
386+
TxnStatus::Init
387+
} else {
388+
TxnStatus::Txn
389+
};
390+
new_frame_no = last_frame_no;
391+
},
392+
)
393+
}
394+
exec_resp::Response::DescribeResp(_) => Err(Error::PrimaryStreamMisuse),
395+
exec_resp::Response::Error(e) => Err(Error::RpcQueryError(e)),
391396
}
392-
exec_resp::Response::DescribeResp(_) => Err(Error::PrimaryStreamMisuse),
393-
exec_resp::Response::Error(e) => Err(Error::RpcQueryError(e)),
394397
};
395398

396399
let builder = self
@@ -403,6 +406,7 @@ where
403406
)
404407
.await?;
405408

409+
let txn_status = *txn_status.lock();
406410
Ok((builder, txn_status, new_frame_no))
407411
}
408412

libsql-server/tests/cluster/replication.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,3 +237,97 @@ fn replica_lazy_creation() {
237237

238238
sim.run().unwrap();
239239
}
240+
241+
#[test]
242+
fn replica_interactive_transaction() {
243+
let mut sim = turmoil::Builder::new()
244+
.simulation_duration(Duration::from_secs(1000))
245+
.build();
246+
247+
let prim_tmp = tempfile::tempdir().unwrap();
248+
249+
sim.host("primary", {
250+
let prim_path = prim_tmp.path().to_path_buf();
251+
move || {
252+
let prim_path = prim_path.clone();
253+
async move {
254+
let primary = TestServer {
255+
path: prim_path.into(),
256+
db_config: DbConfig {
257+
max_log_size: 1,
258+
..Default::default()
259+
},
260+
rpc_server_config: Some(RpcServerConfig {
261+
acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 5050)).await.unwrap(),
262+
tls_config: None,
263+
}),
264+
..Default::default()
265+
};
266+
267+
primary.start_sim(8080).await.unwrap();
268+
269+
Ok(())
270+
}
271+
}
272+
});
273+
274+
sim.host("replica", {
275+
move || async move {
276+
let tmp = tempfile::tempdir().unwrap();
277+
let replica = TestServer {
278+
path: tmp.path().to_path_buf().into(),
279+
db_config: DbConfig {
280+
max_log_size: 1,
281+
..Default::default()
282+
},
283+
rpc_client_config: Some(RpcClientConfig {
284+
remote_url: "http://primary:5050".into(),
285+
tls_config: None,
286+
connector: TurmoilConnector,
287+
}),
288+
..Default::default()
289+
};
290+
291+
replica.start_sim(8080).await.unwrap();
292+
293+
Ok(())
294+
}
295+
});
296+
297+
sim.client("client", async move {
298+
let db = Database::open_remote_with_connector("http://replica:8080", "", TurmoilConnector)
299+
.unwrap();
300+
let conn = db.connect().unwrap();
301+
302+
let tx = conn
303+
.transaction_with_behavior(libsql::TransactionBehavior::Immediate)
304+
.await
305+
.unwrap();
306+
307+
tx.execute("create table test (x)", ()).await.unwrap();
308+
tx.execute("insert into test values (12)", ())
309+
.await
310+
.unwrap();
311+
tx.execute("insert into test values (12)", ())
312+
.await
313+
.unwrap();
314+
tx.commit().await.unwrap();
315+
316+
let count = conn
317+
.query("select count(0) from test", ())
318+
.await
319+
.unwrap()
320+
.next()
321+
.await
322+
.unwrap()
323+
.unwrap()
324+
.get::<u32>(0)
325+
.unwrap();
326+
327+
assert_eq!(count, 2);
328+
329+
Ok(())
330+
});
331+
332+
sim.run().unwrap();
333+
}

0 commit comments

Comments
 (0)