Skip to content

Commit 1cdd04c

Browse files
dataroaringclaude
andauthored
[opt](txn) Per-transaction locking and parallel publish for DatabaseTransactionMgr (#59990)
## Summary - Replace database-wide write lock with per-transaction `synchronized(transactionState)` for commit, preCommit, abort, and finish paths, so independent transactions proceed concurrently - Enable parallel publish within a single database by routing publish executors by transactionId instead of dbId - Move edit log writes outside the write lock to reduce lock hold time - Add config flags for runtime control and safe fallback Fixes: #53642 ## Changes ### Per-transaction locking (DatabaseTransactionMgr.java) - Extract `updateTxnLabels` from `unprotectUpdateInMemoryState` — only needed at `beginTransaction` and replay time, not during state transitions - Convert `runningTxnNums` from `volatile int` to `AtomicInteger` for thread-safe increment/decrement without db lock - Replace `ArrayDeque` with `ConcurrentLinkedDeque` for final-status transaction queues - Replace `writeLock()`/`writeUnlock()` with `synchronized(transactionState)` in: - `preCommitTransaction2PC` - `commitTransaction` (both overloads) - `finishTransaction` - `abortTransaction` / `abortTransaction2PC` - DB write lock retained for: `beginTransaction` (label uniqueness), `removeUselessTxns`/`cleanLabel` (touch `labelToTxnIds` HashMap), replay paths ### Parallel publish within a database (PublishVersionDaemon.java) - Route publish executor by `transactionId` instead of `dbId` when `enable_per_txn_publish=true`, so different transactions in the same DB finish in parallel - Fix race condition: use local variables in `tryFinishTxnSync` instead of shared instance fields for `partitionVisibleVersions`/`backendPartitions` - Rename `dbExecutors` to `publishExecutors` ### Edit log outside lock (DatabaseTransactionMgr.java, EditLog.java) - Enqueue edit log inside lock (nanoseconds), await outside lock (milliseconds) - Add `enable_txn_log_outside_lock` config flag ### Config flags (Config.java) - `enable_per_txn_publish` (default `true`, `mutable=true`): controls publish routing. Set to `false` to fall back to sequential-per-DB publish - `enable_txn_log_outside_lock` (default `true`, `mutable=true`): controls edit log write location ## Correctness **Two different transactions committing concurrently (the main win):** - Thread A: `synchronized(txn1) { set COMMITTED, ConcurrentMap.put }` - Thread B: `synchronized(txn2) { set COMMITTED, ConcurrentMap.put }` - Different locks, different map entries → fully concurrent **Same transaction: concurrent commit + abort:** - `synchronized(txnState)` serializes them — second thread sees the first's state change and handles accordingly **No deadlock:** State transition paths only acquire per-txn locks. Cleanup/replay paths only acquire db write lock. No path acquires both. ## Test plan - [ ] Run DatabaseTransactionMgrTest unit tests - [ ] Concurrent load testing with multiple tables in same database - [ ] Verify lock hold time reduction via metrics - [ ] Toggle `enable_per_txn_publish` at runtime — verify both parallel and sequential modes work - [ ] Toggle `enable_txn_log_outside_lock` at runtime — verify both paths work 🤖 Generated with [Claude Code](https://claude.com/claude-code) ## test **scripts** `for i in `seq 1 2000`; do mysql -h127.0.0.1 -P9030 -uroot paralldb -e "drop table if exists sbtest${i}; create table sbtest${i} ( id BIGINT NOT NULL AUTO_INCREMENT, k INTEGER DEFAULT 0, c CHAR(120) DEFAULT '', pad CHAR(60) DEFAULT '' ) DISTRIBUTED BY RANDOM BUCKETS 1 PROPERTIES( "replication_num" = "1" )" done` `sysbench --db-driver=mysql --mysql-host=127.0.0.1 --mysql-port=9030 --mysql-user=root --mysql-db=paralldb --tables=1000 --threads=1000 --time=180 --report-interval=10 oltp_insert run` **env** 3 fe 3 be **enable_txn_log_outside_lock = true enable_per_txn_publish = true** [ 10s ] thds: 1000 tps: 1345.63 qps: 1345.63 (r/w/o: 0.00/1345.63/0.00) lat (ms,95%): 1869.60 err/s: 0.00 reconn/s: 0.00 [ 20s ] thds: 1000 tps: 1669.04 qps: 1669.04 (r/w/o: 0.00/1669.04/0.00) lat (ms,95%): 1032.01 err/s: 0.00 reconn/s: 0.00 [ 30s ] thds: 1000 tps: 1780.93 qps: 1780.93 (r/w/o: 0.00/1780.93/0.00) lat (ms,95%): 893.56 err/s: 0.00 reconn/s: 0.00 [ 40s ] thds: 1000 tps: 1845.02 qps: 1845.02 (r/w/o: 0.00/1845.02/0.00) lat (ms,95%): 861.95 err/s: 0.00 reconn/s: 0.00 [ 50s ] thds: 1000 tps: 1576.76 qps: 1576.76 (r/w/o: 0.00/1576.76/0.00) lat (ms,95%): 2493.86 err/s: 0.00 reconn/s: 0.00 [ 60s ] thds: 1000 tps: 1911.56 qps: 1911.56 (r/w/o: 0.00/1911.56/0.00) lat (ms,95%): 877.61 err/s: 0.00 reconn/s: 0.00 [ 70s ] thds: 1000 tps: 2006.93 qps: 2006.93 (r/w/o: 0.00/2006.93/0.00) lat (ms,95%): 733.00 err/s: 0.00 reconn/s: 0.00 [ 80s ] thds: 1000 tps: 997.81 qps: 997.81 (r/w/o: 0.00/997.81/0.00) lat (ms,95%): 995.51 err/s: 0.00 reconn/s: 0.00 [ 90s ] thds: 1000 tps: 1462.80 qps: 1462.80 (r/w/o: 0.00/1462.80/0.00) lat (ms,95%): 4855.31 err/s: 0.00 reconn/s: 0.00 [ 100s ] thds: 1000 tps: 1661.09 qps: 1661.09 (r/w/o: 0.00/1661.09/0.00) lat (ms,95%): 943.16 err/s: 0.00 reconn/s: 0.00 [ 110s ] thds: 1000 tps: 1686.85 qps: 1686.85 (r/w/o: 0.00/1686.85/0.00) lat (ms,95%): 909.80 err/s: 0.00 reconn/s: 0.00 [ 120s ] thds: 1000 tps: 725.92 qps: 725.92 (r/w/o: 0.00/725.92/0.00) lat (ms,95%): 6360.91 err/s: 0.00 reconn/s: 0.00 [ 130s ] thds: 1000 tps: 1839.17 qps: 1839.17 (r/w/o: 0.00/1839.17/0.00) lat (ms,95%): 1013.60 err/s: 0.00 reconn/s: 0.00 [ 140s ] thds: 1000 tps: 1986.62 qps: 1986.62 (r/w/o: 0.00/1986.62/0.00) lat (ms,95%): 773.68 err/s: 0.00 reconn/s: 0.00 [ 150s ] thds: 1000 tps: 1173.42 qps: 1173.42 (r/w/o: 0.00/1173.42/0.00) lat (ms,95%): 1109.09 err/s: 0.00 reconn/s: 0.00 [ 160s ] thds: 1000 tps: 1057.97 qps: 1057.97 (r/w/o: 0.00/1057.97/0.00) lat (ms,95%): 9118.47 err/s: 0.00 reconn/s: 0.00 [ 170s ] thds: 1000 tps: 2023.63 qps: 2023.63 (r/w/o: 0.00/2023.63/0.00) lat (ms,95%): 831.46 err/s: 0.00 reconn/s: 0.00 [ 180s ] thds: 1000 tps: 2015.55 qps: 2015.55 (r/w/o: 0.00/2015.55/0.00) lat (ms,95%): 787.74 err/s: 0.00 reconn/s: 0.00 SQL statistics: queries performed: read: 0 write: 288670 other: 0 total: 288670 transactions: 288670 (1601.69 per sec.) queries: 288670 (1601.69 per sec.) **enable_txn_log_outside_lock = false enable_per_txn_publish = false** [ 10s ] thds: 1000 tps: 135.28 qps: 135.28 (r/w/o: 0.00/135.28/0.00) lat (ms,95%): 7346.49 err/s: 0.00 reconn/s: 0.00 [ 20s ] thds: 1000 tps: 162.20 qps: 162.20 (r/w/o: 0.00/162.20/0.00) lat (ms,95%): 6247.39 err/s: 0.00 reconn/s: 0.00 [ 30s ] thds: 1000 tps: 90.70 qps: 90.70 (r/w/o: 0.00/90.70/0.00) lat (ms,95%): 6247.39 err/s: 0.00 reconn/s: 0.00 [ 40s ] thds: 1000 tps: 150.60 qps: 150.60 (r/w/o: 0.00/150.60/0.00) lat (ms,95%): 11946.04 err/s: 0.00 reconn/s: 0.00 [ 50s ] thds: 1000 tps: 182.10 qps: 182.10 (r/w/o: 0.00/182.10/0.00) lat (ms,95%): 5709.50 err/s: 0.00 reconn/s: 0.00 [ 60s ] thds: 1000 tps: 185.10 qps: 185.10 (r/w/o: 0.00/185.10/0.00) lat (ms,95%): 5409.26 err/s: 0.00 reconn/s: 0.00 [ 70s ] thds: 1000 tps: 80.70 qps: 80.70 (r/w/o: 0.00/80.70/0.00) lat (ms,95%): 11115.87 err/s: 0.00 reconn/s: 0.00 [ 80s ] thds: 1000 tps: 183.50 qps: 183.50 (r/w/o: 0.00/183.50/0.00) lat (ms,95%): 11115.87 err/s: 0.00 reconn/s: 0.00 [ 90s ] thds: 1000 tps: 182.40 qps: 182.40 (r/w/o: 0.00/182.40/0.00) lat (ms,95%): 5607.61 err/s: 0.00 reconn/s: 0.00 [ 100s ] thds: 1000 tps: 128.30 qps: 128.30 (r/w/o: 0.00/128.30/0.00) lat (ms,95%): 5409.26 err/s: 0.00 reconn/s: 0.00 [ 110s ] thds: 1000 tps: 137.50 qps: 137.50 (r/w/o: 0.00/137.50/0.00) lat (ms,95%): 10917.50 err/s: 0.00 reconn/s: 0.00 [ 120s ] thds: 1000 tps: 182.60 qps: 182.60 (r/w/o: 0.00/182.60/0.00) lat (ms,95%): 5607.61 err/s: 0.00 reconn/s: 0.00 [ 130s ] thds: 1000 tps: 183.60 qps: 183.60 (r/w/o: 0.00/183.60/0.00) lat (ms,95%): 5409.26 err/s: 0.00 reconn/s: 0.00 [ 140s ] thds: 1000 tps: 86.50 qps: 86.50 (r/w/o: 0.00/86.50/0.00) lat (ms,95%): 10722.67 err/s: 0.00 reconn/s: 0.00 [ 150s ] thds: 1000 tps: 177.00 qps: 177.00 (r/w/o: 0.00/177.00/0.00) lat (ms,95%): 10722.67 err/s: 0.00 reconn/s: 0.00 [ 160s ] thds: 1000 tps: 183.00 qps: 183.00 (r/w/o: 0.00/183.00/0.00) lat (ms,95%): 5709.50 err/s: 0.00 reconn/s: 0.00 [ 170s ] thds: 1000 tps: 142.30 qps: 142.30 (r/w/o: 0.00/142.30/0.00) lat (ms,95%): 5507.54 err/s: 0.00 reconn/s: 0.00 [ 180s ] thds: 1000 tps: 130.10 qps: 130.10 (r/w/o: 0.00/130.10/0.00) lat (ms,95%): 10722.67 err/s: 0.00 reconn/s: 0.00 SQL statistics: queries performed: read: 0 write: 28035 other: 0 total: 28035 transactions: 28035 (152.91 per sec.) queries: 28035 (152.91 per sec.) ignored errors: 0 (0.00 per sec.) reconnects: 0 (0.00 per sec.) **enable_txn_log_outside_lock = false enable_per_txn_publish = true** [ 10s ] thds: 1000 tps: 965.23 qps: 965.23 (r/w/o: 0.00/965.23/0.00) lat (ms,95%): 2493.86 err/s: 0.00 reconn/s: 0.00 [ 20s ] thds: 1000 tps: 1464.63 qps: 1464.63 (r/w/o: 0.00/1464.63/0.00) lat (ms,95%): 1050.76 err/s: 0.00 reconn/s: 0.00 [ 30s ] thds: 1000 tps: 888.00 qps: 888.00 (r/w/o: 0.00/888.00/0.00) lat (ms,95%): 909.80 err/s: 0.00 reconn/s: 0.00 [ 40s ] thds: 1000 tps: 1632.37 qps: 1632.37 (r/w/o: 0.00/1632.37/0.00) lat (ms,95%): 5312.73 err/s: 0.00 reconn/s: 0.00 [ 50s ] thds: 1000 tps: 1810.11 qps: 1810.11 (r/w/o: 0.00/1810.11/0.00) lat (ms,95%): 831.46 err/s: 0.00 reconn/s: 0.00 [ 60s ] thds: 1000 tps: 1922.10 qps: 1922.10 (r/w/o: 0.00/1922.10/0.00) lat (ms,95%): 831.46 err/s: 0.00 reconn/s: 0.00 [ 70s ] thds: 1000 tps: 712.58 qps: 712.58 (r/w/o: 0.00/712.58/0.00) lat (ms,95%): 7215.39 err/s: 0.00 reconn/s: 0.00 [ 80s ] thds: 1000 tps: 2093.76 qps: 2093.76 (r/w/o: 0.00/2093.76/0.00) lat (ms,95%): 816.63 err/s: 0.00 reconn/s: 0.00 [ 90s ] thds: 1000 tps: 2115.99 qps: 2115.99 (r/w/o: 0.00/2115.99/0.00) lat (ms,95%): 733.00 err/s: 0.00 reconn/s: 0.00 [ 100s ] thds: 1000 tps: 1494.48 qps: 1494.48 (r/w/o: 0.00/1494.48/0.00) lat (ms,95%): 733.00 err/s: 0.00 reconn/s: 0.00 [ 110s ] thds: 1000 tps: 651.00 qps: 651.00 (r/w/o: 0.00/651.00/0.00) lat (ms,95%): 10531.32 err/s: 0.00 reconn/s: 0.00 [ 120s ] thds: 1000 tps: 1976.26 qps: 1976.26 (r/w/o: 0.00/1976.26/0.00) lat (ms,95%): 787.74 err/s: 0.00 reconn/s: 0.00 [ 130s ] thds: 1000 tps: 1779.35 qps: 1779.35 (r/w/o: 0.00/1779.35/0.00) lat (ms,95%): 861.95 err/s: 0.00 reconn/s: 0.00 [ 140s ] thds: 1000 tps: 1281.70 qps: 1281.70 (r/w/o: 0.00/1281.70/0.00) lat (ms,95%): 877.61 err/s: 0.00 reconn/s: 0.00 [ 150s ] thds: 1000 tps: 0.00 qps: 0.00 (r/w/o: 0.00/0.00/0.00) lat (ms,95%): 0.00 err/s: 0.00 reconn/s: 0.00 [ 160s ] thds: 1000 tps: 1250.21 qps: 1250.21 (r/w/o: 0.00/1250.21/0.00) lat (ms,95%): 14302.94 err/s: 0.00 reconn/s: 0.00 [ 170s ] thds: 1000 tps: 1775.39 qps: 1775.39 (r/w/o: 0.00/1775.39/0.00) lat (ms,95%): 877.61 err/s: 0.00 reconn/s: 0.00 [ 180s ] thds: 1000 tps: 1894.81 qps: 1894.81 (r/w/o: 0.00/1894.81/0.00) lat (ms,95%): 816.63 err/s: 0.00 reconn/s: 0.00 SQL statistics: queries performed: read: 0 write: 258081 other: 0 total: 258081 transactions: 258081 (1431.61 per sec.) queries: 258081 (1431.61 per sec.) --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 98ca256 commit 1cdd04c

File tree

6 files changed

+356
-124
lines changed

6 files changed

+356
-124
lines changed

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -744,6 +744,28 @@ public class Config extends ConfigBase {
744744
"Txn manager will reject coming txns."})
745745
public static int max_running_txn_num_per_db = 10000;
746746

