Skip to content

Commit 2bfc35a

Browse files
XNX02SteveYurongSu
andauthored
Pipe: add information about sender's IP and port in the pipe receiver logs (#14343) (#14351)
Co-authored-by: Steve Yurong Su <rong@apache.org>
1 parent 2783e80 commit 2bfc35a

File tree

5 files changed

+37
-3
lines changed

5 files changed

+37
-3
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@
6868
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
6969
import org.apache.iotdb.confignode.service.ConfigNode;
7070
import org.apache.iotdb.consensus.exception.ConsensusException;
71+
import org.apache.iotdb.db.protocol.session.IClientSession;
72+
import org.apache.iotdb.db.protocol.session.SessionManager;
7173
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
7274
import org.apache.iotdb.rpc.RpcUtils;
7375
import org.apache.iotdb.rpc.TSStatusCode;
@@ -91,6 +93,8 @@ public class IoTDBConfigNodeReceiver extends IoTDBFileReceiver {
9193

9294
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBConfigNodeReceiver.class);
9395

96+
private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
97+
9498
private static final AtomicInteger QUERY_ID_GENERATOR = new AtomicInteger(0);
9599

96100
private static final PipeConfigPhysicalPlanTSStatusVisitor STATUS_VISITOR =
@@ -329,6 +333,18 @@ protected String getReceiverFileBaseDir() {
329333
return ConfigNodeDescriptor.getInstance().getConf().getPipeReceiverFileDir();
330334
}
331335

336+
@Override
337+
protected String getSenderHost() {
338+
final IClientSession session = SESSION_MANAGER.getCurrSession();
339+
return session != null ? session.getClientAddress() : "unknown";
340+
}
341+
342+
@Override
343+
protected String getSenderPort() {
344+
final IClientSession session = SESSION_MANAGER.getCurrSession();
345+
return session != null ? String.valueOf(session.getClientPort()) : "unknown";
346+
}
347+
332348
@Override
333349
protected TSStatus loadFileV1(
334350
final PipeTransferFileSealReqV1 req, final String fileAbsolutePath) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,18 @@ protected String getReceiverFileBaseDir() throws DiskSpaceInsufficientException
410410
return Objects.isNull(folderManager) ? null : folderManager.getNextFolder();
411411
}
412412

413+
@Override
414+
protected String getSenderHost() {
415+
final IClientSession session = SESSION_MANAGER.getCurrSession();
416+
return session != null ? session.getClientAddress() : "unknown";
417+
}
418+
419+
@Override
420+
protected String getSenderPort() {
421+
final IClientSession session = SESSION_MANAGER.getCurrSession();
422+
return session != null ? String.valueOf(session.getClientPort()) : "unknown";
423+
}
424+
413425
@Override
414426
protected TSStatus loadFileV1(final PipeTransferFileSealReqV1 req, final String fileAbsolutePath)
415427
throws IOException {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public abstract class IClientSession {
4545

4646
public abstract String getClientAddress();
4747

48-
abstract int getClientPort();
48+
public abstract int getClientPort();
4949

5050
abstract TSConnectionType getConnectionType();
5151

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/RestClientSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public String getClientAddress() {
3838
}
3939

4040
@Override
41-
int getClientPort() {
41+
public int getClientPort() {
4242
return 0;
4343
}
4444

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,14 +174,20 @@ protected TPipeTransferResp handleTransferHandshakeV1(final PipeTransferHandshak
174174
receiverFileDirWithIdSuffix.set(newReceiverDir);
175175

176176
LOGGER.info(
177-
"Receiver id = {}: Handshake successfully, receiver file dir = {}.",
177+
"Receiver id = {}: Handshake successfully! Sender's host = {}, port = {}. Receiver's file dir = {}.",
178178
receiverId.get(),
179+
getSenderHost(),
180+
getSenderPort(),
179181
newReceiverDir.getPath());
180182
return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS);
181183
}
182184

183185
protected abstract String getReceiverFileBaseDir() throws Exception;
184186

187+
protected abstract String getSenderHost();
188+
189+
protected abstract String getSenderPort();
190+
185191
protected TPipeTransferResp handleTransferHandshakeV2(final PipeTransferHandshakeV2Req req)
186192
throws IOException {
187193
// Reject to handshake if the receiver can not take clusterId from config node.

0 commit comments

Comments
 (0)