Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions sdk-core/src/main/java/io/milvus/client/MilvusServiceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,17 +184,28 @@ public void start(ClientCall.Listener<RespT> responseListener, Metadata headers)
}

assert channel != null;
blockingStub = MilvusServiceGrpc.newBlockingStub(channel);
futureStub = MilvusServiceGrpc.newFutureStub(channel);

// calls a RPC Connect() to the remote server, and sends the client info to the server
// so that the server knows which client is interacting, especially for accesses log.
this.timeoutMs = connectParam.getConnectTimeoutMs(); // set this value to connectTimeoutMs to control the retry()
R<ConnectResponse> resp = this.retry(() -> connect(connectParam));
if (resp.getStatus() != R.Status.Success.getCode()) {
String msg = "Failed to initialize connection. Error: " + resp.getMessage();
logError(msg);
throw new RuntimeException(msg);

try {
blockingStub = MilvusServiceGrpc.newBlockingStub(channel);
futureStub = MilvusServiceGrpc.newFutureStub(channel);

// calls a RPC Connect() to the remote server, and sends the client info to the server
// so that the server knows which client is interacting, especially for accesses log.
this.timeoutMs = connectParam.getConnectTimeoutMs(); // set this value to connectTimeoutMs to control the retry()
R<ConnectResponse> resp = this.retry(() -> connect(connectParam));
if (resp.getStatus() != R.Status.Success.getCode()) {
String msg = "Failed to initialize connection. Error: " + resp.getMessage();
logError(msg);
throw new RuntimeException(msg);
}
} catch (Exception e) {
// close the channel if connect() throws exception, avoid leakage
try {
close(3);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
throw e;
}
this.timeoutMs = 0; // reset the timeout value to default
}
Expand Down
21 changes: 16 additions & 5 deletions sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,27 @@ private void connect(ConnectConfig connectConfig) {
close(3);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
channel = clientUtils.getChannel(connectConfig);

blockingStub = MilvusServiceGrpc.newBlockingStub(channel).withWaitForReady();
connect(connectConfig, blockingStub);
try {
blockingStub = MilvusServiceGrpc.newBlockingStub(channel).withWaitForReady();
connect(connectConfig, blockingStub);

if (connectConfig.getDbName() != null) {
// check if database exists
clientUtils.checkDatabaseExist(this.blockingStub, connectConfig.getDbName());
if (connectConfig.getDbName() != null) {
// check if database exists
clientUtils.checkDatabaseExist(this.blockingStub, connectConfig.getDbName());
}
} catch (Exception e) {
// close the channel if connect() and checkDatabaseExist() throws exception, avoid leakage
try {
close(3);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
throw e;
}
}

Expand Down
Loading