Skip to content

Commit d29e45c

Browse files
authored
Pipe: Fix unnecessary client shutdown logic and reduce handshake waiting time (apache#16341)
* Pipe: Fix unnecessary client shutdown logic and reduce handshake waiting time * spotless * update
1 parent f26f13d commit d29e45c

File tree

1 file changed

+10
-5
lines changed

1 file changed

+10
-5
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,9 @@ public void onComplete(final TPipeTransferResp response) {
252252
}
253253

254254
isHandshakeFinished.set(true);
255+
synchronized (isHandshakeFinished) {
256+
isHandshakeFinished.notifyAll();
257+
}
255258
}
256259

257260
@Override
@@ -265,6 +268,9 @@ public void onError(final Exception e) {
265268
exception.set(e);
266269

267270
isHandshakeFinished.set(true);
271+
synchronized (isHandshakeFinished) {
272+
isHandshakeFinished.notifyAll();
273+
}
268274
}
269275
};
270276

@@ -350,14 +356,13 @@ public void onError(final Exception e) {
350356

351357
private void waitHandshakeFinished(final AtomicBoolean isHandshakeFinished) {
352358
try {
353-
final long startTime = System.currentTimeMillis();
354359
while (!isHandshakeFinished.get()) {
355-
if (isClosed
356-
|| System.currentTimeMillis() - startTime
357-
> PipeConfig.getInstance().getPipeConnectorHandshakeTimeoutMs() * 2L) {
360+
if (isClosed) {
358361
throw new PipeConnectionException("Timed out when waiting for client handshake finish.");
359362
}
360-
Thread.sleep(10);
363+
synchronized (isHandshakeFinished) {
364+
isHandshakeFinished.wait(1);
365+
}
361366
}
362367
} catch (final InterruptedException e) {
363368
Thread.currentThread().interrupt();

0 commit comments

Comments
 (0)