Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a24cb10
Synchronous version of Transaction Client. Async functions wrapped wi…
Jan 16, 2026
fda1e85
sync transaction and snapshot, tests, runtime handling for sync_client
Jan 16, 2026
9d2f1a1
Add comments and sign-off-by message
odecode Jan 16, 2026
83498cf
change .expect to ? operator for better error handling
odecode Jan 17, 2026
d720774
add documentation to sync_client public API
odecode Jan 17, 2026
2fad975
add better documentation about snapshot
odecode Jan 17, 2026
a6403de
remove trailing whitespace
odecode Jan 18, 2026
63c94d1
change tokio runtime method, add scan feature tests
odecode Jan 18, 2026
55bf7a3
init_sync change according to pr comment, cargo fmt
odecode Jan 19, 2026
9235466
multi thread scheduler for new_with_config
odecode Jan 20, 2026
cc0f119
cleanup docstrings
odecode Jan 20, 2026
b5b389c
rename test cases sync_txn -> txn_sync
odecode Jan 20, 2026
00dc279
change init_sync according to review suggestion and add comment about…
odecode Jan 20, 2026
b3ea3fa
remove trailing whitespace
odecode Jan 20, 2026
920f004
tests for sync_transaction wrapper functions
odecode Jan 20, 2026
2fa5375
Merge branch 'master' into sync-api-txclient
odecode Jan 20, 2026
b5fe3b1
code review fixes
odecode Jan 28, 2026
4807831
remove trailing whitespace
odecode Jan 28, 2026
d07e80f
Merge branch 'master' into sync-api-txclient
odecode Jan 28, 2026
2f6c85f
fix compilation error
odecode Jan 28, 2026
d625c21
error handling to sync_snapshot and sync_transaction to prevent panics
odecode Jan 28, 2026
7154265
eliminate duplicate error message
odecode Feb 15, 2026
e3f5a09
Merge branch 'master' into sync-api-txclient
pingyu Feb 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ pub use crate::transaction::Client as TransactionClient;
#[doc(inline)]
pub use crate::transaction::Snapshot;
#[doc(inline)]
pub use crate::transaction::SyncSnapshot;
#[doc(inline)]
pub use crate::transaction::SyncTransaction;
#[doc(inline)]
pub use crate::transaction::SyncTransactionClient;
#[doc(inline)]
pub use crate::transaction::Transaction;
#[doc(inline)]
pub use crate::transaction::TransactionOptions;
6 changes: 6 additions & 0 deletions src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub use client::Client;
pub(crate) use lock::resolve_locks;
pub(crate) use lock::HasLocks;
pub use snapshot::Snapshot;
pub use sync_client::SyncTransactionClient;
pub use sync_snapshot::SyncSnapshot;
pub use sync_transaction::SyncTransaction;
pub use transaction::CheckLevel;
#[doc(hidden)]
pub use transaction::HeartbeatOption;
Expand All @@ -28,5 +31,8 @@ pub use lock::LockResolver;
pub use lock::ResolveLocksContext;
pub use lock::ResolveLocksOptions;
mod snapshot;
mod sync_client;
mod sync_snapshot;
mod sync_transaction;
#[allow(clippy::module_inception)]
mod transaction;
253 changes: 253 additions & 0 deletions src/transaction/sync_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
use crate::{
request::plan::CleanupLocksResult,
transaction::{
client::Client, sync_snapshot::SyncSnapshot, sync_transaction::SyncTransaction,
ResolveLocksOptions,
},
BoundRange, Config, Result, Timestamp, TransactionOptions,
};
use std::sync::Arc;

/// Synchronous TiKV transactional client.
///
/// This is a synchronous wrapper around the async [`TransactionClient`](crate::TransactionClient).
/// All methods block the current thread until completion.
///
/// For async operations, use [`TransactionClient`](crate::TransactionClient) instead.
Comment thread
odecode marked this conversation as resolved.
pub struct SyncTransactionClient {
client: Client,
runtime: Arc<tokio::runtime::Runtime>,
}
Comment thread
odecode marked this conversation as resolved.

