Skip to content

Commit 4d83ac8

Browse files
committed
Close rpc channel if MilvusClient fails to initialize
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent cbf0629 commit 4d83ac8

File tree

2 files changed

+38
-16
lines changed

2 files changed

+38
-16
lines changed

sdk-core/src/main/java/io/milvus/client/MilvusServiceClient.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -184,17 +184,28 @@ public void start(ClientCall.Listener<RespT> responseListener, Metadata headers)
184184
}
185185

186186
assert channel != null;
187-
blockingStub = MilvusServiceGrpc.newBlockingStub(channel);
188-
futureStub = MilvusServiceGrpc.newFutureStub(channel);
189-
190-
// calls a RPC Connect() to the remote server, and sends the client info to the server
191-
// so that the server knows which client is interacting, especially for accesses log.
192-
this.timeoutMs = connectParam.getConnectTimeoutMs(); // set this value to connectTimeoutMs to control the retry()
193-
R<ConnectResponse> resp = this.retry(() -> connect(connectParam));
194-
if (resp.getStatus() != R.Status.Success.getCode()) {
195-
String msg = "Failed to initialize connection. Error: " + resp.getMessage();
196-
logError(msg);
197-
throw new RuntimeException(msg);
187+
188+
try {
189+
blockingStub = MilvusServiceGrpc.newBlockingStub(channel);
190+
futureStub = MilvusServiceGrpc.newFutureStub(channel);
191+
192+
// calls a RPC Connect() to the remote server, and sends the client info to the server
193+
// so that the server knows which client is interacting, especially for accesses log.
194+
this.timeoutMs = connectParam.getConnectTimeoutMs(); // set this value to connectTimeoutMs to control the retry()
195+
R<ConnectResponse> resp = this.retry(() -> connect(connectParam));
196+
if (resp.getStatus() != R.Status.Success.getCode()) {
197+
String msg = "Failed to initialize connection. Error: " + resp.getMessage();
198+
logError(msg);
199+
throw new RuntimeException(msg);
200+
}
201+
} catch (Exception e) {
202+
// close the channel if connect() throws exception, avoid leakage
203+
try {
204+
close(3);
205+
} catch (InterruptedException ignored) {
206+
Thread.currentThread().interrupt();
207+
}
208+
throw e;
198209
}
199210
this.timeoutMs = 0; // reset the timeout value to default
200211
}

sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -136,16 +136,27 @@ private void connect(ConnectConfig connectConfig) {
136136
close(3);
137137
}
138138
} catch (InterruptedException e) {
139+
Thread.currentThread().interrupt();
139140
throw new RuntimeException(e);
140141
}
141142
channel = clientUtils.getChannel(connectConfig);
142143

143-
blockingStub = MilvusServiceGrpc.newBlockingStub(channel).withWaitForReady();
144-
connect(connectConfig, blockingStub);
144+
try {
145+
blockingStub = MilvusServiceGrpc.newBlockingStub(channel).withWaitForReady();
146+
connect(connectConfig, blockingStub);
145147

146-
if (connectConfig.getDbName() != null) {
147-
// check if database exists
148-
clientUtils.checkDatabaseExist(this.blockingStub, connectConfig.getDbName());
148+
if (connectConfig.getDbName() != null) {
149+
// check if database exists
150+
clientUtils.checkDatabaseExist(this.blockingStub, connectConfig.getDbName());
151+
}
152+
} catch (Exception e) {
153+
// close the channel if connect() and checkDatabaseExist() throws exception, avoid leakage
154+
try {
155+
close(3);
156+
} catch (InterruptedException ignored) {
157+
Thread.currentThread().interrupt();
158+
}
159+
throw e;
149160
}
150161
}
151162

0 commit comments

Comments
 (0)