Skip to content

Commit 30117fd

Browse files
authored
Fix a bug that rpcDeadline incorrectly work for iterator (#1719)
Signed-off-by: yhmo <yihua.mo@zilliz.com>
1 parent b21e548 commit 30117fd

File tree

12 files changed

+114
-39
lines changed

12 files changed

+114
-39
lines changed

examples/src/main/java/io/milvus/v1/IteratorExample.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ private void insertColumns() {
150150
List<Long> ages = new ArrayList<>();
151151
List<Long> ids = new ArrayList<>();
152152
for (long i = 0L; i < NUM_ENTITIES; ++i) {
153-
ages.add((long) batch * NUM_ENTITIES + i);
153+
ages.add(((long) batch * NUM_ENTITIES + i) % 100);
154154
ids.add((long) batch * NUM_ENTITIES + i);
155155
}
156156

@@ -199,20 +199,20 @@ private void prepareData() {
199199
}
200200

201201
private void queryIterateCollectionNoOffset() {
202-
String expr = String.format("10 <= %s <= 100", AGE_FIELD);
202+
String expr = String.format("10 <= %s <= 30", AGE_FIELD);
203203

204-
QueryIterator queryIterator = getQueryIterator(expr, 0L, 5L, null);
204+
QueryIterator queryIterator = getQueryIterator(expr, 0L, 1L, null);
205205
iterateQueryResult(queryIterator);
206206
}
207207

208208
private void queryIterateCollectionWithOffset() {
209-
String expr = String.format("10 <= %s <= 100", AGE_FIELD);
209+
String expr = String.format("10 <= %s <= 100", ID_FIELD);
210210
QueryIterator queryIterator = getQueryIterator(expr, 10L, 50L, null);
211211
iterateQueryResult(queryIterator);
212212
}
213213

214214
private void queryIterateCollectionWithLimit() {
215-
String expr = String.format("10 <= %s <= 100", AGE_FIELD);
215+
String expr = String.format("10 <= %s <= 100", ID_FIELD);
216216
QueryIterator queryIterator = getQueryIterator(expr, null, 80L, 530L);
217217
iterateQueryResult(queryIterator);
218218
}
@@ -232,6 +232,7 @@ private void searchIteratorCollectionWithLimit() {
232232
}
233233

234234
private void iterateQueryResult(QueryIterator queryIterator) {
235+
System.out.println("\n========== queryIterator() ==========");
235236
int pageIdx = 0;
236237
int iterateCount = 0;
237238
while (true) {
@@ -252,6 +253,7 @@ private void iterateQueryResult(QueryIterator queryIterator) {
252253
}
253254

254255
private void iterateSearchResult(SearchIterator searchIterator) {
256+
System.out.println("\n========== searchIterator() ==========");
255257
int pageIdx = 0;
256258
int iterateCount = 0;
257259
while (true) {
@@ -321,6 +323,10 @@ public static void main(String[] args) {
321323
example.prepareData();
322324
}
323325

326+
// set rpcTimeoutMs, just to verify it works for each call of query/search inside the iterator
327+
// in versions older than 2.5.16/2.6.11, iterator.next() will timeout after several calls if the rpcTimeoutMs is greater than 0
328+
milvusClient.withTimeout(200, TimeUnit.MILLISECONDS);
329+
324330
example.queryIterateCollectionNoOffset();
325331
example.queryIterateCollectionWithOffset();
326332
example.queryIterateCollectionWithLimit();

examples/src/main/java/io/milvus/v2/IteratorExample.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.commons.lang3.StringUtils;
4444

4545
import java.util.*;
46+
import java.util.concurrent.TimeUnit;
4647
import java.util.function.Function;
4748

4849
public class IteratorExample {
@@ -315,11 +316,16 @@ private static void searchIteratorV2WithTemplate(int batchSize) {
315316

316317
public static void main(String[] args) {
317318
buildCollection();
318-
queryIterator("userID < 300", 50, 5, 400);
319+
320+
// set rpcTimeoutMs, just to verify it works for each call of query/search inside the iterator
321+
// in versions older than 2.5.16/2.6.11, iterator.next() will timeout after several calls if the rpcTimeoutMs is greater than 0
322+
client.withTimeout(200, TimeUnit.MILLISECONDS);
323+
324+
queryIterator("userID < 3000", 1, 5, 10000);
319325
queryIteratorWithTemplate(80);
320326

321327
searchIteratorV1("userAge > 50 &&userAge < 100", "{\"range_filter\": 15.0, \"radius\": 20.0}", 100, 500);
322-
searchIteratorV1("", "", 10, 99);
328+
searchIteratorV1("", "", 1, 3000);
323329
searchIteratorV2("userAge > 10 &&userAge < 20", null, 50, 120, null);
324330

325331
Map<String, Object> extraParams = new HashMap<>();

sdk-core/src/main/java/io/milvus/client/AbstractMilvusGrpcClient.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.milvus.exception.ServerException;
3636
import io.milvus.grpc.*;
3737
import io.milvus.orm.iterator.QueryIterator;
38+
import io.milvus.orm.iterator.RpcStubWrapper;
3839
import io.milvus.orm.iterator.SearchIterator;
3940
import io.milvus.param.*;
4041
import io.milvus.param.alias.AlterAliasParam;
@@ -3528,7 +3529,8 @@ public R<QueryIterator> queryIterator(QueryIteratorParam requestParam) {
35283529
return R.failed(descResp.getException());
35293530
}
35303531
DescCollResponseWrapper descCollResponseWrapper = new DescCollResponseWrapper(descResp.getData());
3531-
QueryIterator queryIterator = new QueryIterator(requestParam, this.blockingStub(), descCollResponseWrapper.getPrimaryField());
3532+
// for MilvusClientV1, we don't support to set rpcDeadlineMs for iterator, rpcDeadlineMs is always 0(no deadline)
3533+
QueryIterator queryIterator = new QueryIterator(requestParam, new RpcStubWrapper(this.blockingStub(), 0L), descCollResponseWrapper.getPrimaryField());
35323534
return R.success(queryIterator);
35333535
}
35343536

@@ -3543,7 +3545,8 @@ public R<SearchIterator> searchIterator(SearchIteratorParam requestParam) {
35433545
return R.failed(descResp.getException());
35443546
}
35453547
DescCollResponseWrapper descCollResponseWrapper = new DescCollResponseWrapper(descResp.getData());
3546-
SearchIterator searchIterator = new SearchIterator(requestParam, this.blockingStub(), descCollResponseWrapper.getPrimaryField());
3548+
// for MilvusClientV1, we don't support to set rpcDeadlineMs for iterator, rpcDeadlineMs is always 0(no deadline)
3549+
SearchIterator searchIterator = new SearchIterator(requestParam, new RpcStubWrapper(this.blockingStub(), 0L), descCollResponseWrapper.getPrimaryField());
35473550
return R.success(searchIterator);
35483551
}
35493552

sdk-core/src/main/java/io/milvus/orm/iterator/QueryIterator.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919

2020
package io.milvus.orm.iterator;
2121

22-
import io.milvus.grpc.*;
22+
import io.milvus.grpc.DataType;
23+
import io.milvus.grpc.KeyValuePair;
24+
import io.milvus.grpc.QueryRequest;
25+
import io.milvus.grpc.QueryResults;
2326
import io.milvus.param.Constant;
2427
import io.milvus.param.collection.FieldType;
2528
import io.milvus.param.dml.QueryIteratorParam;
@@ -41,7 +44,7 @@
4144
public class QueryIterator {
4245
protected static final Logger logger = LoggerFactory.getLogger(RpcUtils.class);
4346
private final IteratorCache iteratorCache;
44-
private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
47+
private final RpcStubWrapper blockingStub;
4548
private final FieldType primaryField;
4649

4750
private final QueryIteratorReq queryIteratorReq;
@@ -56,7 +59,7 @@ public class QueryIterator {
5659
private long sessionTs = 0;
5760

5861
public QueryIterator(QueryIteratorParam queryIteratorParam,
59-
MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
62+
RpcStubWrapper blockingStub,
6063
FieldType primaryField) {
6164
this.iteratorCache = new IteratorCache();
6265
this.blockingStub = blockingStub;
@@ -74,14 +77,13 @@ public QueryIterator(QueryIteratorParam queryIteratorParam,
7477
}
7578

7679
public QueryIterator(QueryIteratorReq queryIteratorReq,
77-
MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
80+
RpcStubWrapper blockingStub,
7881
CreateCollectionReq.FieldSchema primaryField) {
7982
this.iteratorCache = new IteratorCache();
8083
this.blockingStub = blockingStub;
8184
this.queryIteratorReq = queryIteratorReq;
8285
this.primaryField = IteratorAdapterV2.convertV2Field(primaryField);
8386

84-
8587
this.batchSize = (int) queryIteratorReq.getBatchSize();
8688
this.expr = queryIteratorReq.getExpr();
8789
this.limit = queryIteratorReq.getLimit();
@@ -247,7 +249,7 @@ private QueryResults executeQuery(String expr, long offset, long limit, long ts,
247249
// set default consistency level
248250
builder.setUseDefaultConsistency(true);
249251

250-
QueryResults response = rpcUtils.retry(() -> blockingStub.query(builder.build()));
252+
QueryResults response = rpcUtils.retry(() -> blockingStub.get().query(builder.build()));
251253
String title = String.format("QueryRequest collectionName:%s", queryIteratorReq.getCollectionName());
252254
rpcUtils.handleResponse(title, response.getStatus());
253255
return response;
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package io.milvus.orm.iterator;
21+
22+
import io.milvus.grpc.MilvusServiceGrpc;
23+
24+
import java.util.concurrent.TimeUnit;
25+
26+
public class RpcStubWrapper {
27+
private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
28+
29+
// rpcTimeoutMs of MilvusServiceBlockingStub.withDeadlineAfter() is "end of using time", not "timeout of per call",
30+
// we have to reset this value for each time QueryIterator calls the query() interface.
31+
// the rpcDeadlineMs value is passed from MilvusClient
32+
private long rpcDeadlineMs = 0L;
33+
34+
public RpcStubWrapper(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
35+
long rpcDeadlineMs) {
36+
this.blockingStub = blockingStub;
37+
this.rpcDeadlineMs = rpcDeadlineMs;
38+
}
39+
40+
public MilvusServiceGrpc.MilvusServiceBlockingStub get() {
41+
if (rpcDeadlineMs > 0) {
42+
return blockingStub.withDeadlineAfter(rpcDeadlineMs, TimeUnit.MILLISECONDS);
43+
} else {
44+
return blockingStub;
45+
}
46+
}
47+
}

sdk-core/src/main/java/io/milvus/orm/iterator/SearchIterator.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
import io.milvus.common.utils.ExceptionUtils;
2626
import io.milvus.common.utils.JsonUtils;
2727
import io.milvus.exception.ParamException;
28-
import io.milvus.grpc.*;
28+
import io.milvus.grpc.DataType;
29+
import io.milvus.grpc.KeyValuePair;
30+
import io.milvus.grpc.SearchRequest;
31+
import io.milvus.grpc.SearchResults;
2932
import io.milvus.param.Constant;
3033
import io.milvus.param.MetricType;
3134
import io.milvus.param.ParamUtils;
@@ -54,7 +57,7 @@
5457
public class SearchIterator {
5558
private static final Logger logger = LoggerFactory.getLogger(SearchIterator.class);
5659
private final IteratorCache iteratorCache;
57-
private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
60+
private final RpcStubWrapper blockingStub;
5861
private final FieldType primaryField;
5962

6063
private final SearchIteratorParam searchIteratorParam;
@@ -76,7 +79,7 @@ public class SearchIterator {
7679
private long sessionTs = 0;
7780

7881
public SearchIterator(SearchIteratorParam searchIteratorParam,
79-
MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
82+
RpcStubWrapper blockingStub,
8083
FieldType primaryField) {
8184
this.iteratorCache = new IteratorCache();
8285
this.searchIteratorParam = searchIteratorParam;
@@ -97,7 +100,7 @@ public SearchIterator(SearchIteratorParam searchIteratorParam,
97100

98101
// to support V2
99102
public SearchIterator(SearchIteratorReq searchIteratorReq,
100-
MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
103+
RpcStubWrapper blockingStub,
101104
CreateCollectionReq.FieldSchema primaryField) {
102105
this.iteratorCache = new IteratorCache();
103106
this.blockingStub = blockingStub;
@@ -296,7 +299,7 @@ private SearchResults executeSearch(Map<String, Object> params, String nextExpr,
296299
// set default consistency level
297300
builder.setUseDefaultConsistency(true);
298301

299-
SearchResults response = rpcUtils.retry(() -> blockingStub.search(builder.build()));
302+
SearchResults response = rpcUtils.retry(() -> blockingStub.get().search(builder.build()));
300303
String title = String.format("SearchRequest collectionName:%s", searchIteratorParam.getCollectionName());
301304
rpcUtils.handleResponse(title, response.getStatus());
302305
return response;

sdk-core/src/main/java/io/milvus/orm/iterator/SearchIteratorV2.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343

4444
public class SearchIteratorV2 {
4545
private static final Logger logger = LoggerFactory.getLogger(SearchIteratorV2.class);
46-
private final MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub;
46+
private final RpcStubWrapper blockingStub;
4747

4848
private final SearchIteratorReqV2 searchIteratorReq;
4949
private final int batchSize;
@@ -58,7 +58,7 @@ public class SearchIteratorV2 {
5858

5959
// to support V2
6060
public SearchIteratorV2(SearchIteratorReqV2 searchIteratorReq,
61-
MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub) {
61+
RpcStubWrapper blockingStub) {
6262
this.blockingStub = blockingStub;
6363
this.searchIteratorReq = searchIteratorReq;
6464

@@ -101,7 +101,7 @@ private void setupCollectionID() {
101101
if (StringUtils.isNotEmpty(searchIteratorReq.getDatabaseName())) {
102102
builder.setDbName(searchIteratorReq.getDatabaseName());
103103
}
104-
DescribeCollectionResponse response = rpcUtils.retry(() -> this.blockingStub.describeCollection(builder.build()));
104+
DescribeCollectionResponse response = rpcUtils.retry(() -> blockingStub.get().describeCollection(builder.build()));
105105
String title = String.format("DescribeCollectionRequest collectionName:%s", searchIteratorReq.getCollectionName());
106106
rpcUtils.handleResponse(title, response.getStatus());
107107

@@ -130,7 +130,7 @@ private SearchResults executeSearch(int limit) {
130130
.filterTemplateValues(searchIteratorReq.getFilterTemplateValues())
131131
.build();
132132
SearchRequest searchRequest = new VectorUtils().ConvertToGrpcSearchRequest(request);
133-
SearchResults response = rpcUtils.retry(() -> this.blockingStub.search(searchRequest));
133+
SearchResults response = rpcUtils.retry(() -> blockingStub.get().search(searchRequest));
134134
String title = String.format("SearchRequest collectionName:%s", searchIteratorReq.getCollectionName());
135135
rpcUtils.handleResponse(title, response.getStatus());
136136

sdk-core/src/main/java/io/milvus/v2/client/MilvusClientV2.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.milvus.grpc.ConnectResponse;
2626
import io.milvus.grpc.MilvusServiceGrpc;
2727
import io.milvus.orm.iterator.QueryIterator;
28+
import io.milvus.orm.iterator.RpcStubWrapper;
2829
import io.milvus.orm.iterator.SearchIterator;
2930
import io.milvus.orm.iterator.SearchIteratorV2;
3031
import io.milvus.v2.service.cdc.CDCService;
@@ -716,7 +717,7 @@ public SearchResp hybridSearch(HybridSearchReq request) {
716717
* @return QueryIterator
717718
*/
718719
public QueryIterator queryIterator(QueryIteratorReq request) {
719-
return rpcUtils.retry(() -> vectorService.queryIterator(this.getRpcStub(), request));
720+
return rpcUtils.retry(() -> vectorService.queryIterator(new RpcStubWrapper(this.getRpcStub(), connectConfig.getRpcDeadlineMs()), request));
720721
}
721722

722723
/**
@@ -726,7 +727,7 @@ public QueryIterator queryIterator(QueryIteratorReq request) {
726727
* @return SearchIterator
727728
*/
728729
public SearchIterator searchIterator(SearchIteratorReq request) {
729-
return rpcUtils.retry(() -> vectorService.searchIterator(this.getRpcStub(), request));
730+
return rpcUtils.retry(() -> vectorService.searchIterator(new RpcStubWrapper(this.getRpcStub(), connectConfig.getRpcDeadlineMs()), request));
730731
}
731732

732733
/**
@@ -736,7 +737,7 @@ public SearchIterator searchIterator(SearchIteratorReq request) {
736737
* @return SearchIteratorV2
737738
*/
738739
public SearchIteratorV2 searchIteratorV2(SearchIteratorReqV2 request) {
739-
return rpcUtils.retry(() -> vectorService.searchIteratorV2(this.getRpcStub(), request));
740+
return rpcUtils.retry(() -> vectorService.searchIteratorV2(new RpcStubWrapper(this.getRpcStub(), connectConfig.getRpcDeadlineMs()), request));
740741
}
741742

742743
/**

sdk-core/src/main/java/io/milvus/v2/service/vector/VectorService.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.milvus.common.utils.JsonUtils;
2525
import io.milvus.grpc.*;
2626
import io.milvus.orm.iterator.QueryIterator;
27+
import io.milvus.orm.iterator.RpcStubWrapper;
2728
import io.milvus.orm.iterator.SearchIterator;
2829
import io.milvus.orm.iterator.SearchIteratorV2;
2930
import io.milvus.v2.exception.ErrorCode;
@@ -291,25 +292,25 @@ public SearchResp hybridSearch(MilvusServiceGrpc.MilvusServiceBlockingStub block
291292
.build();
292293
}
293294

294-
public QueryIterator queryIterator(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
295+
public QueryIterator queryIterator(RpcStubWrapper blockingStub,
295296
QueryIteratorReq request) {
296-
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, request.getDatabaseName(),
297+
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub.get(), request.getDatabaseName(),
297298
request.getCollectionName(), false);
298299
DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
299300
CreateCollectionReq.FieldSchema pkField = respR.getCollectionSchema().getField(respR.getPrimaryFieldName());
300301
return new QueryIterator(request, blockingStub, pkField);
301302
}
302303

303-
public SearchIterator searchIterator(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
304+
public SearchIterator searchIterator(RpcStubWrapper blockingStub,
304305
SearchIteratorReq request) {
305-
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub, request.getDatabaseName(),
306+
DescribeCollectionResponse descResp = getCollectionInfo(blockingStub.get(), request.getDatabaseName(),
306307
request.getCollectionName(), false);
307308
DescribeCollectionResp respR = convertUtils.convertDescCollectionResp(descResp);
308309
CreateCollectionReq.FieldSchema pkField = respR.getCollectionSchema().getField(respR.getPrimaryFieldName());
309310
return new SearchIterator(request, blockingStub, pkField);
310311
}
311312

312-
public SearchIteratorV2 searchIteratorV2(MilvusServiceGrpc.MilvusServiceBlockingStub blockingStub,
313+
public SearchIteratorV2 searchIteratorV2(RpcStubWrapper blockingStub,
313314
SearchIteratorReqV2 request) {
314315
return new SearchIteratorV2(request, blockingStub);
315316
}

sdk-core/src/main/java/io/milvus/v2/service/vector/request/SearchReq.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ public static class SearchReqBuilder {
331331
private int topK = 0; // default value
332332
private String filter;
333333
private List<String> outputFields = new ArrayList<>(); // default value
334-
private List<BaseVector> data;
334+
private List<BaseVector> data = new ArrayList<>(); // default value
335335
private long offset;
336336
private long limit = 0L; // default value
337337
private int roundDecimal = -1; // default value

0 commit comments

Comments
 (0)