Skip to content

Commit 08ab2c9

Browse files
authored
Pipe: Optimized the degrading logger & Deleted useless UT & Copied some historical filter logic from dev/1.3 (#16019)
* logger * fix-ut * Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
1 parent f616e34 commit 08ab2c9

File tree

4 files changed

+22
-72
lines changed

4 files changed

+22
-72
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
5252
import org.apache.iotdb.db.storageengine.StorageEngine;
5353
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
54+
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
5455
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
5556
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
5657
import org.apache.iotdb.db.utils.DateTimeUtils;
@@ -387,21 +388,6 @@ public void customize(
387388
}
388389
}
389390

390-
private void flushDataRegionAllTsFiles() {
391-
final DataRegion dataRegion =
392-
StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId));
393-
if (Objects.isNull(dataRegion)) {
394-
return;
395-
}
396-
397-
dataRegion.writeLock("Pipe: create historical TsFile extractor");
398-
try {
399-
dataRegion.syncCloseAllWorkingTsFileProcessors();
400-
} finally {
401-
dataRegion.writeUnlock();
402-
}
403-
}
404-
405391
/**
406392
* IoTV2 will only resend event that contains un-replicated local write data. So we only extract
407393
* ProgressIndex containing local writes for comparison to prevent misjudgment on whether
@@ -563,6 +549,9 @@ private void extractTsFiles(
563549
// Some resource may not be closed due to the control of
564550
// PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them.
565551
!resource.isClosed()
552+
&& Optional.ofNullable(resource.getProcessor())
553+
.map(TsFileProcessor::alreadyMarkedClosing)
554+
.orElse(true)
566555
|| mayTsFileContainUnprocessedData(resource)
567556
&& isTsFileResourceOverlappedWithTimeRange(resource)
568557
&& mayTsFileResourceOverlappedWithPattern(resource)))
@@ -585,6 +574,9 @@ && mayTsFileResourceOverlappedWithPattern(resource)))
585574
// Some resource may not be closed due to the control of
586575
// PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them.
587576
!resource.isClosed()
577+
&& Optional.ofNullable(resource.getProcessor())
578+
.map(TsFileProcessor::alreadyMarkedClosing)
579+
.orElse(true)
588580
|| mayTsFileContainUnprocessedData(resource)
589581
&& isTsFileResourceOverlappedWithTimeRange(resource)
590582
&& mayTsFileResourceOverlappedWithPattern(resource)))

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
3131
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
3232
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
33+
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeOperator;
34+
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
3335
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
3436
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
3537
import org.apache.iotdb.pipe.api.event.Event;
@@ -40,6 +42,7 @@
4042
import org.slf4j.LoggerFactory;
4143

4244
import java.util.Objects;
45+
import java.util.Optional;
4346

4447
public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegionExtractor {
4548

@@ -218,13 +221,18 @@ private boolean mayInsertNodeMemoryReachDangerousThreshold(final PipeRealtimeEve
218221
final boolean mayInsertNodeMemoryReachDangerousThreshold =
219222
floatingMemoryUsageInByte * pipeCount >= totalFloatingMemorySizeInBytes;
220223
if (mayInsertNodeMemoryReachDangerousThreshold && event.mayExtractorUseTablets(this)) {
224+
final PipeDataNodeRemainingEventAndTimeOperator operator =
225+
PipeDataNodeSinglePipeMetrics.getInstance().remainingEventAndTimeOperatorMap.get(pipeID);
221226
LOGGER.info(
222-
"Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory usage of the insert node {} has reached the dangerous threshold {}",
227+
"Pipe task {}@{} canNotUseTabletAnyMore(1) for tsFile {}: The memory usage of the insert node {} has reached the dangerous threshold of single pipe {}, event count: {}",
223228
pipeName,
224229
dataRegionId,
225230
event.getTsFileEpoch().getFilePath(),
226-
floatingMemoryUsageInByte * pipeCount,
227-
totalFloatingMemorySizeInBytes);
231+
floatingMemoryUsageInByte,
232+
totalFloatingMemorySizeInBytes / pipeCount,
233+
Optional.ofNullable(operator)
234+
.map(PipeDataNodeRemainingEventAndTimeOperator::getInsertNodeEventCount)
235+
.orElse(0));
228236
}
229237
return mayInsertNodeMemoryReachDangerousThreshold;
230238
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ public long getRemainingNonHeartbeatEvents() {
116116
return remainingEvents >= 0 ? remainingEvents : 0;
117117
}
118118

119+
public int getInsertNodeEventCount() {
120+
return insertNodeEventCount.get();
121+
}
122+
119123
long getRemainingEvents() {
120124
final long remainingEvents =
121125
tsfileEventCount.get()

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeConnectorTest.java

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
2727
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
2828
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
29-
import org.apache.iotdb.pipe.api.exception.PipeParameterNotValidException;
3029

3130
import org.junit.Assert;
3231
import org.junit.Test;
@@ -35,24 +34,6 @@
3534

3635
public class PipeConnectorTest {
3736

38-
@Test(expected = PipeParameterNotValidException.class)
39-
public void testIoTDBLegacyPipeConnectorToSelf() throws Exception {
40-
try (IoTDBLegacyPipeConnector connector = new IoTDBLegacyPipeConnector()) {
41-
connector.validate(
42-
new PipeParameterValidator(
43-
new PipeParameters(
44-
new HashMap<String, String>() {
45-
{
46-
put(
47-
PipeConnectorConstant.CONNECTOR_KEY,
48-
BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName());
49-
put(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY, "127.0.0.1");
50-
put(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY, "6667");
51-
}
52-
})));
53-
}
54-
}
55-
5637
@Test
5738
public void testIoTDBLegacyPipeConnectorToOthers() {
5839
try (IoTDBLegacyPipeConnector connector = new IoTDBLegacyPipeConnector()) {
@@ -73,24 +54,6 @@ public void testIoTDBLegacyPipeConnectorToOthers() {
7354
}
7455
}
7556

76-
@Test(expected = PipeParameterNotValidException.class)
77-
public void testIoTDBThriftSyncConnectorToSelf() throws Exception {
78-
try (IoTDBDataRegionSyncConnector connector = new IoTDBDataRegionSyncConnector()) {
79-
connector.validate(
80-
new PipeParameterValidator(
81-
new PipeParameters(
82-
new HashMap<String, String>() {
83-
{
84-
put(
85-
PipeConnectorConstant.CONNECTOR_KEY,
86-
BuiltinPipePlugin.IOTDB_THRIFT_SYNC_CONNECTOR.getPipePluginName());
87-
put(PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY, "127.0.0.1");
88-
put(PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY, "6667");
89-
}
90-
})));
91-
}
92-
}
93-
9457
@Test
9558
public void testIoTDBThriftSyncConnectorToOthers() {
9659
try (IoTDBDataRegionSyncConnector connector = new IoTDBDataRegionSyncConnector()) {
@@ -111,23 +74,6 @@ public void testIoTDBThriftSyncConnectorToOthers() {
11174
}
11275
}
11376

114-
@Test(expected = PipeParameterNotValidException.class)
115-
public void testIoTDBThriftAsyncConnectorToSelf() throws Exception {
116-
try (IoTDBDataRegionAsyncConnector connector = new IoTDBDataRegionAsyncConnector()) {
117-
connector.validate(
118-
new PipeParameterValidator(
119-
new PipeParameters(
120-
new HashMap<String, String>() {
121-
{
122-
put(
123-
PipeConnectorConstant.CONNECTOR_KEY,
124-
BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName());
125-
put(PipeConnectorConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6667");
126-
}
127-
})));
128-
}
129-
}
130-
13177
@Test
13278
public void testIoTDBThriftAsyncConnectorToOthers() {
13379
try (IoTDBDataRegionAsyncConnector connector = new IoTDBDataRegionAsyncConnector()) {

0 commit comments

Comments
 (0)