Skip to content

Commit 1bc2c7f

Browse files
authored
Pipe: Avoid throwing null pointer during Close process (apache#16391)
* Pipe: Avoid throwing null pointer during Close process * fix * fix
1 parent a63a23e commit 1bc2c7f

File tree

2 files changed

+10
-8
lines changed

2 files changed

+10
-8
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeSyncClientManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public Pair<IoTDBSyncClient, Boolean> getClient(final String deviceId) {
9595
&& endPoint != null
9696
&& endPoint2ClientAndStatus.containsKey(endPoint)
9797
&& Boolean.TRUE.equals(endPoint2ClientAndStatus.get(endPoint).getRight())
98+
&& endPoint2ClientAndStatus.get(endPoint).getLeft() != null
9899
? endPoint2ClientAndStatus.get(endPoint)
99100
: getClient();
100101
}
@@ -104,6 +105,7 @@ public Pair<IoTDBSyncClient, Boolean> getClient(final TEndPoint endPoint) {
104105
&& endPoint != null
105106
&& endPoint2ClientAndStatus.containsKey(endPoint)
106107
&& Boolean.TRUE.equals(endPoint2ClientAndStatus.get(endPoint).getRight())
108+
&& endPoint2ClientAndStatus.get(endPoint).getLeft() != null
107109
? endPoint2ClientAndStatus.get(endPoint)
108110
: getClient();
109111
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public void checkClientStatusAndTryReconstructIfNecessary() {
121121
// Check whether any clients are available, if any client is available, return directly
122122
for (final Pair<IoTDBSyncClient, Boolean> clientAndStatus :
123123
endPoint2ClientAndStatus.values()) {
124-
if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
124+
if (Boolean.TRUE.equals(clientAndStatus.getRight()) && clientAndStatus.getLeft() != null) {
125125
return;
126126
}
127127
}
@@ -130,7 +130,7 @@ public void checkClientStatusAndTryReconstructIfNecessary() {
130130
// Reconstruct all dead clients
131131
for (final Map.Entry<TEndPoint, Pair<IoTDBSyncClient, Boolean>> entry :
132132
endPoint2ClientAndStatus.entrySet()) {
133-
if (Boolean.TRUE.equals(entry.getValue().getRight())) {
133+
if (Boolean.TRUE.equals(entry.getValue().getRight()) && entry.getValue().getLeft() != null) {
134134
continue;
135135
}
136136

@@ -139,7 +139,7 @@ public void checkClientStatusAndTryReconstructIfNecessary() {
139139

140140
// Check whether any clients are available
141141
for (final Pair<IoTDBSyncClient, Boolean> clientAndStatus : endPoint2ClientAndStatus.values()) {
142-
if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
142+
if (Boolean.TRUE.equals(clientAndStatus.getRight()) && clientAndStatus.getLeft() != null) {
143143
lastCheckClientStatusTimestamp = System.currentTimeMillis();
144144
return;
145145
}
@@ -306,7 +306,6 @@ public void close() {
306306
try {
307307
if (clientAndStatus.getLeft() != null) {
308308
clientAndStatus.getLeft().close();
309-
clientAndStatus.setLeft(null);
310309
}
311310
LOGGER.info("Client {}:{} closed.", endPoint.getIp(), endPoint.getPort());
312311
} catch (Exception e) {
@@ -337,7 +336,7 @@ public Pair<IoTDBSyncClient, Boolean> getClient() {
337336
final int clientIndex = (int) (currentClientIndex++ % clientSize);
338337
final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
339338
endPoint2ClientAndStatus.get(endPointList.get(clientIndex));
340-
if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
339+
if (Boolean.TRUE.equals(clientAndStatus.getRight()) && clientAndStatus.getLeft() != null) {
341340
return clientAndStatus;
342341
}
343342
}
@@ -354,7 +353,7 @@ public Pair<IoTDBSyncClient, Boolean> getClient() {
354353
final int clientIndex = (int) (Math.random() * clientSize);
355354
final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
356355
endPoint2ClientAndStatus.get(endPointList.get(clientIndex));
357-
if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
356+
if (Boolean.TRUE.equals(clientAndStatus.getRight()) && clientAndStatus.getLeft() != null) {
358357
return clientAndStatus;
359358
}
360359

@@ -363,7 +362,8 @@ public Pair<IoTDBSyncClient, Boolean> getClient() {
363362
final int nextClientIndex = (clientIndex + tryCount + 1) % clientSize;
364363
final Pair<IoTDBSyncClient, Boolean> nextClientAndStatus =
365364
endPoint2ClientAndStatus.get(endPointList.get(nextClientIndex));
366-
if (Boolean.TRUE.equals(nextClientAndStatus.getRight())) {
365+
if (Boolean.TRUE.equals(nextClientAndStatus.getRight())
366+
&& clientAndStatus.getLeft() != null) {
367367
return nextClientAndStatus;
368368
}
369369
}
@@ -380,7 +380,7 @@ public Pair<IoTDBSyncClient, Boolean> getClient() {
380380
for (final TEndPoint endPoint : endPointList) {
381381
final Pair<IoTDBSyncClient, Boolean> clientAndStatus =
382382
endPoint2ClientAndStatus.get(endPoint);
383-
if (Boolean.TRUE.equals(clientAndStatus.getRight())) {
383+
if (Boolean.TRUE.equals(clientAndStatus.getRight()) && clientAndStatus.getLeft() != null) {
384384
return clientAndStatus;
385385
}
386386
}

0 commit comments

Comments
 (0)