Skip to content

Commit 4e3836e

Browse files
committed
Optimize client pool
Signed-off-by: yhmo <[email protected]>
1 parent eb10051 commit 4e3836e

File tree

8 files changed

+848
-68
lines changed

8 files changed

+848
-68
lines changed
Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,273 @@
1+
package io.milvus.v2;
2+
3+
import com.google.gson.Gson;
4+
import com.google.gson.JsonObject;
5+
import io.milvus.pool.MilvusClientV2Pool;
6+
import io.milvus.pool.PoolConfig;
7+
import io.milvus.v1.CommonUtils;
8+
import io.milvus.v2.client.ConnectConfig;
9+
import io.milvus.v2.client.MilvusClientV2;
10+
import io.milvus.v2.common.ConsistencyLevel;
11+
import io.milvus.v2.common.DataType;
12+
import io.milvus.v2.common.IndexParam;
13+
import io.milvus.v2.service.collection.request.AddFieldReq;
14+
import io.milvus.v2.service.collection.request.CreateCollectionReq;
15+
import io.milvus.v2.service.collection.request.DropCollectionReq;
16+
import io.milvus.v2.service.collection.request.HasCollectionReq;
17+
import io.milvus.v2.service.vector.request.InsertReq;
18+
import io.milvus.v2.service.vector.request.QueryReq;
19+
import io.milvus.v2.service.vector.request.SearchReq;
20+
import io.milvus.v2.service.vector.request.data.FloatVec;
21+
import io.milvus.v2.service.vector.response.InsertResp;
22+
import io.milvus.v2.service.vector.response.QueryResp;
23+
import io.milvus.v2.service.vector.response.SearchResp;
24+
25+
import java.time.Duration;
26+
import java.util.ArrayList;
27+
import java.util.Collections;
28+
import java.util.List;
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicLong;
33+
34+
public class ClientPoolDemo {
35+
private static final String ServerUri = "http://localhost:19530";
36+
private static final String CollectionName = "java_sdk_example_pool_demo";
37+
private static final String IDFieldName = "id";
38+
private static final String VectorFieldName = "vector";
39+
private static final String TextFieldName = "text";
40+
private static final int DIM = 256;
41+
private static final String DemoKey = "for_demo";
42+
43+
private static final MilvusClientV2Pool pool;
44+
45+
static {
46+
ConnectConfig defaultConnectConfig = ConnectConfig.builder()
47+
.uri(ServerUri)
48+
.build();
49+
// read this issue for more details about the pool configurations:
50+
// https://github.com/milvus-io/milvus-sdk-java/issues/1577
51+
PoolConfig poolConfig = PoolConfig.builder()
52+
.minIdlePerKey(1)
53+
.maxIdlePerKey(2)
54+
.maxTotalPerKey(5)
55+
.maxBlockWaitDuration(Duration.ofSeconds(5L)) // getClient() will wait 5 seconds if no idle client available
56+
.build();
57+
try {
58+
pool = new MilvusClientV2Pool(poolConfig, defaultConnectConfig);
59+
System.out.printf("Pool is created with config:%n%s%n", poolConfig);
60+
61+
// prepare the pool to pre-create some clients according to the minIdlePerKey.
62+
// it is like a warmup to reduce the first time cost to call the getClient()
63+
pool.preparePool(DemoKey);
64+
} catch (ClassNotFoundException | NoSuchMethodException e) {
65+
throw new RuntimeException(e);
66+
}
67+
}
68+
69+
private static void createCollection(boolean recreate, long rowCount) {
70+
System.out.println("========== createCollection() ==========");
71+
MilvusClientV2 client = null;
72+
try {
73+
client = pool.getClient(DemoKey);
74+
if (client == null) {
75+
System.out.println("Cannot not get client from key:" + DemoKey);
76+
return;
77+
}
78+
79+
if (recreate) {
80+
client.dropCollection(DropCollectionReq.builder()
81+
.collectionName(CollectionName)
82+
.build());
83+
} else if (client.hasCollection(HasCollectionReq.builder()
84+
.collectionName(CollectionName)
85+
.build())) {
86+
return;
87+
}
88+
89+
CreateCollectionReq.CollectionSchema collectionSchema = CreateCollectionReq.CollectionSchema.builder()
90+
.build();
91+
collectionSchema.addField(AddFieldReq.builder()
92+
.fieldName("id")
93+
.dataType(DataType.Int64)
94+
.isPrimaryKey(true)
95+
.autoID(true)
96+
.build());
97+
collectionSchema.addField(AddFieldReq.builder()
98+
.fieldName(VectorFieldName)
99+
.dataType(DataType.FloatVector)
100+
.dimension(DIM)
101+
.build());
102+
collectionSchema.addField(AddFieldReq.builder()
103+
.fieldName(TextFieldName)
104+
.dataType(DataType.VarChar)
105+
.maxLength(1024)
106+
.build());
107+
108+
List<IndexParam> indexes = new ArrayList<>();
109+
indexes.add(IndexParam.builder()
110+
.fieldName(VectorFieldName)
111+
.indexType(IndexParam.IndexType.FLAT)
112+
.metricType(IndexParam.MetricType.COSINE)
113+
.build());
114+
115+
CreateCollectionReq requestCreate = CreateCollectionReq.builder()
116+
.collectionName(CollectionName)
117+
.collectionSchema(collectionSchema)
118+
.indexParams(indexes)
119+
.consistencyLevel(ConsistencyLevel.BOUNDED)
120+
.build();
121+
client.createCollection(requestCreate);
122+
123+
insertData(rowCount);
124+
} finally {
125+
pool.returnClient(DemoKey, client);
126+
}
127+
}
128+
129+
private static void insertData(long rowCount) {
130+
System.out.println("========== insertData() ==========");
131+
MilvusClientV2 client = null;
132+
try {
133+
client = pool.getClient(DemoKey);
134+
if (client == null) {
135+
System.out.println("Cannot not get client from key:" + DemoKey);
136+
return;
137+
}
138+
139+
Gson gson = new Gson();
140+
long inserted = 0L;
141+
while (inserted < rowCount) {
142+
long batch = 1000L;
143+
if (rowCount - inserted < batch) {
144+
batch = rowCount - inserted;
145+
}
146+
List<JsonObject> rows = new ArrayList<>();
147+
for (long i = 0; i < batch; i++) {
148+
JsonObject row = new JsonObject();
149+
row.add(VectorFieldName, gson.toJsonTree(CommonUtils.generateFloatVector(DIM)));
150+
row.addProperty(TextFieldName, "text_" + i);
151+
rows.add(row);
152+
}
153+
InsertResp resp = client.insert(InsertReq.builder()
154+
.collectionName(CollectionName)
155+
.data(rows)
156+
.build());
157+
inserted += resp.getInsertCnt();
158+
System.out.println("Inserted count:" + resp.getInsertCnt());
159+
}
160+
161+
QueryResp countR = client.query(QueryReq.builder()
162+
.collectionName(CollectionName)
163+
.outputFields(Collections.singletonList("count(*)"))
164+
.consistencyLevel(ConsistencyLevel.STRONG)
165+
.build());
166+
System.out.printf("%d rows persisted%n", (long) countR.getQueryResults().get(0).getEntity().get("count(*)"));
167+
} finally {
168+
pool.returnClient(DemoKey, client);
169+
}
170+
}
171+
172+
private static void search() {
173+
MilvusClientV2 client = null;
174+
try {
175+
client = pool.getClient(DemoKey);
176+
while (client == null) {
177+
try {
178+
// getClient() might return null if it exceeds the borrowMaxWaitMillis when the pool is full.
179+
// retry to call until it return a client.
180+
client = pool.getClient(DemoKey);
181+
} catch (Exception e) {
182+
System.out.printf("Failed to get client, will retry, error: %s%n", e.getMessage());
183+
}
184+
}
185+
186+
// long start = System.currentTimeMillis();
187+
FloatVec vector = new FloatVec(CommonUtils.generateFloatVector(DIM));
188+
SearchResp resp = client.search(SearchReq.builder()
189+
.collectionName(CollectionName)
190+
.limit(10)
191+
.data(Collections.singletonList(vector))
192+
.annsField(VectorFieldName)
193+
.outputFields(Collections.singletonList(TextFieldName))
194+
.build());
195+
// System.out.printf("search time cost: %dms%n", System.currentTimeMillis() - start);
196+
} finally {
197+
pool.returnClient(DemoKey, client);
198+
}
199+
}
200+
201+
private static void printPoolState() {
202+
System.out.println("========== printPoolState() ==========");
203+
System.out.printf("%d idle clients and %d active clients%n",
204+
pool.getIdleClientNumber(DemoKey), pool.getActiveClientNumber(DemoKey));
205+
System.out.printf("%.2f clients fetched per second%n", pool.fetchClientPerSecond(DemoKey));
206+
}
207+
208+
private static void concurrentSearch(int threadCount, int requestCount) {
209+
System.out.println("\n======================================================================");
210+
System.out.println("======================= ConcurrentSearch =============================");
211+
System.out.println("======================================================================");
212+
213+
AtomicLong totalTimeCostMs = new AtomicLong(0L);
214+
class Worker implements Runnable {
215+
@Override
216+
public void run() {
217+
long start = System.currentTimeMillis();
218+
search();
219+
long end = System.currentTimeMillis();
220+
totalTimeCostMs.addAndGet(end - start);
221+
}
222+
}
223+
224+
try {
225+
long start = System.currentTimeMillis();
226+
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
227+
for (int i = 0; i < requestCount; i++) {
228+
Runnable worker = new Worker();
229+
executor.execute(worker);
230+
}
231+
executor.shutdown();
232+
233+
// with requests start, more active clients will be created
234+
boolean done = false;
235+
while (!done) {
236+
printPoolState();
237+
done = executor.awaitTermination(1, TimeUnit.SECONDS);
238+
}
239+
240+
long timeGapMs = System.currentTimeMillis() - start;
241+
float avgQPS = (float) requestCount * 1000 / timeGapMs;
242+
long avgLatency = totalTimeCostMs.get() / requestCount;
243+
System.out.printf("%n%d requests done in %.1f seconds, average QPS: %.1f, average latency: %dms%n%n",
244+
requestCount, (float) timeGapMs / 1000, avgQPS, avgLatency);
245+
246+
// after all requests are done, the active clients will be retired and eventually only one idle client left.
247+
// just demo the pool can automatically destroy idle clients, you can directly close the pool without waiting
248+
// it in practice.
249+
while (pool.getActiveClientNumber(DemoKey) > 1) {
250+
TimeUnit.SECONDS.sleep(1);
251+
printPoolState();
252+
}
253+
} catch (Exception e) {
254+
System.err.println("Failed to create executor: " + e);
255+
}
256+
}
257+
258+
public static void main(String[] args) throws InterruptedException {
259+
long rowCount = 10000;
260+
createCollection(true, rowCount);
261+
262+
int threadCount = 50;
263+
int requestCount = 10000;
264+
concurrentSearch(threadCount, requestCount);
265+
266+
// do again
267+
threadCount = 100;
268+
requestCount = 20000;
269+
concurrentSearch(threadCount, requestCount);
270+
271+
pool.close();
272+
}
273+
}

