7474import java .util .Optional ;
7575import java .util .concurrent .CompletableFuture ;
7676import java .util .concurrent .ConcurrentHashMap ;
77+ import java .util .concurrent .locks .Lock ;
7778import java .util .concurrent .locks .ReentrantLock ;
79+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
7880import java .util .stream .Collectors ;
7981
8082import static org .apache .iotdb .consensus .iot .IoTConsensus .getConsensusGroupIdsFromDir ;
@@ -92,7 +94,9 @@ public class PipeConsensus implements IConsensus {
9294 new ConcurrentHashMap <>();
9395 private final PipeConsensusRPCService rpcService ;
9496 private final RegisterManager registerManager = new RegisterManager ();
95- private final ReentrantLock stateMachineMapLock = new ReentrantLock ();
97+ private final Map <ConsensusGroupId , ReentrantLock > consensusGroupIdReentrantLockMap =
98+ new ConcurrentHashMap <>();
99+ private final ReentrantReadWriteLock stateMachineMapLock = new ReentrantReadWriteLock ();
96100 private final PipeConsensusConfig config ;
97101 private final ConsensusPipeManager consensusPipeManager ;
98102 private final ConsensusPipeGuardian consensusPipeGuardian ;
@@ -190,7 +194,7 @@ private void checkAllConsensusPipe() {
190194 entry -> entry .getKey ().getConsensusGroupId (),
191195 Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue )));
192196 try {
193- stateMachineMapLock .lock ();
197+ stateMachineMapLock .writeLock (). lock ();
194198 stateMachineMap .forEach (
195199 (key , value ) ->
196200 value .checkConsensusPipe (existedPipes .getOrDefault (key , ImmutableMap .of ())));
@@ -214,7 +218,7 @@ private void checkAllConsensusPipe() {
214218 }
215219 });
216220 } finally {
217- stateMachineMapLock .unlock ();
221+ stateMachineMapLock .writeLock (). unlock ();
218222 }
219223 }
220224
@@ -263,8 +267,11 @@ public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
263267 throw new IllegalPeerEndpointException (thisNode , peers );
264268 }
265269
270+ Lock lock =
271+ consensusGroupIdReentrantLockMap .computeIfAbsent (groupId , key -> new ReentrantLock ());
266272 try {
267- stateMachineMapLock .lock ();
273+ lock .lock ();
274+ stateMachineMapLock .readLock ().lock ();
268275 if (stateMachineMap .containsKey (groupId )) {
269276 throw new ConsensusGroupAlreadyExistException (groupId );
270277 }
@@ -293,29 +300,36 @@ public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
293300 LOGGER .warn ("Cannot create local peer for group {} with peers {}" , groupId , peers , e );
294301 throw new ConsensusException (e );
295302 } finally {
296- stateMachineMapLock .unlock ();
303+ stateMachineMapLock .readLock ().unlock ();
304+ lock .unlock ();
297305 }
298306 }
299307
300308 @ Override
301309 public void deleteLocalPeer (ConsensusGroupId groupId ) throws ConsensusException {
302310 KillPoint .setKillPoint (IoTConsensusDeleteLocalPeerKillPoints .BEFORE_DELETE );
311+ Lock lock =
312+ consensusGroupIdReentrantLockMap .computeIfAbsent (groupId , key -> new ReentrantLock ());
303313 try {
304- stateMachineMapLock .lock ();
314+ lock .lock ();
315+ stateMachineMapLock .readLock ().lock ();
305316 if (!stateMachineMap .containsKey (groupId )) {
306317 throw new ConsensusGroupNotExistException (groupId );
307318 }
308319
309320 final PipeConsensusServerImpl consensus = stateMachineMap .get (groupId );
310321 consensus .clear ();
322+ stateMachineMap .remove (groupId );
311323
312324 FileUtils .deleteFileOrDirectory (new File (getPeerDir (groupId )));
313325 KillPoint .setKillPoint (IoTConsensusDeleteLocalPeerKillPoints .AFTER_DELETE );
314326 } catch (IOException e ) {
315327 LOGGER .warn ("Cannot delete local peer for group {}" , groupId , e );
316328 throw new ConsensusException (e );
317329 } finally {
318- stateMachineMapLock .unlock ();
330+ stateMachineMapLock .readLock ().unlock ();
331+ lock .unlock ();
332+ consensusGroupIdReentrantLockMap .remove (groupId );
319333 }
320334 }
321335
0 commit comments