diff --git a/sdk-core/src/main/java/io/milvus/client/MilvusServiceClient.java b/sdk-core/src/main/java/io/milvus/client/MilvusServiceClient.java index 48f45aa43..11a331c05 100644 --- a/sdk-core/src/main/java/io/milvus/client/MilvusServiceClient.java +++ b/sdk-core/src/main/java/io/milvus/client/MilvusServiceClient.java @@ -184,17 +184,28 @@ public void start(ClientCall.Listener responseListener, Metadata headers) } assert channel != null; - blockingStub = MilvusServiceGrpc.newBlockingStub(channel); - futureStub = MilvusServiceGrpc.newFutureStub(channel); - - // calls a RPC Connect() to the remote server, and sends the client info to the server - // so that the server knows which client is interacting, especially for accesses log. - this.timeoutMs = connectParam.getConnectTimeoutMs(); // set this value to connectTimeoutMs to control the retry() - R resp = this.retry(() -> connect(connectParam)); - if (resp.getStatus() != R.Status.Success.getCode()) { - String msg = "Failed to initialize connection. Error: " + resp.getMessage(); - logError(msg); - throw new RuntimeException(msg); + + try { + blockingStub = MilvusServiceGrpc.newBlockingStub(channel); + futureStub = MilvusServiceGrpc.newFutureStub(channel); + + // calls a RPC Connect() to the remote server, and sends the client info to the server + // so that the server knows which client is interacting, especially for accesses log. + this.timeoutMs = connectParam.getConnectTimeoutMs(); // set this value to connectTimeoutMs to control the retry() + R resp = this.retry(() -> connect(connectParam)); + if (resp.getStatus() != R.Status.Success.getCode()) { + String msg = "Failed to initialize connection. Error: " + resp.getMessage(); + logError(msg); + throw new RuntimeException(msg); + } + } catch (Exception e) { + // close the channel if connect() throws exception, avoid leakage + try { + close(3); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + throw e; } this.timeoutMs = 0; // reset the timeout value to default } diff --git a/sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java b/sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java index 3f25d8404..64eee4086 100644 --- a/sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java +++ b/sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java @@ -136,16 +136,27 @@ private void connect(ConnectConfig connectConfig) { close(3); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); } channel = clientUtils.getChannel(connectConfig); - blockingStub = MilvusServiceGrpc.newBlockingStub(channel).withWaitForReady(); - connect(connectConfig, blockingStub); + try { + blockingStub = MilvusServiceGrpc.newBlockingStub(channel).withWaitForReady(); + connect(connectConfig, blockingStub); - if (connectConfig.getDbName() != null) { - // check if database exists - clientUtils.checkDatabaseExist(this.blockingStub, connectConfig.getDbName()); + if (connectConfig.getDbName() != null) { + // check if database exists + clientUtils.checkDatabaseExist(this.blockingStub, connectConfig.getDbName()); + } + } catch (Exception e) { + // close the channel if connect() and checkDatabaseExist() throws exception, avoid leakage + try { + close(3); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + throw e; } }