
Commit cf75abf

Fix flush command stuck problem and clean old code in DataRegion (apache#14027)

* Try to fix flush stuck problem and clean old code in DataRegion
* clean code
* Fix flush command stuck
* fix ArrayIndexOutOfBoundsException
* refactor flush manager
* fix DataRegion UT
1 parent 3aef6ad commit cf75abf

File tree

10 files changed: +203 −791 lines

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java

Lines changed: 50 additions & 56 deletions
@@ -45,15 +45,13 @@
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.consensus.statemachine.dataregion.DataExecutionVisitor;
 import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.exception.LoadReadOnlyException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException;
 import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
@@ -68,7 +66,6 @@
 import org.apache.iotdb.db.storageengine.dataregion.flush.FlushListener;
 import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
 import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy.DirectFlushPolicy;
-import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
 import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException;
 import org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
@@ -93,6 +90,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -234,7 +232,7 @@ private void asyncRecover(List<Future<Void>> futures) {
       for (DataRegionId dataRegionId : entry.getValue()) {
         Callable<Void> recoverDataRegionTask =
             () -> {
-              DataRegion dataRegion = null;
+              DataRegion dataRegion;
               try {
                 dataRegion = buildNewDataRegion(sgName, dataRegionId);
               } catch (DataRegionException e) {
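
A side note on this one-line change: removing the redundant `= null` initializer re-enables javac's definite-assignment analysis, so any path that reads the variable without assigning it first becomes a compile error instead of a latent NullPointerException. A minimal sketch of the effect, with hypothetical names unrelated to IoTDB:

class DefiniteAssignment {
  static String pick(boolean ready) {
    String value; // no "= null" initializer
    if (ready) {
      value = "ok";
    } else {
      value = "fallback"; // remove this branch and the return below stops compiling
    }
    return value; // javac accepts this only because every path assigns value
  }
}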
@@ -268,7 +266,7 @@ public Map<String, List<DataRegionId>> getLocalDataRegionInfo() {
       }
       String sgName = sgDir.getName();
       List<DataRegionId> dataRegionIdList = new ArrayList<>();
-      for (File dataRegionDir : sgDir.listFiles()) {
+      for (File dataRegionDir : Objects.requireNonNull(sgDir.listFiles())) {
         if (!dataRegionDir.isDirectory()) {
           continue;
         }
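
The wrapped call matters because File.listFiles() returns null, not an empty array, when the path is not a directory or an I/O error occurs; requireNonNull makes that failure explicit at the call site rather than surfacing as an NPE inside the loop. A minimal self-contained illustration, independent of the IoTDB code:

import java.io.File;
import java.util.Objects;

class ListFilesContract {
  static File[] regionDirs(File sgDir) {
    // listFiles() may return null on I/O error or for a non-directory;
    // requireNonNull fails fast with a descriptive message instead.
    return Objects.requireNonNull(
        sgDir.listFiles(), () -> "cannot list data region dirs under " + sgDir);
  }
}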
@@ -446,21 +444,21 @@ public ServiceType getID() {
    * build a new data region
    *
    * @param dataRegionId data region id e.g. 1
-   * @param logicalStorageGroupName database name e.g. root.sg1
+   * @param databaseName database name e.g. root.sg1
    */
-  public DataRegion buildNewDataRegion(String logicalStorageGroupName, DataRegionId dataRegionId)
+  public DataRegion buildNewDataRegion(String databaseName, DataRegionId dataRegionId)
       throws DataRegionException {
     DataRegion dataRegion;
     LOGGER.info(
         "construct a data region instance, the database is {}, Thread is {}",
-        logicalStorageGroupName,
+        databaseName,
         Thread.currentThread().getId());
     dataRegion =
         new DataRegion(
-            systemDir + File.separator + logicalStorageGroupName,
+            systemDir + File.separator + databaseName,
             String.valueOf(dataRegionId.getId()),
             fileFlushPolicy,
-            logicalStorageGroupName);
+            databaseName);
     WRITING_METRICS.createFlushingMemTableStatusMetrics(dataRegionId);
     WRITING_METRICS.createDataRegionMemoryCostMetrics(dataRegion);
     WRITING_METRICS.createActiveMemtableCounterMetrics(dataRegionId);
@@ -469,11 +467,6 @@ public DataRegion buildNewDataRegion(String logicalStorageGroupName, DataRegionId dataRegionId)
     return dataRegion;
   }
 
-  /** Write data into DataRegion. For standalone mode only. */
-  public TSStatus write(DataRegionId groupId, PlanNode planNode) {
-    return planNode.accept(new DataExecutionVisitor(), dataRegionMap.get(groupId));
-  }
-
   /** This function is just for unit test. */
   @TestOnly
   public synchronized void reset() {
@@ -513,29 +506,31 @@ public void forceCloseAllProcessor() throws TsFileProcessorException {
     checkResults(tasks, "Failed to force close processor.");
   }
 
-  public void closeStorageGroupProcessor(String storageGroupPath, boolean isSeq) {
+  public void syncCloseProcessorsInDatabase(String databaseName) {
     List<Future<Void>> tasks = new ArrayList<>();
     for (DataRegion dataRegion : dataRegionMap.values()) {
-      if (dataRegion.getDatabaseName().equals(storageGroupPath)) {
-        if (isSeq) {
-          for (TsFileProcessor tsFileProcessor : dataRegion.getWorkSequenceTsFileProcessors()) {
-            tasks.add(
-                cachedThreadPool.submit(
-                    () -> {
-                      dataRegion.syncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
-                      return null;
-                    }));
-          }
-        } else {
-          for (TsFileProcessor tsFileProcessor : dataRegion.getWorkUnsequenceTsFileProcessors()) {
-            tasks.add(
-                cachedThreadPool.submit(
-                    () -> {
-                      dataRegion.syncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
-                      return null;
-                    }));
-          }
-        }
+      if (dataRegion != null && dataRegion.getDatabaseName().equals(databaseName)) {
+        tasks.add(
+            cachedThreadPool.submit(
+                () -> {
+                  dataRegion.syncCloseAllWorkingTsFileProcessors();
+                  return null;
+                }));
+      }
+    }
+    checkResults(tasks, "Failed to sync close processor.");
+  }
+
+  public void syncCloseProcessorsInDatabase(String databaseName, boolean isSeq) {
+    List<Future<Void>> tasks = new ArrayList<>();
+    for (DataRegion dataRegion : dataRegionMap.values()) {
+      if (dataRegion.getDatabaseName().equals(databaseName)) {
+        tasks.add(
+            cachedThreadPool.submit(
+                () -> {
+                  dataRegion.syncCloseWorkingTsFileProcessors(isSeq);
+                  return null;
+                }));
       }
     }
     checkResults(tasks, "Failed to close database processor.");
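
The new overloads submit one task per DataRegion and let the region close its own processors, rather than enumerating TsFileProcessors in the caller and submitting one task each; per the commit message, the old per-processor submission was part of the stuck-flush problem. A hedged usage sketch of the methods added above, assuming a started StorageEngine:

// Close all working TsFile processors (sequence and unsequence) in root.sg1,
// blocking until every matching region has finished:
StorageEngine.getInstance().syncCloseProcessorsInDatabase("root.sg1");

// Close only the sequence, or only the unsequence, processors:
StorageEngine.getInstance().syncCloseProcessorsInDatabase("root.sg1", true);
StorageEngine.getInstance().syncCloseProcessorsInDatabase("root.sg1", false);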
@@ -638,13 +633,12 @@ public void operateFlush(TFlushReq req) {
       StorageEngine.getInstance().syncCloseAllProcessor();
       WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
     } else {
-      for (String storageGroup : req.storageGroups) {
+      for (String databaseName : req.storageGroups) {
         if (req.isSeq == null) {
-          StorageEngine.getInstance().closeStorageGroupProcessor(storageGroup, true);
-          StorageEngine.getInstance().closeStorageGroupProcessor(storageGroup, false);
+          StorageEngine.getInstance().syncCloseProcessorsInDatabase(databaseName);
         } else {
           StorageEngine.getInstance()
-              .closeStorageGroupProcessor(storageGroup, Boolean.parseBoolean(req.isSeq));
+              .syncCloseProcessorsInDatabase(databaseName, Boolean.parseBoolean(req.isSeq));
         }
       }
     }
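
The routing above means a request naming no databases flushes everything, while a request naming databases flushes only those; isSeq arrives as a String and is consulted only when non-null. A hedged sketch of driving this entry point (the TFlushReq field names are taken from the diff; Thrift-generated Java classes expose them as public fields, and the construction style here is an assumption):

// Flush both sequence and unsequence processors of two databases:
TFlushReq req = new TFlushReq();
req.storageGroups = java.util.Arrays.asList("root.sg1", "root.sg2");
req.isSeq = null; // null means "both", per the branch above
StorageEngine.getInstance().operateFlush(req);

// Flush only the sequence processors of root.sg1:
TFlushReq seqOnly = new TFlushReq();
seqOnly.storageGroups = java.util.Collections.singletonList("root.sg1");
seqOnly.isSeq = "true"; // parsed with Boolean.parseBoolean
StorageEngine.getInstance().operateFlush(seqOnly);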
@@ -698,15 +692,15 @@ public TSStatus setConfiguration(TSetConfigurationReq req) {
 
   /**
    * Add a listener to listen flush start/end events. Notice that this addition only applies to
-   * TsFileProcessors created afterwards.
+   * TsFileProcessors created afterward.
    */
   public void registerFlushListener(FlushListener listener) {
     customFlushListeners.add(listener);
   }
 
   /**
    * Add a listener to listen file close events. Notice that this addition only applies to
-   * TsFileProcessors created afterwards.
+   * TsFileProcessors created afterward.
    */
   public void registerCloseFileListener(CloseFileListener listener) {
     customCloseFileListeners.add(listener);
@@ -723,24 +717,24 @@ private void makeSureNoOldRegion(DataRegionId regionId) {
 
   // When registering a new region, the coordinator needs to register the corresponding region with
   // the local storage before adding the corresponding consensusGroup to the consensus layer
-  public DataRegion createDataRegion(DataRegionId regionId, String sg) throws DataRegionException {
+  public void createDataRegion(DataRegionId regionId, String databaseName)
+      throws DataRegionException {
     makeSureNoOldRegion(regionId);
     AtomicReference<DataRegionException> exceptionAtomicReference = new AtomicReference<>(null);
-    DataRegion dataRegion =
-        dataRegionMap.computeIfAbsent(
-            regionId,
-            x -> {
-              try {
-                return buildNewDataRegion(sg, x);
-              } catch (DataRegionException e) {
-                exceptionAtomicReference.set(e);
-              }
-              return null;
-            });
+    dataRegionMap.computeIfAbsent(
+        regionId,
+        region -> {
+          try {
+            return buildNewDataRegion(databaseName, region);
+          } catch (DataRegionException e) {
+            exceptionAtomicReference.set(e);
+          }
+          return null;
+        });
+
     if (exceptionAtomicReference.get() != null) {
       throw exceptionAtomicReference.get();
     }
-    return dataRegion;
   }
 
   public void deleteDataRegion(DataRegionId regionId) {
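
The rewritten createDataRegion keeps a pattern worth noting: a computeIfAbsent lambda cannot throw a checked exception, so the exception is captured in an AtomicReference and rethrown after the call, and returning null from the lambda leaves the map unpopulated on failure. A generic, self-contained sketch of the same pattern (names are illustrative, not from IoTDB):

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

class ComputeIfAbsentWithException {
  private final ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();

  String load(Integer key) throws IOException {
    AtomicReference<IOException> error = new AtomicReference<>(null);
    String value =
        map.computeIfAbsent(
            key,
            k -> {
              try {
                return expensiveCreate(k); // may throw IOException
              } catch (IOException e) {
                error.set(e); // smuggle the checked exception out of the lambda
                return null; // returning null leaves the map unchanged
              }
            });
    if (error.get() != null) {
      throw error.get();
    }
    return value;
  }

  private String expensiveCreate(Integer k) throws IOException {
    return "value-" + k; // stand-in for real construction work
  }
}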
@@ -966,7 +960,7 @@ public TSStatus executeLoadCommand(
     return status;
   }
 
-  /** reboot timed flush sequence/unsequence memetable thread */
+  /** reboot timed flush sequence/unsequence memtable thread */
   public void rebootTimedService() throws ShutdownException {
     LOGGER.info("Start rebooting all timed service.");