Skip to content

Commit 0ecaf37

Browse files
authored
Revert "Revert Pipe: Added rate limiter for tsFile sending" (apache#16288)
This reverts commit 73a5048.
1 parent cd15077 commit 0ecaf37

File tree

23 files changed

+291
-92
lines changed

23 files changed

+291
-92
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionAirGapSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ protected byte[] generateHandShakeV2Payload() throws IOException {
8888
return PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferBytes(params);
8989
}
9090

91+
@Override
92+
protected void mayLimitRateAndRecordIO(final long requiredBytes) {
93+
// Do nothing
94+
}
95+
9196
@Override
9297
protected boolean mayNeedHandshakeWhenFail() {
9398
return true;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ protected PipeTransferFilePieceReq getTransferMultiFilePieceReq(
103103
return PipeTransferConfigSnapshotPieceReq.toTPipeTransferReq(fileName, position, payLoad);
104104
}
105105

106+
@Override
107+
protected void mayLimitRateAndRecordIO(final long requiredBytes) {
108+
// Do nothing
109+
}
110+
106111
@Override
107112
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {
108113
throw new UnsupportedOperationException(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java

Lines changed: 43 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,11 @@
5656
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_DEFAULT_VALUE;
5757
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_KEY;
5858
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE;
59+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE;
60+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY;
5961
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
6062
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_SNAPSHOT_KEY;
63+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY;
6164

6265
public class PipeDataNodeTaskBuilder {
6366

@@ -184,10 +187,6 @@ private void checkConflict(
184187
|| extractorModeValue.equalsIgnoreCase(EXTRACTOR_MODE_QUERY_VALUE);
185188
}
186189

187-
if (!insertionDeletionListeningOptionPair.right
188-
&& !shouldTerminatePipeOnAllHistoricalEventsConsumed) {
189-
return;
190-
}
191190
} catch (final IllegalPathException e) {
192191
LOGGER.warn(
193192
"PipeDataNodeTaskBuilder failed to parse 'inclusion' and 'exclusion' parameters: {}",
@@ -196,29 +195,50 @@ private void checkConflict(
196195
return;
197196
}
198197

199-
final Boolean isRealtime =
200-
connectorParameters.getBooleanByKeys(
201-
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
202-
PipeSinkConstant.SINK_REALTIME_FIRST_KEY);
203-
if (isRealtime == null) {
204-
connectorParameters.addAttribute(PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, "false");
205-
if (insertionDeletionListeningOptionPair.right) {
206-
LOGGER.info(
207-
"PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues after deletion.");
208-
} else {
209-
LOGGER.info(
210-
"PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' is defaulted to 'false' to prevent premature halt before transfer completion.");
198+
if (insertionDeletionListeningOptionPair.right
199+
|| shouldTerminatePipeOnAllHistoricalEventsConsumed) {
200+
final Boolean isRealtime =
201+
connectorParameters.getBooleanByKeys(
202+
PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY,
203+
PipeSinkConstant.SINK_REALTIME_FIRST_KEY);
204+
if (isRealtime == null) {
205+
connectorParameters.addAttribute(PipeSinkConstant.CONNECTOR_REALTIME_FIRST_KEY, "false");
206+
if (insertionDeletionListeningOptionPair.right) {
207+
LOGGER.info(
208+
"PipeDataNodeTaskBuilder: When 'inclusion' contains 'data.delete', 'realtime-first' is defaulted to 'false' to prevent sync issues after deletion.");
209+
} else {
210+
LOGGER.info(
211+
"PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' is defaulted to 'false' to prevent premature halt before transfer completion.");
212+
}
213+
} else if (isRealtime) {
214+
if (insertionDeletionListeningOptionPair.right) {
215+
LOGGER.warn(
216+
"PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete', 'realtime-first' set to 'true' may result in data synchronization issues after deletion.");
217+
} else {
218+
LOGGER.warn(
219+
"PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' set to 'true' may cause prevent premature halt before transfer completion.");
220+
}
211221
}
212-
return;
213222
}
214223

215-
if (isRealtime) {
216-
if (insertionDeletionListeningOptionPair.right) {
217-
LOGGER.warn(
218-
"PipeDataNodeTaskBuilder: When 'inclusion' includes 'data.delete', 'realtime-first' set to 'true' may result in data synchronization issues after deletion.");
219-
} else {
224+
final boolean isRealtimeEnabled =
225+
extractorParameters.getBooleanOrDefault(
226+
Arrays.asList(EXTRACTOR_REALTIME_ENABLE_KEY, SOURCE_REALTIME_ENABLE_KEY),
227+
EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE);
228+
229+
if (isRealtimeEnabled && !shouldTerminatePipeOnAllHistoricalEventsConsumed) {
230+
final Boolean enableSendTsFileLimit =
231+
connectorParameters.getBooleanByKeys(
232+
PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT,
233+
PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT);
234+
235+
if (enableSendTsFileLimit == null) {
236+
connectorParameters.addAttribute(PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT, "true");
237+
LOGGER.info(
238+
"PipeDataNodeTaskBuilder: When the realtime sync is enabled, we enable rate limiter in sending tsfile by default to reserve disk and network IO for realtime sending.");
239+
} else if (!enableSendTsFileLimit) {
220240
LOGGER.warn(
221-
"PipeDataNodeTaskBuilder: When extractor uses snapshot model, 'realtime-first' set to 'true' may cause prevent premature halt before transfer completion.");
241+
"PipeDataNodeTaskBuilder: When the realtime sync is enabled, not enabling the rate limiter in sending tsfile may introduce delay for realtime sending.");
222242
}
223243
}
224244
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
2727
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
2828
import org.apache.iotdb.metrics.AbstractMetricService;
29+
import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
2930
import org.apache.iotdb.metrics.metricsets.IMetricSet;
31+
import org.apache.iotdb.metrics.type.Counter;
3032
import org.apache.iotdb.metrics.utils.MetricLevel;
3133
import org.apache.iotdb.metrics.utils.MetricType;
3234

@@ -41,6 +43,7 @@ public class PipeResourceMetrics implements IMetricSet {
4143

4244
private static final String PIPE_TOTAL_MEMORY = "PipeTotalMemory";
4345

46+
private Counter diskIOCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
4447
private static final String PIPE_FLOATING_MEMORY = "PipeFloatingMemory";
4548

4649
//////////////////////////// bindTo & unbindFrom (metric framework) ////////////////////////////
@@ -96,6 +99,10 @@ public void bindTo(final AbstractMetricService metricService) {
9699
MetricLevel.IMPORTANT,
97100
PipeDataNodeResourceManager.ref(),
98101
PipePhantomReferenceManager::getPhantomReferenceCount);
102+
// tsFile send rate
103+
diskIOCounter =
104+
metricService.getOrCreateCounter(
105+
Metric.PIPE_TSFILE_SEND_DISK_IO.toString(), MetricLevel.IMPORTANT);
99106
}
100107

101108
@Override
@@ -130,6 +137,12 @@ public void unbindFrom(final AbstractMetricService metricService) {
130137
metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_LINKED_TSFILE_SIZE.toString());
131138
// phantom reference count
132139
metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString());
140+
141+
metricService.remove(MetricType.RATE, Metric.PIPE_TSFILE_SEND_DISK_IO.toString());
142+
}
143+
144+
public void recordDiskIO(final long bytes) {
145+
diskIOCounter.inc(bytes);
133146
}
134147

135148
//////////////////////////// singleton ////////////////////////////

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2323
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
24+
import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
2425
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
2526
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
2627
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
2728
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2829
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
2930
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
31+
import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
3032
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
3133
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq;
3234
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
@@ -39,6 +41,8 @@
3941
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
4042
import org.apache.iotdb.pipe.api.annotation.TableModel;
4143
import org.apache.iotdb.pipe.api.annotation.TreeModel;
44+
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
45+
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
4246
import org.apache.iotdb.pipe.api.event.Event;
4347
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
4448
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -51,14 +55,33 @@
5155

5256
import java.io.File;
5357
import java.io.IOException;
58+
import java.util.Arrays;
5459
import java.util.Objects;
5560

61+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
62+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE;
63+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT;
64+
5665
@TreeModel
5766
@TableModel
5867
public class IoTDBDataRegionAirGapSink extends IoTDBDataNodeAirGapSink {
5968

6069
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionAirGapSink.class);
6170

71+
private boolean enableSendTsFileLimit;
72+
73+
@Override
74+
public void customize(
75+
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
76+
throws Exception {
77+
super.customize(parameters, configuration);
78+
79+
enableSendTsFileLimit =
80+
parameters.getBooleanOrDefault(
81+
Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT),
82+
CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE);
83+
}
84+
6285
@Override
6386
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {
6487
// PipeProcessor can change the type of TabletInsertionEvent
@@ -353,6 +376,14 @@ private void doTransfer(
353376
}
354377
}
355378

379+
@Override
380+
protected void mayLimitRateAndRecordIO(final long requiredBytes) {
381+
PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes);
382+
if (enableSendTsFileLimit) {
383+
TsFileSendRateLimiter.getInstance().acquire(requiredBytes);
384+
}
385+
}
386+
356387
@Override
357388
protected byte[] getTransferSingleFilePieceBytes(
358389
final String fileName, final long position, final byte[] payLoad) throws IOException {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBSchemaRegionAirGapSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ private void doTransfer(
197197
}
198198
}
199199

200+
@Override
201+
protected void mayLimitRateAndRecordIO(final long requiredBytes) {
202+
// Do nothing
203+
}
204+
200205
@Override
201206
protected byte[] getTransferSingleFilePieceBytes(
202207
final String fileName, final long position, final byte[] payLoad) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,11 @@
8181
import java.util.concurrent.atomic.AtomicBoolean;
8282
import java.util.concurrent.atomic.AtomicInteger;
8383

84+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
85+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE;
8486
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE;
8587
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY;
88+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT;
8689
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_ENABLE_KEY;
8790
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY;
8891
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY;
@@ -121,6 +124,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
121124
private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers =
122125
new ConcurrentHashMap<>();
123126

127+
private boolean enableSendTsFileLimit;
128+
124129
@Override
125130
public void validate(final PipeParameterValidator validator) throws Exception {
126131
super.validate(validator);
@@ -176,6 +181,11 @@ public void customize(
176181
if (isTabletBatchModeEnabled) {
177182
tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
178183
}
184+
185+
enableSendTsFileLimit =
186+
parameters.getBooleanOrDefault(
187+
Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT),
188+
CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE);
179189
}
180190

181191
@Override
@@ -707,6 +717,10 @@ public void addFailureEventsToRetryQueue(final Iterable<EnrichedEvent> events) {
707717
events.forEach(this::addFailureEventToRetryQueue);
708718
}
709719

720+
public boolean isEnableSendTsFileLimit() {
721+
return enableSendTsFileLimit;
722+
}
723+
710724
//////////////////////////// Operations for close ////////////////////////////
711725

712726
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@
2323
import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
2424
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2525
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
26+
import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
2627
import org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp;
2728
import org.apache.iotdb.commons.utils.RetryUtils;
2829
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
30+
import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
2931
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
3032
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
3133
import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock;
@@ -170,6 +172,10 @@ public void transfer(
170172
client.setShouldReturnSelf(false);
171173
client.setTimeoutDynamically(clientManager.getConnectionTimeout());
172174

175+
PipeResourceMetrics.getInstance().recordDiskIO(readFileBufferSize);
176+
if (connector.isEnableSendTsFileLimit()) {
177+
TsFileSendRateLimiter.getInstance().acquire(readFileBufferSize);
178+
}
173179
final int readLength = reader.read(readBuffer);
174180

175181
if (readLength == -1) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2424
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2525
import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
26+
import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
2627
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFilePieceReq;
2728
import org.apache.iotdb.commons.utils.RetryUtils;
2829
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -31,6 +32,7 @@
3132
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
3233
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
3334
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
35+
import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
3436
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
3537
import org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeSyncClientManager;
3638
import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch;
@@ -69,18 +71,24 @@
6971
import java.io.File;
7072
import java.io.IOException;
7173
import java.nio.file.NoSuchFileException;
74+
import java.util.Arrays;
7275
import java.util.Collections;
7376
import java.util.List;
7477
import java.util.Map;
7578
import java.util.Objects;
7679

80+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT;
81+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE;
82+
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_ENABLE_SEND_TSFILE_LIMIT;
83+
7784
@TreeModel
7885
@TableModel
7986
public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink {
8087

8188
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataRegionSyncSink.class);
8289

8390
private PipeTransferBatchReqBuilder tabletBatchBuilder;
91+
private boolean enableSendTsFileLimit;
8492

8593
@Override
8694
public void customize(
@@ -92,6 +100,11 @@ public void customize(
92100
if (isTabletBatchModeEnabled) {
93101
tabletBatchBuilder = new PipeTransferBatchReqBuilder(parameters);
94102
}
103+
104+
enableSendTsFileLimit =
105+
parameters.getBooleanOrDefault(
106+
Arrays.asList(SINK_ENABLE_SEND_TSFILE_LIMIT, CONNECTOR_ENABLE_SEND_TSFILE_LIMIT),
107+
CONNECTOR_ENABLE_SEND_TSFILE_LIMIT_DEFAULT_VALUE);
95108
}
96109

97110
@Override
@@ -106,6 +119,14 @@ protected PipeTransferFilePieceReq getTransferMultiFilePieceReq(
106119
return PipeTransferTsFilePieceWithModReq.toTPipeTransferReq(fileName, position, payLoad);
107120
}
108121

122+
@Override
123+
protected void mayLimitRateAndRecordIO(final long requiredBytes) {
124+
PipeResourceMetrics.getInstance().recordDiskIO(requiredBytes);
125+
if (enableSendTsFileLimit) {
126+
TsFileSendRateLimiter.getInstance().acquire(requiredBytes);
127+
}
128+
}
129+
109130
@Override
110131
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {
111132
// PipeProcessor can change the type of TabletInsertionEvent

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBSchemaRegionSink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,4 +244,9 @@ protected PipeTransferFilePieceReq getTransferMultiFilePieceReq(
244244
final String fileName, final long position, final byte[] payLoad) throws IOException {
245245
return PipeTransferSchemaSnapshotPieceReq.toTPipeTransferReq(fileName, position, payLoad);
246246
}
247+
248+
@Override
249+
protected void mayLimitRateAndRecordIO(final long requiredBytes) {
250+
// Do nothing
251+
}
247252
}

0 commit comments

Comments
 (0)