Skip to content

Commit 387d487

Browse files
committed
m
1 parent d0fa4a2 commit 387d487

File tree

3 files changed

+10
-11
lines changed

3 files changed

+10
-11
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@
3737
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
3838
import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData;
3939
import org.apache.iotdb.db.storageengine.StorageEngine;
40-
import org.apache.iotdb.pipe.api.PipeSink;
40+
import org.apache.iotdb.pipe.api.PipeConnector;
4141
import org.apache.iotdb.pipe.api.annotation.TreeModel;
42-
import org.apache.iotdb.pipe.api.customizer.configuration.PipeSinkRuntimeConfiguration;
42+
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
4343
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
4444
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
4545
import org.apache.iotdb.pipe.api.event.Event;
@@ -91,7 +91,7 @@
9191
import static org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent.isTabletEmpty;
9292

9393
@TreeModel
94-
public class IoTDBLegacyPipeSink implements PipeSink {
94+
public class IoTDBLegacyPipeSink implements PipeConnector {
9595

9696
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBLegacyPipeSink.class);
9797

@@ -173,7 +173,7 @@ private Set<TEndPoint> parseNodeUrls(final PipeParameters parameters) {
173173

174174
@Override
175175
public void customize(
176-
final PipeParameters parameters, final PipeSinkRuntimeConfiguration configuration)
176+
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
177177
throws Exception {
178178
ipAddress = parameters.getStringByKeys(CONNECTOR_IOTDB_IP_KEY, SINK_IOTDB_IP_KEY);
179179
port = parameters.getIntByKeys(CONNECTOR_IOTDB_PORT_KEY, SINK_IOTDB_PORT_KEY);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ public void close() {
474474
* @param timeoutMs CAN NOT BE UNLIMITED, otherwise it may cause deadlock.
475475
*/
476476
private void waitForResourceEnough4Slicing(final long timeoutMs) throws InterruptedException {
477-
if (!PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled()) {
477+
if (!PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled()) {
478478
return;
479479
}
480480

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskSinkStage.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public SubscriptionTaskSinkStage(
4141

4242
@Override
4343
protected void registerSubtask() {
44-
this.connectorSubtaskId =
44+
this.sinkSubtaskId =
4545
SubscriptionSinkSubtaskManager.instance()
4646
.register(
4747
executor.get(),
@@ -56,22 +56,21 @@ public void createSubtask() throws PipeException {
5656

5757
@Override
5858
public void startSubtask() throws PipeException {
59-
SubscriptionSinkSubtaskManager.instance().start(connectorSubtaskId);
59+
SubscriptionSinkSubtaskManager.instance().start(sinkSubtaskId);
6060
}
6161

6262
@Override
6363
public void stopSubtask() throws PipeException {
64-
SubscriptionSinkSubtaskManager.instance().stop(connectorSubtaskId);
64+
SubscriptionSinkSubtaskManager.instance().stop(sinkSubtaskId);
6565
}
6666

6767
@Override
6868
public void dropSubtask() throws PipeException {
6969
SubscriptionSinkSubtaskManager.instance()
70-
.deregister(pipeName, creationTime, regionId, connectorSubtaskId);
70+
.deregister(pipeName, creationTime, regionId, sinkSubtaskId);
7171
}
7272

7373
public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
74-
return SubscriptionSinkSubtaskManager.instance()
75-
.getPipeConnectorPendingQueue(connectorSubtaskId);
74+
return SubscriptionSinkSubtaskManager.instance().getPipeConnectorPendingQueue(sinkSubtaskId);
7675
}
7776
}

0 commit comments

Comments
 (0)