Skip to content

Commit d0fa4a2

Browse files
committed
mm
1 parent 1652232 commit d0fa4a2

File tree

15 files changed

+120
-126
lines changed

15 files changed

+120
-126
lines changed

iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ImportTsFileRemotely.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public void sendHandshake() {
164164
"Handshake error with target server ip: %s, port: %s, because: %s.",
165165
client.getIpAddress(), client.getPort(), resp.getStatus()));
166166
} else {
167-
client.setTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
167+
client.setTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
168168
IOT_PRINTER.println(
169169
String.format(
170170
"Handshake success. Target server ip: %s, port: %s",
@@ -232,7 +232,7 @@ public void doTransfer(final File tsFile, final File modFile) throws PipeExcepti
232232

233233
private void transferFilePieces(final File file, final boolean isMultiFile)
234234
throws PipeException, IOException {
235-
final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
235+
final int readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
236236
final byte[] readBuffer = new byte[readFileBufferSize];
237237
long position = 0;
238238
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {
@@ -299,8 +299,7 @@ private void initClient() {
299299
this.client =
300300
new IoTDBSyncClient(
301301
new ThriftClientProperty.Builder()
302-
.setConnectionTimeoutMs(
303-
PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs())
302+
.setConnectionTimeoutMs(PipeConfig.getInstance().getPipeSinkHandshakeTimeoutMs())
304303
.setRpcThriftCompressionEnabled(
305304
PipeConfig.getInstance().isPipeConnectorRPCThriftCompressionEnabled())
306305
.build(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint endPoint)
208208
private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws TTransportException {
209209
return new IoTDBSyncClient(
210210
new ThriftClientProperty.Builder()
211-
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeConnectorHandshakeTimeoutMs())
211+
.setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs())
212212
.setRpcThriftCompressionEnabled(
213213
PIPE_CONFIG.isPipeConnectorRPCThriftCompressionEnabled())
214214
.build(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public IoTDBAirGapReceiver(final Socket socket, final long receiverId) {
7070

7171
@Override
7272
public void runMayThrow() throws Throwable {
73-
socket.setSoTimeout(PipeConfig.getInstance().getPipeConnectorTransferTimeoutMs());
73+
socket.setSoTimeout(PipeConfig.getInstance().getPipeSinkTransferTimeoutMs());
7474
socket.setKeepAlive(true);
7575

7676
LOGGER.info("Pipe air gap receiver {} started. Socket: {}", receiverId, socket);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
5757
import org.apache.iotdb.pipe.api.annotation.TableModel;
5858
import org.apache.iotdb.pipe.api.annotation.TreeModel;
59+
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
5960
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
6061
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
6162
import org.apache.iotdb.pipe.api.event.Event;
@@ -143,7 +144,7 @@ public void customize(PipeParameters parameters, PipeConnectorRuntimeConfigurati
143144
// initialize metric components
144145
pipeConsensusSinkMetrics = new PipeConsensusSinkMetrics(this);
145146
PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr())
146-
.addConsensusPipeConnector(new ConsensusPipeName(consensusPipeName), this);
147+
.addConsensusPipeSink(new ConsensusPipeName(consensusPipeName), this);
147148
MetricService.getInstance().addMetricSet(this.pipeConsensusSinkMetrics);
148149

149150
// In PipeConsensus, one pipeConsensusTask corresponds to a pipeConsensusConnector. Thus,
@@ -700,7 +701,7 @@ public synchronized void close() {
700701
}
701702

702703
PipeConsensusSyncLagManager.getInstance(getConsensusGroupIdStr())
703-
.removeConsensusPipeConnector(new ConsensusPipeName(consensusPipeName));
704+
.removeConsensusPipeSink(new ConsensusPipeName(consensusPipeName));
704705
MetricService.getInstance().removeMetricSet(this.pipeConsensusSinkMetrics);
705706
}
706707

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
5050
import org.apache.iotdb.pipe.api.annotation.TableModel;
5151
import org.apache.iotdb.pipe.api.annotation.TreeModel;
52+
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
5253
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
5354
import org.apache.iotdb.pipe.api.event.Event;
5455
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -445,7 +446,7 @@ protected void transferFilePieces(
445446
final TCommitId tCommitId,
446447
final TConsensusGroupId tConsensusGroupId)
447448
throws PipeException, IOException {
448-
final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
449+
final int readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
449450
final byte[] readBuffer = new byte[readFileBufferSize];
450451
long position = 0;
451452
try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public PipeConsensusTsFileInsertionEventHandler(
104104
transferMod = event.isWithMod();
105105
currentFile = transferMod ? modFile : tsFile;
106106

107-
readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
107+
readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
108108
readBuffer = new byte[readFileBufferSize];
109109
position = 0;
110110

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,10 @@
5252
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
5353
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
5454
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
55+
import org.apache.iotdb.pipe.api.PipeConnector;
5556
import org.apache.iotdb.pipe.api.annotation.TableModel;
5657
import org.apache.iotdb.pipe.api.annotation.TreeModel;
58+
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
5759
import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
5860
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
5961
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/pipe/PipeFunctionSupport.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ public class PipeFunctionSupport {
3434
public static void applyNowFunctionToExtractorAttributes(
3535
final Map<String, String> sourceAttributes,
3636
final String sourceKey,
37-
final String sourceKey,
37+
final String extractorKey,
3838
final long currentTime) {
3939
final Pair<String, String> pair =
40-
getExtractorAttributesKeyAndValue(sourceAttributes, sourceKey, sourceKey);
40+
getExtractorAttributesKeyAndValue(sourceAttributes, sourceKey, extractorKey);
4141

4242
if (pair == null) {
4343
return;
@@ -48,7 +48,9 @@ public static void applyNowFunctionToExtractorAttributes(
4848
}
4949

5050
private static Pair<String, String> getExtractorAttributesKeyAndValue(
51-
final Map<String, String> sourceAttributes, final String sourceKey, final String sourceKey) {
51+
final Map<String, String> sourceAttributes,
52+
final String sourceKey,
53+
final String extractorKey) {
5254
String key = sourceKey;
5355
String value = sourceAttributes.get(key);
5456
if (value != null) {
@@ -69,7 +71,7 @@ private static Pair<String, String> getExtractorAttributesKeyAndValue(
6971
return new Pair<>(key, value);
7072
}
7173

72-
key = sourceKey;
74+
key = extractorKey;
7375
value = sourceAttributes.get(key);
7476
if (value != null) {
7577
return new Pair<>(key, value);

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 36 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ public class CommonConfig {
268268
private int pipeConnectorHandshakeTimeoutMs = 10 * 1000; // 10 seconds
269269
private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
270270
private int pipeConnectorReadFileBufferSize = 5242880; // 5MB
271-
private boolean isPipeConnectorReadFileBufferMemoryControlEnabled = false;
271+
private boolean isPipeSinkReadFileBufferMemoryControlEnabled = false;
272272
private long pipeConnectorRetryIntervalMs = 1000L;
273273
private boolean pipeConnectorRPCThriftCompressionEnabled = false;
274274

@@ -1049,77 +1049,77 @@ public void setPipeSourceMatcherCacheSize(long pipeSourceMatcherCacheSize) {
10491049
logger.info("pipeSourceMatcherCacheSize is set to {}.", pipeSourceMatcherCacheSize);
10501050
}
10511051

1052-
public int getPipeConnectorHandshakeTimeoutMs() {
1052+
public int getPipeSinkHandshakeTimeoutMs() {
10531053
return pipeConnectorHandshakeTimeoutMs;
10541054
}
10551055

1056-
public void setPipeConnectorHandshakeTimeoutMs(long pipeConnectorHandshakeTimeoutMs) {
1057-
final int fPipeConnectorHandshakeTimeoutMs = this.pipeConnectorHandshakeTimeoutMs;
1056+
public void setPipeSinkHandshakeTimeoutMs(long pipeConnectorHandshakeTimeoutMs) {
1057+
final int fPipeSinkHandshakeTimeoutMs = this.pipeConnectorHandshakeTimeoutMs;
10581058
try {
10591059
this.pipeConnectorHandshakeTimeoutMs = Math.toIntExact(pipeConnectorHandshakeTimeoutMs);
10601060
} catch (ArithmeticException e) {
10611061
this.pipeConnectorHandshakeTimeoutMs = Integer.MAX_VALUE;
10621062
logger.warn(
10631063
"Given pipe sink handshake timeout is too large, set to {} ms.", Integer.MAX_VALUE);
10641064
} finally {
1065-
if (fPipeConnectorHandshakeTimeoutMs != this.pipeConnectorHandshakeTimeoutMs) {
1065+
if (fPipeSinkHandshakeTimeoutMs != this.pipeConnectorHandshakeTimeoutMs) {
10661066
logger.info(
10671067
"pipeConnectorHandshakeTimeoutMs is set to {}.", this.pipeConnectorHandshakeTimeoutMs);
10681068
}
10691069
}
10701070
}
10711071

1072-
public int getPipeConnectorTransferTimeoutMs() {
1072+
public int getPipeSinkTransferTimeoutMs() {
10731073
return pipeConnectorTransferTimeoutMs;
10741074
}
10751075

1076-
public void setPipeConnectorTransferTimeoutMs(long pipeConnectorTransferTimeoutMs) {
1077-
final int fPipeConnectorTransferTimeoutMs = this.pipeConnectorTransferTimeoutMs;
1076+
public void setPipeSinkTransferTimeoutMs(long pipeConnectorTransferTimeoutMs) {
1077+
final int fPipeSinkTransferTimeoutMs = this.pipeConnectorTransferTimeoutMs;
10781078
try {
10791079
this.pipeConnectorTransferTimeoutMs = Math.toIntExact(pipeConnectorTransferTimeoutMs);
10801080
} catch (ArithmeticException e) {
10811081
this.pipeConnectorTransferTimeoutMs = Integer.MAX_VALUE;
10821082
logger.warn(
10831083
"Given pipe sink transfer timeout is too large, set to {} ms.", Integer.MAX_VALUE);
10841084
} finally {
1085-
if (fPipeConnectorTransferTimeoutMs != this.pipeConnectorTransferTimeoutMs) {
1085+
if (fPipeSinkTransferTimeoutMs != this.pipeConnectorTransferTimeoutMs) {
10861086
logger.info("pipeConnectorTransferTimeoutMs is set to {}.", pipeConnectorTransferTimeoutMs);
10871087
}
10881088
}
10891089
}
10901090

1091-
public int getPipeConnectorReadFileBufferSize() {
1091+
public int getPipeSinkReadFileBufferSize() {
10921092
return pipeConnectorReadFileBufferSize;
10931093
}
10941094

1095-
public void setPipeConnectorReadFileBufferSize(int pipeConnectorReadFileBufferSize) {
1095+
public void setPipeSinkReadFileBufferSize(int pipeConnectorReadFileBufferSize) {
10961096
if (this.pipeConnectorReadFileBufferSize == pipeConnectorReadFileBufferSize) {
10971097
return;
10981098
}
10991099
this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize;
11001100
logger.info("pipeConnectorReadFileBufferSize is set to {}.", pipeConnectorReadFileBufferSize);
11011101
}
11021102

1103-
public boolean isPipeConnectorReadFileBufferMemoryControlEnabled() {
1104-
return isPipeConnectorReadFileBufferMemoryControlEnabled;
1103+
public boolean isPipeSinkReadFileBufferMemoryControlEnabled() {
1104+
return isPipeSinkReadFileBufferMemoryControlEnabled;
11051105
}
11061106

1107-
public void setIsPipeConnectorReadFileBufferMemoryControlEnabled(
1108-
boolean isPipeConnectorReadFileBufferMemoryControlEnabled) {
1109-
if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
1110-
== isPipeConnectorReadFileBufferMemoryControlEnabled) {
1107+
public void setIsPipeSinkReadFileBufferMemoryControlEnabled(
1108+
boolean isPipeSinkReadFileBufferMemoryControlEnabled) {
1109+
if (this.isPipeSinkReadFileBufferMemoryControlEnabled
1110+
== isPipeSinkReadFileBufferMemoryControlEnabled) {
11111111
return;
11121112
}
1113-
this.isPipeConnectorReadFileBufferMemoryControlEnabled =
1114-
isPipeConnectorReadFileBufferMemoryControlEnabled;
1113+
this.isPipeSinkReadFileBufferMemoryControlEnabled =
1114+
isPipeSinkReadFileBufferMemoryControlEnabled;
11151115
logger.info(
1116-
"isPipeConnectorReadFileBufferMemoryControlEnabled is set to {}.",
1117-
isPipeConnectorReadFileBufferMemoryControlEnabled);
1116+
"isPipeSinkReadFileBufferMemoryControlEnabled is set to {}.",
1117+
isPipeSinkReadFileBufferMemoryControlEnabled);
11181118
}
11191119

1120-
public void setPipeConnectorRPCThriftCompressionEnabled(
1120+
public void setPipeSinkRPCThriftCompressionEnabled(
11211121
boolean pipeConnectorRPCThriftCompressionEnabled) {
1122-
if (this.isPipeConnectorReadFileBufferMemoryControlEnabled
1122+
if (this.isPipeSinkReadFileBufferMemoryControlEnabled
11231123
== pipeConnectorRPCThriftCompressionEnabled) {
11241124
return;
11251125
}
@@ -1129,7 +1129,7 @@ public void setPipeConnectorRPCThriftCompressionEnabled(
11291129
pipeConnectorRPCThriftCompressionEnabled);
11301130
}
11311131

1132-
public boolean isPipeConnectorRPCThriftCompressionEnabled() {
1132+
public boolean isPipeSinkRPCThriftCompressionEnabled() {
11331133
return pipeConnectorRPCThriftCompressionEnabled;
11341134
}
11351135

@@ -1183,7 +1183,7 @@ public int getPipeAsyncSinkForcedRetryTotalEventQueueSize() {
11831183
return pipeAsyncSinkForcedRetryTotalEventQueueSize;
11841184
}
11851185

1186-
public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
1186+
public void setPipeAsyncSinkMaxRetryExecutionTimeMsPerCall(
11871187
long pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) {
11881188
if (this.pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall
11891189
== pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall) {
@@ -1196,15 +1196,15 @@ public void setPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall(
11961196
pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall);
11971197
}
11981198

1199-
public long getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall() {
1199+
public long getPipeAsyncSinkMaxRetryExecutionTimeMsPerCall() {
12001200
return pipeAsyncConnectorMaxRetryExecutionTimeMsPerCall;
12011201
}
12021202

1203-
public int getPipeAsyncConnectorSelectorNumber() {
1203+
public int getPipeAsyncSinkSelectorNumber() {
12041204
return pipeAsyncConnectorSelectorNumber;
12051205
}
12061206

1207-
public void setPipeAsyncConnectorSelectorNumber(int pipeAsyncConnectorSelectorNumber) {
1207+
public void setPipeAsyncSinkSelectorNumber(int pipeAsyncConnectorSelectorNumber) {
12081208
if (pipeAsyncConnectorSelectorNumber <= 0) {
12091209
logger.info(
12101210
"pipeAsyncConnectorSelectorNumber should be greater than 0, configuring it not to change.");
@@ -1218,11 +1218,11 @@ public void setPipeAsyncConnectorSelectorNumber(int pipeAsyncConnectorSelectorNu
12181218
logger.info("pipeAsyncConnectorSelectorNumber is set to {}.", pipeAsyncConnectorSelectorNumber);
12191219
}
12201220

1221-
public int getPipeAsyncConnectorMaxClientNumber() {
1221+
public int getPipeAsyncSinkMaxClientNumber() {
12221222
return pipeAsyncConnectorMaxClientNumber;
12231223
}
12241224

1225-
public void setPipeAsyncConnectorMaxClientNumber(int pipeAsyncConnectorMaxClientNumber) {
1225+
public void setPipeAsyncSinkMaxClientNumber(int pipeAsyncConnectorMaxClientNumber) {
12261226
if (pipeAsyncConnectorMaxClientNumber <= 0) {
12271227
logger.info(
12281228
" pipeAsyncConnectorMaxClientNumber should be greater than 0, configuring it not to change.");
@@ -1237,12 +1237,11 @@ public void setPipeAsyncConnectorMaxClientNumber(int pipeAsyncConnectorMaxClient
12371237
"pipeAsyncConnectorMaxClientNumber is set to {}.", pipeAsyncConnectorMaxClientNumber);
12381238
}
12391239

1240-
public int getPipeAsyncConnectorMaxTsFileClientNumber() {
1240+
public int getPipeAsyncSinkMaxTsFileClientNumber() {
12411241
return pipeAsyncConnectorMaxTsFileClientNumber;
12421242
}
12431243

1244-
public void setPipeAsyncConnectorMaxTsFileClientNumber(
1245-
int pipeAsyncConnectorMaxTsFileClientNumber) {
1244+
public void setPipeAsyncSinkMaxTsFileClientNumber(int pipeAsyncConnectorMaxTsFileClientNumber) {
12461245
if (pipeAsyncConnectorMaxTsFileClientNumber <= 0) {
12471246
logger.info(
12481247
"pipeAsyncConnectorMaxTsFileClientNumber should be greater than 0, configuring it not to change.");
@@ -1354,11 +1353,11 @@ public void setPipeAutoRestartEnabled(boolean pipeAutoRestartEnabled) {
13541353
logger.info("pipeAutoRestartEnabled is set to {}.", pipeAutoRestartEnabled);
13551354
}
13561355

1357-
public long getPipeConnectorRetryIntervalMs() {
1356+
public long getPipeSinkRetryIntervalMs() {
13581357
return pipeConnectorRetryIntervalMs;
13591358
}
13601359

1361-
public void setPipeConnectorRetryIntervalMs(long pipeConnectorRetryIntervalMs) {
1360+
public void setPipeSinkRetryIntervalMs(long pipeConnectorRetryIntervalMs) {
13621361
if (this.pipeConnectorRetryIntervalMs == pipeConnectorRetryIntervalMs) {
13631362
return;
13641363
}
@@ -2070,12 +2069,11 @@ public void setRateLimiterHotReloadCheckIntervalMs(int rateLimiterHotReloadCheck
20702069
"rateLimiterHotReloadCheckIntervalMs is set to {}", rateLimiterHotReloadCheckIntervalMs);
20712070
}
20722071

2073-
public int getPipeConnectorRequestSliceThresholdBytes() {
2072+
public int getPipeSinkRequestSliceThresholdBytes() {
20742073
return pipeConnectorRequestSliceThresholdBytes;
20752074
}
20762075

2077-
public void setPipeConnectorRequestSliceThresholdBytes(
2078-
int pipeConnectorRequestSliceThresholdBytes) {
2076+
public void setPipeSinkRequestSliceThresholdBytes(int pipeConnectorRequestSliceThresholdBytes) {
20792077
if (this.pipeConnectorRequestSliceThresholdBytes == pipeConnectorRequestSliceThresholdBytes) {
20802078
return;
20812079
}

0 commit comments

Comments
 (0)