747+
@ConfField(mutable = true, masterOnly = true, description = {
748+
"是否将事务的 edit log 写入移到写锁之外以减少锁竞争。"
749+
+ "开启后,edit log 条目在写锁内入队(FIFO 保证顺序),"
750+
+ "在写锁外等待持久化完成,从而降低写锁持有时间,提高并发事务吞吐量。"
751+
+ "默认开启。关闭后使用传统的锁内同步写入模式。",
752+
"Whether to move transaction edit log writes outside the write lock to reduce lock contention. "
753+
+ "When enabled, edit log entries are enqueued inside the write lock (FIFO preserves ordering) "
754+
+ "and awaited outside the lock, reducing write lock hold time "
755+
+ "and improving concurrent transaction throughput. "
756+
+ "Default is true. Set to false to use the traditional in-lock synchronous write mode."})
757+
public static boolean enable_txn_log_outside_lock = true;
758+
759+
@ConfField(mutable = true, description = {
760+
"是否启用按事务级别并行发布。开启后,同一数据库内的不同事务可以在不同的执行器线程上并行完成发布,"
761+
+ "而不是按数据库顺序执行。关闭后回退到按数据库路由(旧行为),同一数据库内的事务顺序发布。",
762+
"Whether to enable per-transaction parallel publish. When enabled, different transactions "
763+
+ "in the same database can finish publishing in parallel across executor threads, "
764+
+ "instead of being serialized per database. "
765+
+ "When disabled, falls back to per-database routing (old behavior) "
766+
+ "where transactions within a DB are published sequentially."})
767+
public static boolean enable_per_txn_publish = true;
768+
747769
@ConfField(masterOnly = true, description = {"pending load task 执行线程数。这个配置可以限制当前等待的导入作业数。"
748770
+ "并且应小于 `max_running_txn_num_per_db`。",
749771
"The pending load task executor pool size. "

fe/fe-core/src/main/java/org/apache/doris/DorisFE.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import java.nio.file.StandardOpenOption;
6767
import java.time.LocalDate;
6868
import java.util.List;
69+
import java.util.Random;
6970
import java.util.concurrent.TimeUnit;
7071
import java.util.concurrent.atomic.AtomicBoolean;
7172

@@ -596,6 +597,11 @@ private static void fuzzyConfigs() {
596597
LOG.info("fuzzy set random_add_cluster_keys_for_mow={}", Config.random_add_cluster_keys_for_mow);
597598
}
598599

600+
Config.enable_txn_log_outside_lock = new Random().nextBoolean();
601+
LOG.info("fuzzy set enable_txn_log_outside_lock={}", Config.enable_txn_log_outside_lock);
602+
Config.enable_batch_editlog = new Random().nextBoolean();
603+
LOG.info("fuzzy set enable_batch_editlog={}", Config.enable_batch_editlog);
604+
599605
setFuzzyForCatalog();
600606
}
601607

fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,13 +124,14 @@
124124
public class EditLog {
125125
public static final Logger LOG = LogManager.getLogger(EditLog.class);
126126

127-
// Helper class to hold log edit requests
128-
private static class EditLogItem {
127+
// Helper class to hold log edit requests.
128+
// Public so that callers can enqueue inside a lock and await outside it.
129+
public static class EditLogItem {
129130
static AtomicLong nextUid = new AtomicLong(0);
130131
final short op;
131132
final Writable writable;
132133
final Object lock = new Object();
133-
boolean finished = false;
134+
volatile boolean finished = false;
134135
long logId = -1;
135136
long uid = -1;
136137

@@ -139,6 +140,24 @@ private static class EditLogItem {
139140
this.writable = writable;
140141
uid = nextUid.getAndIncrement();
141142
}
143+
144+
/**
145+
* Wait for this edit log entry to be flushed to persistent storage.
146+
* Returns the assigned log ID.
147+
*/
148+
public long await() {
149+
synchronized (lock) {
150+
while (!finished) {
151+
try {
152+
lock.wait();
153+
} catch (InterruptedException e) {
154+
LOG.error("Fatal Error : write stream Exception");
155+
System.exit(-1);
156+
}
157+
}
158+
}
159+
return logId;
160+
}
142161
}
143162

144163
private final BlockingQueue<EditLogItem> logEditQueue = new LinkedBlockingQueue<>();
@@ -1534,6 +1553,49 @@ public long logEditWithQueue(short op, Writable writable) {
15341553
return req.logId;
15351554
}
15361555

1556+
/**
1557+
* Submit an edit log entry to the batch queue without waiting for it to be flushed.
1558+
* The entry is enqueued in FIFO order, so calling this inside a write lock guarantees
1559+
* that edit log entries are ordered by lock acquisition order.
1560+
*
1561+
* <p>The caller MUST call {@link EditLogItem#await()} after releasing the lock to ensure
1562+
* the entry is persisted before proceeding.
1563+
*
1564+
* <p>If batch edit log is disabled, this falls back to a synchronous direct write
1565+
* and the returned item is already completed.
1566+
*
1567+
* @return an {@link EditLogItem} handle to await completion
1568+
*/
1569+
public EditLogItem submitEdit(short op, Writable writable) {
1570+
if (this.getNumEditStreams() == 0) {
1571+
LOG.error("Fatal Error : no editLog stream", new Exception());
1572+
throw new Error("Fatal Error : no editLog stream");
1573+
}
1574+
1575+
EditLogItem req = new EditLogItem(op, writable);
1576+
if (Config.enable_batch_editlog && op != OperationType.OP_TIMESTAMP) {
1577+
while (true) {
1578+
try {
1579+
logEditQueue.put(req);
1580+
break;
1581+
} catch (InterruptedException e) {
1582+
LOG.warn("Interrupted during put, will sleep and retry.");
1583+
try {
1584+
Thread.sleep(100);
1585+
} catch (InterruptedException ex) {
1586+
LOG.warn("interrupted during sleep, will retry.", ex);
1587+
}
1588+
}
1589+
}
1590+
} else {
1591+
// Non-batch mode: write directly (synchronous)
1592+
long logId = logEditDirectly(op, writable);
1593+
req.logId = logId;
1594+
req.finished = true;
1595+
}
1596+
return req;
1597+
}
1598+
15371599
private synchronized long logEditDirectly(short op, Writable writable) {
15381600
long logId = -1;
15391601
try {

0 commit comments

Comments
 (0)