Skip to content

Commit 809ecd4

Browse files
authored
1 parent b8947ff commit 809ecd4

File tree

10 files changed

+109
-85
lines changed

10 files changed

+109
-85
lines changed

iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,8 +291,11 @@ public synchronized void close() {
291291

292292
// close subscription providers
293293
providers.acquireWriteLock();
294-
providers.closeProviders();
295-
providers.releaseWriteLock();
294+
try {
295+
providers.closeProviders();
296+
} finally {
297+
providers.releaseWriteLock();
298+
}
296299

297300
isClosed.set(true);
298301

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/PartitionBalancer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@ public Map<String, DataPartitionTable> allocateDataPartition(
140140
throw new DatabaseNotExistsException(database);
141141
}
142142
DataPartitionPolicyTable allotTable = dataPartitionPolicyTableMap.get(database);
143+
allotTable.acquireLock();
143144
try {
144-
allotTable.acquireLock();
145145
// Enumerate SeriesPartitionSlot
146146
for (Map.Entry<TSeriesPartitionSlot, TTimeSlotList> seriesPartitionEntry :
147147
unassignedPartitionSlotsMap.entrySet()) {
@@ -226,8 +226,8 @@ public void reBalanceDataPartitionPolicy(String database) {
226226
dataPartitionPolicyTableMap.computeIfAbsent(
227227
database, empty -> new DataPartitionPolicyTable());
228228

229+
dataPartitionPolicyTable.acquireLock();
229230
try {
230-
dataPartitionPolicyTable.acquireLock();
231231
dataPartitionPolicyTable.reBalanceDataPartitionPolicy(
232232
getPartitionManager().getAllRegionGroupIds(database, TConsensusGroupType.DataRegion));
233233
dataPartitionPolicyTable.logDataAllotTable(database);
@@ -249,9 +249,9 @@ public void setupPartitionBalancer() {
249249
database -> {
250250
DataPartitionPolicyTable dataPartitionPolicyTable = new DataPartitionPolicyTable();
251251
dataPartitionPolicyTableMap.put(database, dataPartitionPolicyTable);
252-
try {
253-
dataPartitionPolicyTable.acquireLock();
254252

253+
dataPartitionPolicyTable.acquireLock();
254+
try {
255255
// Put all DataRegionGroups into the DataPartitionPolicyTable
256256
dataPartitionPolicyTable.reBalanceDataPartitionPolicy(
257257
getPartitionManager()

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -248,26 +248,35 @@ private void setCQConfig(ConfigurationResp dataSet) {
248248

249249
private TRuntimeConfiguration getRuntimeConfiguration() {
250250
getPipeManager().getPipePluginCoordinator().lock();
251-
getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
252-
getUDFManager().getUdfInfo().acquireUDFTableLock();
253251
try {
254-
final TRuntimeConfiguration runtimeConfiguration = new TRuntimeConfiguration();
255-
runtimeConfiguration.setTemplateInfo(getClusterSchemaManager().getAllTemplateSetInfo());
256-
runtimeConfiguration.setAllTriggerInformation(
257-
getTriggerManager().getTriggerTable(false).getAllTriggerInformation());
258-
runtimeConfiguration.setAllUDFInformation(
259-
getUDFManager().getAllUDFTable().getAllUDFInformation());
260-
runtimeConfiguration.setAllPipeInformation(
261-
getPipeManager().getPipePluginCoordinator().getPipePluginTable().getAllPipePluginMeta());
262-
runtimeConfiguration.setAllTTLInformation(
263-
DataNodeRegisterResp.convertAllTTLInformation(getTTLManager().getAllTTL()));
264-
runtimeConfiguration.setTableInfo(
265-
getClusterSchemaManager().getAllTableInfoForDataNodeActivation());
266-
runtimeConfiguration.setClusterId(getClusterManager().getClusterId());
267-
return runtimeConfiguration;
252+
getTriggerManager().getTriggerInfo().acquireTriggerTableLock();
253+
try {
254+
getUDFManager().getUdfInfo().acquireUDFTableLock();
255+
try {
256+
final TRuntimeConfiguration runtimeConfiguration = new TRuntimeConfiguration();
257+
runtimeConfiguration.setTemplateInfo(getClusterSchemaManager().getAllTemplateSetInfo());
258+
runtimeConfiguration.setAllTriggerInformation(
259+
getTriggerManager().getTriggerTable(false).getAllTriggerInformation());
260+
runtimeConfiguration.setAllUDFInformation(
261+
getUDFManager().getAllUDFTable().getAllUDFInformation());
262+
runtimeConfiguration.setAllPipeInformation(
263+
getPipeManager()
264+
.getPipePluginCoordinator()
265+
.getPipePluginTable()
266+
.getAllPipePluginMeta());
267+
runtimeConfiguration.setAllTTLInformation(
268+
DataNodeRegisterResp.convertAllTTLInformation(getTTLManager().getAllTTL()));
269+
runtimeConfiguration.setTableInfo(
270+
getClusterSchemaManager().getAllTableInfoForDataNodeActivation());
271+
runtimeConfiguration.setClusterId(getClusterManager().getClusterId());
272+
return runtimeConfiguration;
273+
} finally {
274+
getUDFManager().getUdfInfo().releaseUDFTableLock();
275+
}
276+
} finally {
277+
getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
278+
}
268279
} finally {
269-
getTriggerManager().getTriggerInfo().releaseTriggerTableLock();
270-
getUDFManager().getUdfInfo().releaseUDFTableLock();
271280
getPipeManager().getPipePluginCoordinator().unlock();
272281
}
273282
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,8 @@ private void checkAllConsensusPipe() {
246246
Collectors.groupingBy(
247247
entry -> entry.getKey().getConsensusGroupId(),
248248
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
249+
stateMachineMapLock.writeLock().lock();
249250
try {
250-
stateMachineMapLock.writeLock().lock();
251251
stateMachineMap.forEach(
252252
(key, value) ->
253253
value.checkConsensusPipe(existedPipes.getOrDefault(key, ImmutableMap.of())));
@@ -322,37 +322,40 @@ public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
322322

323323
Lock lock =
324324
consensusGroupIdReentrantLockMap.computeIfAbsent(groupId, key -> new ReentrantLock());
325+
lock.lock();
325326
try {
326-
lock.lock();
327327
stateMachineMapLock.readLock().lock();
328-
if (stateMachineMap.containsKey(groupId)) {
329-
throw new ConsensusGroupAlreadyExistException(groupId);
330-
}
328+
try {
329+
if (stateMachineMap.containsKey(groupId)) {
330+
throw new ConsensusGroupAlreadyExistException(groupId);
331+
}
331332

332-
final String path = getPeerDir(groupId);
333-
File consensusDir = new File(path);
334-
if (!consensusDir.exists() && !consensusDir.mkdirs()) {
335-
LOGGER.warn("Unable to create consensus dir for group {} at {}", groupId, path);
336-
throw new ConsensusException(
337-
String.format("Unable to create consensus dir for group %s", groupId));
338-
}
333+
final String path = getPeerDir(groupId);
334+
File consensusDir = new File(path);
335+
if (!consensusDir.exists() && !consensusDir.mkdirs()) {
336+
LOGGER.warn("Unable to create consensus dir for group {} at {}", groupId, path);
337+
throw new ConsensusException(
338+
String.format("Unable to create consensus dir for group %s", groupId));
339+
}
339340

340-
PipeConsensusServerImpl consensus =
341-
new PipeConsensusServerImpl(
342-
new Peer(groupId, thisNodeId, thisNode),
343-
registry.apply(groupId),
344-
peers,
345-
config,
346-
consensusPipeManager,
347-
syncClientManager);
348-
stateMachineMap.put(groupId, consensus);
349-
consensus.start(false); // pipe will start after creating
350-
KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER);
351-
} catch (IOException e) {
352-
LOGGER.warn("Cannot create local peer for group {} with peers {}", groupId, peers, e);
353-
throw new ConsensusException(e);
341+
PipeConsensusServerImpl consensus =
342+
new PipeConsensusServerImpl(
343+
new Peer(groupId, thisNodeId, thisNode),
344+
registry.apply(groupId),
345+
peers,
346+
config,
347+
consensusPipeManager,
348+
syncClientManager);
349+
stateMachineMap.put(groupId, consensus);
350+
consensus.start(false); // pipe will start after creating
351+
KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_CREATE_LOCAL_PEER);
352+
} catch (IOException e) {
353+
LOGGER.warn("Cannot create local peer for group {} with peers {}", groupId, peers, e);
354+
throw new ConsensusException(e);
355+
} finally {
356+
stateMachineMapLock.readLock().unlock();
357+
}
354358
} finally {
355-
stateMachineMapLock.readLock().unlock();
356359
lock.unlock();
357360
}
358361
}
@@ -362,21 +365,24 @@ public void deleteLocalPeer(ConsensusGroupId groupId) throws ConsensusException
362365
KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.BEFORE_DELETE);
363366
Lock lock =
364367
consensusGroupIdReentrantLockMap.computeIfAbsent(groupId, key -> new ReentrantLock());
368+
lock.lock();
365369
try {
366-
lock.lock();
367370
stateMachineMapLock.readLock().lock();
368-
if (!stateMachineMap.containsKey(groupId)) {
369-
throw new ConsensusGroupNotExistException(groupId);
370-
}
371+
try {
372+
if (!stateMachineMap.containsKey(groupId)) {
373+
throw new ConsensusGroupNotExistException(groupId);
374+
}
371375

372-
final PipeConsensusServerImpl consensus = stateMachineMap.get(groupId);
373-
consensus.clear();
374-
stateMachineMap.remove(groupId);
376+
final PipeConsensusServerImpl consensus = stateMachineMap.get(groupId);
377+
consensus.clear();
378+
stateMachineMap.remove(groupId);
375379

376-
FileUtils.deleteFileOrDirectory(new File(getPeerDir(groupId)));
377-
KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
380+
FileUtils.deleteFileOrDirectory(new File(getPeerDir(groupId)));
381+
KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
382+
} finally {
383+
stateMachineMapLock.readLock().unlock();
384+
}
378385
} finally {
379-
stateMachineMapLock.readLock().unlock();
380386
lock.unlock();
381387
consensusGroupIdReentrantLockMap.remove(groupId);
382388
}

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensusServerImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,9 +309,9 @@ public synchronized void checkConsensusPipe(Map<ConsensusPipeName, PipeStatus> e
309309
}
310310

311311
public TSStatus write(IConsensusRequest request) {
312+
stateMachineLock.lock();
312313
try {
313314
long consensusWriteStartTime = System.nanoTime();
314-
stateMachineLock.lock();
315315
long getStateMachineLockTime = System.nanoTime();
316316
// statistic the time of acquiring stateMachine lock
317317
pipeConsensusServerMetrics.recordGetStateMachineLockTime(
@@ -335,9 +335,9 @@ public TSStatus write(IConsensusRequest request) {
335335
}
336336

337337
public TSStatus writeOnFollowerReplica(IConsensusRequest request) {
338+
stateMachineLock.lock();
338339
try {
339340
long consensusWriteStartTime = System.nanoTime();
340-
stateMachineLock.lock();
341341
long getStateMachineLockTime = System.nanoTime();
342342
// statistic the time of acquiring stateMachine lock
343343
pipeConsensusServerMetrics.recordGetStateMachineLockTime(

iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,17 +77,17 @@ public long getCurrentCommitIndex(ConsensusPipeName consensusPipeName) {
7777

7878
public void addConsensusPipeConnector(
7979
ConsensusPipeName consensusPipeName, ConsensusPipeConnector consensusPipeConnector) {
80+
lock.lock();
8081
try {
81-
lock.lock();
8282
consensusPipe2ConnectorMap.put(consensusPipeName, consensusPipeConnector);
8383
} finally {
8484
lock.unlock();
8585
}
8686
}
8787

8888
public void removeConsensusPipeConnector(ConsensusPipeName consensusPipeName) {
89+
lock.lock();
8990
try {
90-
lock.lock();
9191
consensusPipe2ConnectorMap.remove(consensusPipeName);
9292
} finally {
9393
lock.unlock();
@@ -100,8 +100,8 @@ public void removeConsensusPipeConnector(ConsensusPipeName consensusPipeName) {
100100
* has left to synchronize.
101101
*/
102102
public long calculateSyncLag() {
103+
lock.lock();
103104
try {
104-
lock.lock();
105105
// if there isn't a consensus pipe task, the syncLag is 0
106106
if (consensusPipe2ConnectorMap.isEmpty()) {
107107
return 0;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1055,8 +1055,8 @@ public PipeConsensusTsFileWriter borrowCorrespondingWriter(TCommitId commitId) {
10551055
// buffer for it.
10561056
if (!tsFileWriter.isPresent()) {
10571057
// We should synchronously find the idle writer to avoid concurrency issues.
1058+
lock.lock();
10581059
try {
1059-
lock.lock();
10601060
// We need to check tsFileWriter.isPresent() here. Since there may be both retry-sent
10611061
// tsfile
10621062
// events and real-time-sent tsfile events, causing the receiver's tsFileWriter load to

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -566,18 +566,21 @@ public TSStatus invalidateLastCache(final String database) {
566566
@Override
567567
public TSStatus invalidateSchemaCache(final TInvalidateCacheReq req) {
568568
DataNodeSchemaLockManager.getInstance().takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
569-
TreeDeviceSchemaCacheManager.getInstance().takeWriteLock();
570569
try {
571-
final String database = req.getFullPath();
572-
// req.getFullPath() is a database path
573-
ClusterTemplateManager.getInstance().invalid(database);
574-
// clear table related cache
575-
DataNodeTableCache.getInstance().invalid(database);
576-
tableDeviceSchemaCache.invalidate(database);
577-
LOGGER.info("Schema cache of {} has been invalidated", req.getFullPath());
578-
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
570+
TreeDeviceSchemaCacheManager.getInstance().takeWriteLock();
571+
try {
572+
final String database = req.getFullPath();
573+
// req.getFullPath() is a database path
574+
ClusterTemplateManager.getInstance().invalid(database);
575+
// clear table related cache
576+
DataNodeTableCache.getInstance().invalid(database);
577+
tableDeviceSchemaCache.invalidate(database);
578+
LOGGER.info("Schema cache of {} has been invalidated", req.getFullPath());
579+
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
580+
} finally {
581+
TreeDeviceSchemaCacheManager.getInstance().releaseWriteLock();
582+
}
579583
} finally {
580-
TreeDeviceSchemaCacheManager.getInstance().releaseWriteLock();
581584
DataNodeSchemaLockManager.getInstance().releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
582585
}
583586
}
@@ -643,14 +646,17 @@ public TSStatus rollbackSchemaBlackList(TRollbackSchemaBlackListReq req) {
643646
}
644647

645648
@Override
646-
public TSStatus invalidateMatchedSchemaCache(TInvalidateMatchedSchemaCacheReq req) {
647-
TreeDeviceSchemaCacheManager cache = TreeDeviceSchemaCacheManager.getInstance();
649+
public TSStatus invalidateMatchedSchemaCache(final TInvalidateMatchedSchemaCacheReq req) {
650+
final TreeDeviceSchemaCacheManager cache = TreeDeviceSchemaCacheManager.getInstance();
648651
DataNodeSchemaLockManager.getInstance().takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
649-
cache.takeWriteLock();
650652
try {
651-
cache.invalidate(PathPatternTree.deserialize(req.pathPatternTree).getAllPathPatterns(true));
653+
cache.takeWriteLock();
654+
try {
655+
cache.invalidate(PathPatternTree.deserialize(req.pathPatternTree).getAllPathPatterns(true));
656+
} finally {
657+
cache.releaseWriteLock();
658+
}
652659
} finally {
653-
cache.releaseWriteLock();
654660
DataNodeSchemaLockManager.getInstance().releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION);
655661
}
656662
return RpcUtils.SUCCESS_STATUS;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,8 @@ private String getDatabaseName(final IDeviceID deviceID) {
197197
* @return {@code true} if this database exists
198198
*/
199199
private boolean containsDatabase(final String database) {
200+
databaseCacheLock.readLock().lock();
200201
try {
201-
databaseCacheLock.readLock().lock();
202202
return databaseCache.contains(database);
203203
} finally {
204204
databaseCacheLock.readLock().unlock();
@@ -394,8 +394,8 @@ private void getDatabaseMap(
394394
final DatabaseCacheResult<?, ?> result,
395395
final List<IDeviceID> deviceIDs,
396396
final boolean failFast) {
397+
databaseCacheLock.readLock().lock();
397398
try {
398-
databaseCacheLock.readLock().lock();
399399
// reset result before try
400400
result.reset();
401401
boolean status = true;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FixedPriorityBlockingQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,8 @@ public boolean isEmpty() {
158158

159159
private void pollLast() {
160160
final ReentrantLock lock = this.lock;
161-
lock.lock();
162161
T element = null;
162+
lock.lock();
163163
try {
164164
element = queue.pollLast();
165165
} finally {

0 commit comments

Comments
 (0)