impl SyncTransactionClient {
/// Create a synchronous transactional [`SyncTransactionClient`] and connect to the TiKV cluster.
///
/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
Comment thread
odecode marked this conversation as resolved.
Outdated
/// PD must be provided, not the TiKV nodes. It's important to include more than one PD endpoint
/// (include all endpoints, if possible), this helps avoid having a single point of failure.
///
/// This is a synchronous version of [`TransactionClient::new`](crate::TransactionClient::new).
///
/// # Examples
///
/// ```rust,no_run
/// # use tikv_client::SyncTransactionClient;
/// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap();
/// ```
pub fn new<S: Into<String>>(pd_endpoints: Vec<S>) -> Result<Self> {
Self::new_with_config(pd_endpoints, Config::default())
}

/// Create a synchronous transactional [`SyncTransactionClient`] with a custom configuration.
///
/// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
/// PD must be provided, not the TiKV nodes. It's important to include more than one PD endpoint
/// (include all endpoints, if possible), this helps avoid having a single point of failure.
///
/// This is a synchronous version of [`TransactionClient::new_with_config`](crate::TransactionClient::new_with_config).
///
/// # Examples
///
/// ```rust,no_run
/// # use tikv_client::{Config, SyncTransactionClient};
/// # use std::time::Duration;
/// let client = SyncTransactionClient::new_with_config(
/// vec!["192.168.0.100"],
/// Config::default().with_timeout(Duration::from_secs(60)),
/// )
/// .unwrap();
/// ```
pub fn new_with_config<S: Into<String>>(pd_endpoints: Vec<S>, config: Config) -> Result<Self> {
let runtime =
Arc::new(tokio::runtime::Runtime::new()?);
Comment thread
odecode marked this conversation as resolved.
Outdated
let client = runtime.block_on(Client::new_with_config(pd_endpoints, config))?;
Ok(Self { client, runtime })
}

/// Creates a new optimistic [`SyncTransaction`].
///
/// Use the transaction to issue requests like [`get`](SyncTransaction::get) or
/// [`put`](SyncTransaction::put).
///
/// Write operations do not lock data in TiKV, thus the commit request may fail due to a write
/// conflict.
///
/// This is a synchronous version of [`TransactionClient::begin_optimistic`](crate::TransactionClient::begin_optimistic).
///
/// # Examples
///
/// ```rust,no_run
/// # use tikv_client::SyncTransactionClient;
/// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap();
/// let mut transaction = client.begin_optimistic().unwrap();
/// // ... Issue some commands.
/// transaction.commit().unwrap();
/// ```
pub fn begin_optimistic(&self) -> Result<SyncTransaction> {
let inner = self.runtime.block_on(self.client.begin_optimistic())?;
Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime)))
}

/// Creates a new pessimistic [`SyncTransaction`].
///
/// Write operations will lock the data until committed, thus commit requests should not suffer
/// from write conflicts.
///
/// This is a synchronous version of [`TransactionClient::begin_pessimistic`](crate::TransactionClient::begin_pessimistic).
///
/// # Examples
///
/// ```rust,no_run
/// # use tikv_client::SyncTransactionClient;
/// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap();
/// let mut transaction = client.begin_pessimistic().unwrap();
/// // ... Issue some commands.
/// transaction.commit().unwrap();
/// ```
pub fn begin_pessimistic(&self) -> Result<SyncTransaction> {
let inner = self.runtime.block_on(self.client.begin_pessimistic())?;
Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime)))
}

/// Create a new customized [`SyncTransaction`].
///
/// This is a synchronous version of [`TransactionClient::begin_with_options`](crate::TransactionClient::begin_with_options).
///
/// # Examples
///
/// ```rust,no_run
/// # use tikv_client::{SyncTransactionClient, TransactionOptions};
/// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap();
/// let mut transaction = client
/// .begin_with_options(TransactionOptions::default().use_async_commit())
/// .unwrap();
/// // ... Issue some commands.
/// transaction.commit().unwrap();
/// ```
pub fn begin_with_options(&self, options: TransactionOptions) -> Result<SyncTransaction> {
let inner = self
.runtime
.block_on(self.client.begin_with_options(options))?;
Ok(SyncTransaction::new(inner, Arc::clone(&self.runtime)))
}

/// Create a new read-only [`SyncSnapshot`] at the given [`Timestamp`].
///
/// A snapshot is a read-only transaction that reads data as if the snapshot was taken at the
/// specified timestamp. It can read operations that happened before the timestamp, but ignores
/// operations after the timestamp.
///
/// Use snapshots when you need:
/// - Consistent reads across multiple operations without starting a full transaction
/// - Point-in-time reads at a specific timestamp
/// - Read-only access without the overhead of transaction tracking
///
/// Unlike transactions, snapshots cannot perform write operations (put, delete, etc.).
///
/// This is a synchronous version of [`TransactionClient::snapshot`](crate::TransactionClient::snapshot).
///
/// # Examples
///
/// ```rust,no_run
/// # use tikv_client::{SyncTransactionClient, TransactionOptions};
/// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap();
/// let timestamp = client.current_timestamp().unwrap();
/// let mut snapshot = client.snapshot(timestamp, TransactionOptions::default());
///
Comment thread
odecode marked this conversation as resolved.
Outdated
/// // Read data as it existed at the snapshot timestamp
/// let value = snapshot.get("key".to_owned()).unwrap();
/// ```
pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> SyncSnapshot {
let inner = self.client.snapshot(timestamp, options);
SyncSnapshot::new(inner, Arc::clone(&self.runtime))
}
Comment thread
odecode marked this conversation as resolved.

/// Retrieve the current [`Timestamp`].
///
/// This is a synchronous version of [`TransactionClient::current_timestamp`](crate::TransactionClient::current_timestamp).
///
/// # Examples
///
/// ```rust,no_run
/// # use tikv_client::SyncTransactionClient;
/// let client = SyncTransactionClient::new(vec!["192.168.0.100"]).unwrap();
/// let timestamp = client.current_timestamp().unwrap();
/// ```
pub fn current_timestamp(&self) -> Result<Timestamp> {
self.runtime.block_on(self.client.current_timestamp())
}