examples/src/main/java/io/milvus/v2/ClientPoolExample.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import java.util.List;
4848

4949
public class ClientPoolExample {
50-
public static String serverUri = "http://localhost:19530";
50+
public static String ServerUri = "http://localhost:19530";
5151
public static String CollectionName = "java_sdk_example_pool_v2";
5252
public static String VectorFieldName = "vector";
5353
public static int DIM = 128;
@@ -95,7 +95,7 @@ public static void createDatabases(MilvusClientV2Pool pool) {
9595
// the ClientPool will use different config to create client to connect to specific database
9696
for (String dbName : dbNames) {
9797
ConnectConfig config = ConnectConfig.builder()
98-
.uri(serverUri)
98+
.uri(ServerUri)
9999
.dbName(dbName)
100100
.build();
101101
pool.configForKey(dbName, config);
@@ -288,13 +288,13 @@ public static void dropDatabases(MilvusClientV2Pool pool) {
288288

289289
public static void main(String[] args) throws InterruptedException {
290290
ConnectConfig defaultConfig = ConnectConfig.builder()
291-
.uri(serverUri)
291+
.uri(ServerUri)
292292
.build();
293293
// read this issue for more details about the pool configurations:
294294
// https://github.com/milvus-io/milvus-sdk-java/issues/1577
295295
PoolConfig poolConfig = PoolConfig.builder()
296-
.maxIdlePerKey(10) // max idle clients per key
297-
.maxTotalPerKey(50) // max total(idle + active) clients per key
296+
.maxIdlePerKey(1) // max idle clients per key
297+
.maxTotalPerKey(5) // max total(idle + active) clients per key
298298
.maxTotal(1000) // max total clients for all keys
299299
.maxBlockWaitDuration(Duration.ofSeconds(5L)) // getClient() will wait 5 seconds if no idle client available
300300
.minEvictableIdleDuration(Duration.ofSeconds(10L)) // if number of idle clients is larger than maxIdlePerKey, redundant idle clients will be evicted after 10 seconds
@@ -340,7 +340,7 @@ public static void main(String[] args) throws InterruptedException {
340340

341341
long end = System.currentTimeMillis();
342342
System.out.printf("%d insert requests and %d search requests finished in %.3f seconds%n",
343-
threadCount * repeatRequests * 3, threadCount * repeatRequests * 3, (end - start) * 0.001);
343+
threadCount * repeatRequests * dbNames.size(), threadCount * repeatRequests * dbNames.size(), (end - start) * 0.001);
344344

345345
printClientNumber(pool);
346346
pool.clear(); // clear idle clients

0 commit comments

Comments
 (0)