Skip to content

Commit 685f112

Browse files
committed
perf: remove ReentrantReadWriteLock if readInDtKvExecutor is true
1 parent 5af6ef1 commit 685f112

File tree

5 files changed

+92
-40
lines changed

5 files changed

+92
-40
lines changed

server/src/main/java/com/github/dtprj/dongting/dtkv/server/DtKV.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,7 @@ protected void sendRequest(ChannelInfo ci, WatchNotifyReq req, ArrayList<Channel
130130
}
131131
};
132132
this.ttlManager = new TtlManager(ts, this::expire);
133-
KvImpl kvImpl = new KvImpl(watchManager, ttlManager, ts, config.groupId,
134-
kvConfig.initMapCapacity, kvConfig.loadFactor);
133+
KvImpl kvImpl = new KvImpl(watchManager, ttlManager, ts, config.groupId, kvConfig);
135134
updateStatus(false, kvImpl);
136135
}
137136

@@ -283,8 +282,7 @@ public FiberFuture<Void> installSnapshot(long lastIncludeIndex, int lastIncludeT
283282
private void install0(long offset, boolean done, ByteBuffer data) {
284283
if (offset == 0) {
285284
watchManager.reset();
286-
KvImpl kvImpl = new KvImpl(watchManager, ttlManager, ts, config.groupId, kvConfig.initMapCapacity,
287-
kvConfig.loadFactor);
285+
KvImpl kvImpl = new KvImpl(watchManager, ttlManager, ts, config.groupId, kvConfig);
288286
updateStatus(true, kvImpl);
289287
encodeStatus = new EncodeStatus();
290288
} else if (!kvStatus.installSnapshot) {

server/src/main/java/com/github/dtprj/dongting/dtkv/server/KvImpl.java

Lines changed: 81 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,11 @@
3939
import java.util.function.Supplier;
4040

4141
/**
42-
* All write operations run in same thread, and there are multiple read threads, so the write thread
42+
* All write operations run in same thread (in DtkvExecutor).
43+
* If readInDtKvExecutor is true (default), all read/write operations run in same thread and not need lock.
44+
* If readInDtKvExecutor is false, there are multiple read threads and one write thread, but the writer
4345
* do not need to acquire lock if it only read data or update fields that read threads will not access.
46+
*
4447
* @author huangli
4548
*/
4649
class KvImpl {
@@ -60,8 +63,7 @@ class KvImpl {
6063
// for fast access root dir
6164
final KvNodeHolder root;
6265

63-
// write operations is not atomic, so we need lock although ConcurrentHashMap is used
64-
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
66+
private final ReentrantReadWriteLock readWriteLock;
6567

6668
private final Timestamp ts;
6769

@@ -75,16 +77,17 @@ class KvImpl {
7577
private final TtlManager ttlManager;
7678

7779
public KvImpl(ServerWatchManager watchManager, TtlManager ttlManager, Timestamp ts, int groupId,
78-
int initCapacity, float loadFactor) {
80+
KvServerConfig kvServerConfig) {
7981
this.watchManager = watchManager;
8082
this.ts = ts;
8183
this.groupId = groupId;
82-
this.map = new KvMap(initCapacity, loadFactor);
84+
this.map = new KvMap(kvServerConfig.initMapCapacity, kvServerConfig.loadFactor);
8385
KvNodeEx n = new KvNodeEx(0, 0, 0, 0,
8486
KvNode.FLAG_DIR_MASK, null);
8587
this.root = new KvNodeHolder(ByteArray.EMPTY, ByteArray.EMPTY, n, null);
8688
this.map.put(ByteArray.EMPTY, root);
8789
this.ttlManager = ttlManager;
90+
this.readWriteLock = kvServerConfig.readInDtKvExecutor ? null : new ReentrantReadWriteLock();
8891
}
8992

9093
static KvResult checkExistNode(KvNodeHolder h, KvImpl.OpContext ctx) {
@@ -238,11 +241,16 @@ public KvResult get(ByteArray key) {
238241
if (ck != KvCodes.SUCCESS) {
239242
return new KvResult(ck);
240243
}
241-
readWriteLock.readLock().lock();
244+
ReentrantReadWriteLock readWriteLock = this.readWriteLock;
245+
if (readWriteLock != null) {
246+
readWriteLock.readLock().lock();
247+
}
242248
try {
243249
return get0(key);
244250
} finally {
245-
readWriteLock.readLock().unlock();
251+
if (readWriteLock != null) {
252+
readWriteLock.readLock().unlock();
253+
}
246254
}
247255
}
248256

@@ -275,7 +283,9 @@ public Pair<Integer, List<KvResult>> batchGet(List<byte[]> keys) {
275283
}
276284
int s = keys.size();
277285
ArrayList<KvResult> list = new ArrayList<>(s);
278-
readWriteLock.readLock().lock();
286+
if (readWriteLock != null) {
287+
readWriteLock.readLock().lock();
288+
}
279289
try {
280290
for (int i = 0; i < s; i++) {
281291
byte[] bs = keys.get(i);
@@ -288,7 +298,9 @@ public Pair<Integer, List<KvResult>> batchGet(List<byte[]> keys) {
288298
}
289299
}
290300
} finally {
291-
readWriteLock.readLock().unlock();
301+
if (readWriteLock != null) {
302+
readWriteLock.readLock().unlock();
303+
}
292304
}
293305
return new Pair<>(KvCodes.SUCCESS, list);
294306
}
@@ -304,7 +316,9 @@ public Pair<Integer, List<KvResult>> list(ByteArray key) {
304316
if (ck != KvCodes.SUCCESS) {
305317
return new Pair<>(ck, null);
306318
}
307-
readWriteLock.readLock().lock();
319+
if (readWriteLock != null) {
320+
readWriteLock.readLock().lock();
321+
}
308322
try {
309323
KvNodeHolder h;
310324
if (key == null || key.getData().length == 0) {
@@ -325,7 +339,9 @@ public Pair<Integer, List<KvResult>> list(ByteArray key) {
325339
ArrayList<KvResult> list = kvNode.list();
326340
return new Pair<>(KvCodes.SUCCESS, list);
327341
} finally {
328-
readWriteLock.readLock().unlock();
342+
if (readWriteLock != null) {
343+
readWriteLock.readLock().unlock();
344+
}
329345
}
330346
}
331347

@@ -371,7 +387,7 @@ public KvResult put(long index, ByteArray key, byte[] data) {
371387
return checkAndPut(index, key, data, true);
372388
}
373389

374-
private KvResult checkAndPut(long index, ByteArray key, byte[] data, boolean lock) {
390+
private KvResult checkAndPut(long index, ByteArray key, byte[] data, boolean lockAndFireUpdate) {
375391
int ck = checkKey(key, false, false);
376392
if (ck != KvCodes.SUCCESS) {
377393
return new KvResult(ck);
@@ -393,14 +409,16 @@ private KvResult checkAndPut(long index, ByteArray key, byte[] data, boolean loc
393409
if (r != null) {
394410
return r;
395411
}
396-
if (lock) {
412+
if (lockAndFireUpdate && readWriteLock != null) {
397413
this.readWriteLock.writeLock().lock();
398414
}
399415
try {
400416
return doPutInLock(index, key, data, h, parent, lastIndexOfSep);
401417
} finally {
402-
if (lock) {
403-
this.readWriteLock.writeLock().unlock();
418+
if (lockAndFireUpdate) {
419+
if (readWriteLock != null) {
420+
this.readWriteLock.writeLock().unlock();
421+
}
404422
afterUpdate();
405423
}
406424
}
@@ -512,14 +530,18 @@ public Pair<Integer, List<KvResult>> batchPut(long index, List<byte[]> keys, Lis
512530
if (values == null || values.size() != size) {
513531
return new Pair<>(KvCodes.INVALID_VALUE, null);
514532
}
515-
readWriteLock.writeLock().lock();
533+
if (readWriteLock != null) {
534+
readWriteLock.writeLock().lock();
535+
}
516536
try {
517537
for (int i = 0; i < size; i++) {
518538
byte[] k = keys.get(i);
519539
list.add(checkAndPut(index, k == null ? null : new ByteArray(k), values.get(i), false));
520540
}
521541
} finally {
522-
readWriteLock.writeLock().unlock();
542+
if (readWriteLock != null) {
543+
readWriteLock.writeLock().unlock();
544+
}
523545
afterUpdate();
524546
}
525547
return new Pair<>(KvCodes.SUCCESS, list);
@@ -639,7 +661,9 @@ public Supplier<Boolean> createGcTask() {
639661
long t = System.currentTimeMillis();
640662
log.info("group {} start gc task", groupId);
641663
return () -> {
642-
readWriteLock.writeLock().lock();
664+
if (readWriteLock != null) {
665+
readWriteLock.writeLock().lock();
666+
}
643667
try {
644668
for (int i = 0; i < gcItems; i++) {
645669
if (!it.hasNext()) {
@@ -651,7 +675,9 @@ public Supplier<Boolean> createGcTask() {
651675
}
652676
return Boolean.TRUE;
653677
} finally {
654-
readWriteLock.writeLock().unlock();
678+
if (readWriteLock != null) {
679+
readWriteLock.writeLock().unlock();
680+
}
655681
}
656682
};
657683
}
@@ -660,7 +686,7 @@ public KvResult remove(long index, ByteArray key) {
660686
return checkAndRemove(index, key, true);
661687
}
662688

663-
private KvResult checkAndRemove(long index, ByteArray key, boolean lock) {
689+
private KvResult checkAndRemove(long index, ByteArray key, boolean lockAndFireUpdate) {
664690
int ck = checkKey(key, false, false);
665691
if (ck != KvCodes.SUCCESS) {
666692
return new KvResult(ck);
@@ -674,14 +700,16 @@ private KvResult checkAndRemove(long index, ByteArray key, boolean lock) {
674700
if (n.childCount() > 0) {
675701
return new KvResult(KvCodes.HAS_CHILDREN);
676702
}
677-
if (lock) {
703+
if (lockAndFireUpdate && readWriteLock != null) {
678704
this.readWriteLock.writeLock().lock();
679705
}
680706
try {
681707
return doRemoveInLock(index, h);
682708
} finally {
683-
if (lock) {
684-
this.readWriteLock.writeLock().unlock();
709+
if (lockAndFireUpdate) {
710+
if (readWriteLock != null) {
711+
this.readWriteLock.writeLock().unlock();
712+
}
685713
afterUpdate();
686714
}
687715
}
@@ -735,14 +763,18 @@ public Pair<Integer, List<KvResult>> batchRemove(long index, List<byte[]> keys)
735763
}
736764
int size = keys.size();
737765
ArrayList<KvResult> list = new ArrayList<>(size);
738-
readWriteLock.writeLock().lock();
766+
if (readWriteLock != null) {
767+
readWriteLock.writeLock().lock();
768+
}
739769
try {
740770
for (int i = 0; i < size; i++) {
741771
byte[] k = keys.get(i);
742772
list.add(checkAndRemove(index, k == null ? null : new ByteArray(k), false));
743773
}
744774
} finally {
745-
readWriteLock.writeLock().unlock();
775+
if (readWriteLock != null) {
776+
readWriteLock.writeLock().unlock();
777+
}
746778
afterUpdate();
747779
}
748780
return new Pair<>(KvCodes.SUCCESS, list);
@@ -776,7 +808,9 @@ public KvResult compareAndSet(long index, ByteArray key, byte[] expectedValue, b
776808
if (r != null) {
777809
return r;
778810
}
779-
readWriteLock.writeLock().lock();
811+
if (readWriteLock != null) {
812+
readWriteLock.writeLock().lock();
813+
}
780814
try {
781815
if (expectedValue == null || expectedValue.length == 0) {
782816
if (h == null || h.latest.removed) {
@@ -809,7 +843,9 @@ public KvResult compareAndSet(long index, ByteArray key, byte[] expectedValue, b
809843
}
810844
}
811845
} finally {
812-
readWriteLock.writeLock().unlock();
846+
if (readWriteLock != null) {
847+
readWriteLock.writeLock().unlock();
848+
}
813849
afterUpdate();
814850
}
815851
}
@@ -921,11 +957,15 @@ public KvResult expire(long index, ByteArray key, long expectRaftIndex) {
921957
}
922958
return new KvResult(KvCodes.TTL_INDEX_MISMATCH);
923959
}
924-
readWriteLock.writeLock().lock();
960+
if (readWriteLock != null) {
961+
readWriteLock.writeLock().lock();
962+
}
925963
try {
926964
return expireInLock(index, h);
927965
} finally {
928-
readWriteLock.writeLock().unlock();
966+
if (readWriteLock != null) {
967+
readWriteLock.writeLock().unlock();
968+
}
929969
afterUpdate();
930970
}
931971
}
@@ -1052,7 +1092,9 @@ private KvResult updateNextOwnerIfExists(long index, KvNodeHolder parent) {
10521092
public KvResult tryLock(long index, ByteArray key, byte[] data) {
10531093
long ttlMillis = opContext.ttlMillis;
10541094
opContext.ttlMillis = 0; // the lock dir has no ttl
1055-
readWriteLock.writeLock().lock();
1095+
if (readWriteLock != null) {
1096+
readWriteLock.writeLock().lock();
1097+
}
10561098
try {
10571099
KvResult r = checkAndPut(index, key, null, false);
10581100
if (r.getBizCode() != KvCodes.SUCCESS && r.getBizCode() != KvCodes.DIR_EXISTS) {
@@ -1070,7 +1112,9 @@ public KvResult tryLock(long index, ByteArray key, byte[] data) {
10701112
}
10711113
return doPutInLock(index, fullKey, data, sub, parent, parent.key.length);
10721114
} finally {
1073-
readWriteLock.writeLock().unlock();
1115+
if (readWriteLock != null) {
1116+
readWriteLock.writeLock().unlock();
1117+
}
10741118
afterUpdate();
10751119
}
10761120
}
@@ -1095,7 +1139,9 @@ public KvResult unlock(long index, ByteArray key) {
10951139
BugLog.logAndThrow("sub.parent != parent");
10961140
}
10971141
boolean holdLock = sub == parent.latest.peekNextOwner();
1098-
readWriteLock.writeLock().lock();
1142+
if (readWriteLock != null) {
1143+
readWriteLock.writeLock().lock();
1144+
}
10991145
try {
11001146
doRemoveInLock(index, sub);
11011147
boolean removeParent = parent.latest.childCount() == 0;
@@ -1113,7 +1159,9 @@ public KvResult unlock(long index, ByteArray key) {
11131159
return new KvResult(KvCodes.LOCK_BY_OTHER);
11141160
}
11151161
} finally {
1116-
readWriteLock.writeLock().unlock();
1162+
if (readWriteLock != null) {
1163+
readWriteLock.writeLock().unlock();
1164+
}
11171165
afterUpdate();
11181166
}
11191167
}

server/src/test/java/com/github/dtprj/dongting/dtkv/server/KvImplTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ void setUp() {
4949
ver = 1;
5050
ts = new Timestamp();
5151
TtlManager tm = new TtlManager(ts, null);
52-
kv = new KvImpl(null, tm, ts, 0, 16, 0.75f);
52+
KvServerConfig c = new KvServerConfig();
53+
c.initMapCapacity = 16;
54+
kv = new KvImpl(null, tm, ts, 0, c);
5355
}
5456

5557
static ByteArray ba(String str) {

server/src/test/java/com/github/dtprj/dongting/dtkv/server/KvLockTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ void setUp() {
4848
ver = 1;
4949
ts = new Timestamp();
5050
TtlManager tm = new TtlManager(ts, null);
51-
kv = new KvImpl(null, tm, ts, 0, 16, 0.75f);
51+
KvServerConfig c = new KvServerConfig();
52+
c.initMapCapacity = 16;
53+
kv = new KvImpl(null, tm, ts, 0, c);
5254
}
5355

5456
static ByteArray ba(String str) {

server/src/test/java/com/github/dtprj/dongting/dtkv/server/ServerWatchManagerTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,9 @@ protected void sendRequest(ChannelInfo ci, WatchNotifyReq req, ArrayList<Channel
137137
}
138138
};
139139
TtlManager tm = new TtlManager(ts, null);
140-
kv = new KvImpl(manager, tm, ts, groupId, 16, 0.75f);
140+
KvServerConfig c = new KvServerConfig();
141+
c.initMapCapacity = 16;
142+
kv = new KvImpl(manager, tm, ts, groupId, c);
141143
put("aaa", "bbb"); // add first item and make raft index in statemachine greater than 0
142144
}
143145

0 commit comments

Comments
 (0)