/// Request garbage collection (GC) of the TiKV cluster.
///
/// GC deletes MVCC records whose timestamp is lower than the given `safepoint`. We must guarantee
/// that all transactions started before this timestamp had committed. We can keep an active
/// transaction list in application to decide which is the minimal start timestamp of them.
///
/// For each key, the last mutation record (unless it's a deletion) before `safepoint` is retained.
///
/// GC is performed by:
/// 1. resolving all locks with timestamp <= `safepoint`
/// 2. updating PD's known safepoint
///
/// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview).
/// We skip the second step "delete ranges" which is an optimization for TiDB.
///
/// This is a synchronous version of [`TransactionClient::gc`](crate::TransactionClient::gc).
pub fn gc(&self, safepoint: Timestamp) -> Result<bool> {
self.runtime.block_on(self.client.gc(safepoint))
}

/// Clean up all locks in the specified range.
///
/// This is a synchronous version of [`TransactionClient::cleanup_locks`](crate::TransactionClient::cleanup_locks).
pub fn cleanup_locks(
&self,
range: impl Into<BoundRange>,
safepoint: &Timestamp,
options: ResolveLocksOptions,
) -> Result<CleanupLocksResult> {
self.runtime
.block_on(self.client.cleanup_locks(range, safepoint, options))
}

/// Cleans up all keys in a range and quickly reclaim disk space.
///
/// The range can span over multiple regions.
///
/// Note that the request will directly delete data from RocksDB, and all MVCC will be erased.
///
/// This interface is intended for special scenarios that resemble operations like "drop table" or "drop database" in TiDB.
///
/// This is a synchronous version of [`TransactionClient::unsafe_destroy_range`](crate::TransactionClient::unsafe_destroy_range).
pub fn unsafe_destroy_range(&self, range: impl Into<BoundRange>) -> Result<()> {
self.runtime
.block_on(self.client.unsafe_destroy_range(range))
}

/// Scan all locks in the specified range.
///
/// This is only available for integration tests.
///
/// Note: `batch_size` must be >= expected number of locks.
///
/// This is a synchronous version of [`TransactionClient::scan_locks`](crate::TransactionClient::scan_locks).
#[cfg(feature = "integration-tests")]
pub fn scan_locks(
&self,
safepoint: &Timestamp,
range: impl Into<BoundRange>,
batch_size: u32,
) -> Result<Vec<crate::proto::kvrpcpb::LockInfo>> {
self.runtime
.block_on(self.client.scan_locks(safepoint, range, batch_size))
}
}

impl Clone for SyncTransactionClient {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
runtime: Arc::clone(&self.runtime),
}
}
}
72 changes: 72 additions & 0 deletions src/transaction/sync_snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use crate::{BoundRange, Key, KvPair, Result, Snapshot, Value};
use std::sync::Arc;

/// A synchronous read-only snapshot.
///
/// This is a wrapper around the async [`Snapshot`] that provides blocking methods.
/// All operations block the current thread until completed.
Comment thread
odecode marked this conversation as resolved.
pub struct SyncSnapshot {
inner: Snapshot,
runtime: Arc<tokio::runtime::Runtime>,
}

impl SyncSnapshot {
pub(crate) fn new(inner: Snapshot, runtime: Arc<tokio::runtime::Runtime>) -> Self {
Self { inner, runtime }
}

/// Get the value associated with the given key.
pub fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
self.runtime.block_on(self.inner.get(key))
}

/// Check whether the key exists.
pub fn key_exists(&mut self, key: impl Into<Key>) -> Result<bool> {
self.runtime.block_on(self.inner.key_exists(key))
}

/// Get the values associated with the given keys.
pub fn batch_get(
&mut self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<impl Iterator<Item = KvPair>> {
self.runtime.block_on(self.inner.batch_get(keys))
}

/// Scan a range, return at most `limit` key-value pairs that lying in the range.
Comment thread
odecode marked this conversation as resolved.
Outdated
pub fn scan(
&mut self,
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = KvPair>> {
self.runtime.block_on(self.inner.scan(range, limit))
}

/// Scan a range, return at most `limit` keys that lying in the range.
Comment thread
odecode marked this conversation as resolved.
Outdated
pub fn scan_keys(
&mut self,
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = Key>> {
self.runtime.block_on(self.inner.scan_keys(range, limit))
}

/// Similar to scan, but in the reverse direction.
pub fn scan_reverse(
&mut self,
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = KvPair>> {
self.runtime.block_on(self.inner.scan_reverse(range, limit))
}

/// Similar to scan_keys, but in the reverse direction.
pub fn scan_keys_reverse(
&mut self,
range: impl Into<BoundRange>,
limit: u32,
) -> Result<impl Iterator<Item = Key>> {
self.runtime
.block_on(self.inner.scan_keys_reverse(range, limit))
}
}
Loading
Loading