
Commit ed68979

update code to use MilvusClientV2 API

1 parent 258358b

File tree

8 files changed (+155 -171 lines)


README_OSS.md

Lines changed: 1 addition & 1 deletion

@@ -52,7 +52,7 @@ Note: Make sure the schema on both sides match each other. In the schema, there
 ```properties
 key.converter.schemas.enable=false
 value.converter.schemas.enable=false
-plugin.path=libs/zilliz-kafka-connect-milvus-0.1.0
+plugin.path=libs/zilliz-kafka-connect-milvus-xxx
 ```
 4. create and configure a `milvus-sink-connector.properties` file in the `config` directory of your Kafka installation.
 ```properties

pom.xml

Lines changed: 2 additions & 18 deletions

@@ -10,7 +10,7 @@
     <url>https://github.com/zilliztech/kafka-connect-milvus</url>

     <properties>
-        <project-version>0.1.2</project-version>
+        <project-version>0.1.3</project-version>
         <confluent.maven.repo>https://packages.confluent.io/maven/</confluent.maven.repo>
     </properties>

@@ -49,27 +49,11 @@
             <version>4.13.2</version>
             <scope>test</scope>
         </dependency>
-        <!-- grpc dependence-->
-        <dependency>
-            <groupId>io.grpc</groupId>
-            <artifactId>grpc-netty-shaded</artifactId>
-            <version>1.46.0</version>
-        </dependency>
-        <dependency>
-            <groupId>io.grpc</groupId>
-            <artifactId>grpc-grpclb</artifactId>
-            <version>1.59.0</version>
-        </dependency>
-        <dependency>
-            <groupId>io.grpc</groupId>
-            <artifactId>grpc-protobuf</artifactId>
-            <version>1.59.0</version>
-        </dependency>
         <!-- update dependence to resolve CVE issues -->
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>connect-json</artifactId>
-            <version>3.6.0</version>
+            <version>3.7.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
src/main/java/com/milvus/io/kafka/MilvusSinkTask.java

Lines changed: 61 additions & 47 deletions

@@ -1,36 +1,34 @@
 package com.milvus.io.kafka;

+import com.alibaba.fastjson.JSONObject;
 import com.milvus.io.kafka.helper.MilvusClientHelper;
 import com.milvus.io.kafka.utils.DataConverter;
 import com.milvus.io.kafka.utils.Utils;
 import com.milvus.io.kafka.utils.VersionUtil;
-import io.milvus.client.MilvusServiceClient;
-import io.milvus.grpc.CollectionSchema;
-import io.milvus.grpc.DescribeCollectionResponse;
-import io.milvus.grpc.GetLoadStateResponse;
-import io.milvus.grpc.LoadState;
-import io.milvus.param.R;
-import io.milvus.param.collection.DescribeCollectionParam;
-import io.milvus.param.collection.GetLoadStateParam;
-import io.milvus.param.dml.InsertParam;
+import io.milvus.v2.client.MilvusClientV2;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
+import io.milvus.v2.service.collection.request.DescribeCollectionReq;
+import io.milvus.v2.service.collection.request.GetLoadStateReq;
+import io.milvus.v2.service.collection.request.HasCollectionReq;
+import io.milvus.v2.service.collection.response.DescribeCollectionResp;
+import io.milvus.v2.service.vector.request.InsertReq;
+import io.milvus.v2.service.vector.request.UpsertReq;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
+import java.util.*;

 import static com.milvus.io.kafka.MilvusSinkConnectorConfig.TOKEN;

 public class MilvusSinkTask extends SinkTask {

     private static final Logger log = LoggerFactory.getLogger(MilvusSinkTask.class);
     private MilvusSinkConnectorConfig config;
-    private MilvusServiceClient myMilvusClient;
+    private MilvusClientV2 myMilvusClient;
     private DataConverter converter;
-    private CollectionSchema collectionSchema;
+    private DescribeCollectionResp response;

     @Override
     public String version() {
@@ -43,64 +41,80 @@ public void start(Map<String, String> props) {
     }

     // make visible for test
-    protected void start(Map<String, String> props, MilvusServiceClient milvusClient) {
+    protected void start(Map<String, String> props, MilvusClientV2 milvusClient) {
         log.info("Starting MilvusSinkTask.");
         props.put(TOKEN, Utils.encryptToken(props.get(TOKEN)));
         this.config = new MilvusSinkConnectorConfig(props);
         this.converter = new DataConverter(config);
         this.myMilvusClient = milvusClient == null ? new MilvusClientHelper().createMilvusClient(config) : milvusClient;
-        this.collectionSchema = GetCollectionInfo(config.getCollectionName());
-
         log.info("Started MilvusSinkTask, Connecting to Zilliz Cluster:" + config.getUrl());
+        preValidate();
+    }

+    private void preValidate() {
+        // check if the collection exists
+        if (!myMilvusClient.hasCollection(HasCollectionReq.builder().collectionName(config.getCollectionName()).build())) {
+            log.error("Collection not exist");
+            throw new RuntimeException("Collection not exist" + config.getCollectionName());
+        }
+        // check if the collection is loaded
+        if (!myMilvusClient.getLoadState(GetLoadStateReq.builder().collectionName(config.getCollectionName()).build())){
+            log.error("Collection not loaded");
+            throw new RuntimeException("Collection not loaded" + config.getCollectionName());
+        }
+        this.response = myMilvusClient.describeCollection(DescribeCollectionReq.builder().collectionName(config.getCollectionName()).build());
     }

     @Override
     public void put(Collection<SinkRecord> records) {
         log.info("Putting {} records to Milvus.", records.size());
+        if(records.isEmpty()) {
+            log.info("No records to put.");
+            return;
+        }

+        // not support dynamic schema for now, for dynamic schema, we need to put the data into a JSONObject
+        List<JSONObject> datas = new ArrayList<>();
         for (SinkRecord record : records) {
             log.debug("Writing {} to Milvus.", record);
-            WriteRecord(record, collectionSchema);
+            if(record.value() == null) {
+                log.warn("Skipping record with null value.");
+                continue;
+            }
+            try {
+                JSONObject data = converter.convertRecord(record, response.getCollectionSchema());
+                datas.add(data);
+            }catch (Exception e){
+                log.error("Failed to convert record to JSONObject, skip it", e);
+            }
         }
-    }

-    protected CollectionSchema GetCollectionInfo(String collectionName) {
-        // check if the collection exists
-        R<DescribeCollectionResponse> response = myMilvusClient.describeCollection(DescribeCollectionParam.newBuilder()
-                .withCollectionName(collectionName).build());
-        if (response.getData() == null) {
-            log.error("Collection not exist");
-            throw new RuntimeException("Collection not exist" + collectionName);
+        if(!response.getAutoID()){
+            // default to use upsert
+            UpsertReq upsertReq = UpsertReq.builder()
+                    .collectionName(config.getCollectionName())
+                    .data(datas)
+                    .build();
+            log.info("Upserting data to collection: {} with datas: {}", config.getCollectionName(), datas);
+            myMilvusClient.upsert(upsertReq);
         }else {
-            GetLoadStateParam getLoadStateParam = GetLoadStateParam.newBuilder()
-                    .withCollectionName(collectionName)
+            InsertReq insertReq = InsertReq.builder()
+                    .collectionName(config.getCollectionName())
+                    .data(datas)
                     .build();
-            R<GetLoadStateResponse> loadState = myMilvusClient.getLoadState(getLoadStateParam);
-            if (loadState.getData().getState() != LoadState.LoadStateLoaded){
-                log.error("Collection not loaded");
-                throw new RuntimeException("Collection not loaded" + collectionName);
-            }
+            log.info("Inserting data to collection: {} with fields: {}", config.getCollectionName(), datas.get(0).keySet());
+            myMilvusClient.insert(insertReq);
         }
-        return response.getData().getSchema();
-    }

-    protected void WriteRecord(SinkRecord record, CollectionSchema collectionSchema) {
-        // not support dynamic schema for now, for dynamic schema, we need to put the data into a JSONObject
-        List<InsertParam.Field> fields = converter.convertRecord(record, collectionSchema);
-        InsertParam insertParam = InsertParam.newBuilder()
-                .withCollectionName(config.getCollectionName())
-                .withFields(fields)
-                .build();
-
-        log.info("Inserting data to collection: " + config.getCollectionName() + " with fields: " +
-                insertParam.getFields());
-        myMilvusClient.insert(insertParam);
     }

     @Override
     public void stop() {
         log.info("Stopping Milvus client.");
-        myMilvusClient.close();
+        try {
+            myMilvusClient.close(3);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
     }
 }

src/main/java/com/milvus/io/kafka/helper/MilvusClientHelper.java

Lines changed: 7 additions & 5 deletions

@@ -4,13 +4,15 @@
 import com.milvus.io.kafka.utils.Utils;
 import io.milvus.client.MilvusServiceClient;
 import io.milvus.param.ConnectParam;
+import io.milvus.v2.client.ConnectConfig;
+import io.milvus.v2.client.MilvusClientV2;

 public class MilvusClientHelper {
-    public MilvusServiceClient createMilvusClient(MilvusSinkConnectorConfig config) {
-        ConnectParam connectParam = ConnectParam.newBuilder()
-                .withUri(config.getUrl())
-                .withToken(Utils.decryptToken(config.getToken().value()))
+    public MilvusClientV2 createMilvusClient(MilvusSinkConnectorConfig config) {
+        ConnectConfig connectConfig = ConnectConfig.builder()
+                .uri(config.getUrl())
+                .token(Utils.decryptToken(config.getToken().value()))
                 .build();
-        return new MilvusServiceClient(connectParam);
+        return new MilvusClientV2(connectConfig);
     }
 }
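
For reference, the helper's new construction path boils down to the two-step builder shown in this minimal sketch, which uses only calls visible in the diff; the endpoint and token are placeholders (the real helper reads both from `MilvusSinkConnectorConfig` and decrypts the stored token with `Utils.decryptToken` first).

```java
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;

public class ClientConstructionSketch {

    // v2-style construction, as MilvusClientHelper.createMilvusClient now does.
    public static MilvusClientV2 connect(String uri, String token) {
        ConnectConfig connectConfig = ConnectConfig.builder()
                .uri(uri)
                .token(token)
                .build();
        return new MilvusClientV2(connectConfig);
    }

    public static void main(String[] args) throws InterruptedException {
        MilvusClientV2 client = connect("https://your-milvus-endpoint", "your-token"); // placeholders
        // ... use the client ...
        client.close(3); // same shutdown path as MilvusSinkTask.stop()
    }
}
```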

src/main/java/com/milvus/io/kafka/utils/DataConverter.java

Lines changed: 19 additions & 40 deletions

@@ -4,10 +4,9 @@
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import com.milvus.io.kafka.MilvusSinkConnectorConfig;
-import io.milvus.grpc.CollectionSchema;
-import io.milvus.grpc.DataType;
-import io.milvus.grpc.FieldSchema;
 import io.milvus.param.dml.InsertParam;
+import io.milvus.v2.common.DataType;
+import io.milvus.v2.service.collection.request.CreateCollectionReq;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
@@ -27,9 +26,9 @@ public DataConverter(MilvusSinkConnectorConfig config) {
         this.config = config;
     }
     /*
-     * Convert SinkRecord to List<InsertParam.Field>
+     * Convert SinkRecord to JSONObject
     */
-    public List<InsertParam.Field> convertRecord(SinkRecord sr, CollectionSchema collectionSchema) {
+    public JSONObject convertRecord(SinkRecord sr, CreateCollectionReq.CollectionSchema collectionSchema) {
         // parse sinkRecord to get filed name and value
         if(sr.value() instanceof Struct) {
             return parseValue((Struct)sr.value(), collectionSchema);
@@ -40,35 +39,29 @@ public List<InsertParam.Field> convertRecord(SinkRecord sr, CollectionSchema col
         }
     }

-    private List<InsertParam.Field> parseValue(HashMap<?, ?> mapValue, CollectionSchema collectionSchema) {
-        List<InsertParam.Field> fields = new ArrayList<>();
-        // convert collectionSchema.getFieldsList: Filed's Name and DataType to a Map
-        Map<String, DataType> fieldType = collectionSchema.getFieldsList().stream().collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getDataType));
-        mapValue.forEach((key1, value) -> {
-            // for each field, create a InsertParam.Field
-            if(fieldType.containsKey(key1.toString())){
+    private JSONObject parseValue(HashMap<?, ?> mapValue, CreateCollectionReq.CollectionSchema collectionSchema) {
+        JSONObject fields = new JSONObject();
+        mapValue.forEach((field, value) -> {
+            if(collectionSchema.getField(field.toString())!=null){
                 // if the key exists in the collection, store the value by collectionSchema DataType
-                fields.add(new InsertParam.Field(key1.toString(), Collections.singletonList(castValueToType(value, fieldType.get(key1.toString())))));
-            }else if(collectionSchema.getEnableDynamicField()){
-                // if the key not exists in the collection and the collection is dynamic, store the value directly
-                fields.add(new InsertParam.Field(key1.toString(), Collections.singletonList(value)));
+                fields.put(field.toString(), castValueToType(value, collectionSchema.getField(field.toString()).getDataType()));
+            }else {
+                log.warn("Field {} not exists in collection", field);
             }
+
         });
         return fields;
     }

-    private List<InsertParam.Field> parseValue(Struct structValue, CollectionSchema collectionSchema) {
-        List<InsertParam.Field> fields = new ArrayList<>();
-        // convert collectionSchema.getFieldsList: Filed's Name and DataType to a Map
-        Map<String, DataType> fieldType = collectionSchema.getFieldsList().stream().collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getDataType));
+    private JSONObject parseValue(Struct structValue, CreateCollectionReq.CollectionSchema collectionSchema) {
+        JSONObject fields = new JSONObject();
+
         structValue.schema().fields().forEach(field -> {
-            // for each field, create a InsertParam.Field
-            if(fieldType.containsKey(field.name())){
+            if(collectionSchema.getField(field.name()) != null){
                 // if the key exists in the collection, store the value by collectionSchema DataType
-                fields.add(new InsertParam.Field(field.name(), Collections.singletonList(castValueToType(structValue.get(field.name()), fieldType.get(field.name())))));
-            }else if(collectionSchema.getEnableDynamicField()){
-                // if the key not exists in the collection and the collection is dynamic, store the value directly
-                fields.add(new InsertParam.Field(field.name(), Collections.singletonList(structValue.get(field.name()))));
+                fields.put(field.toString(), castValueToType(structValue.get(field.name()), collectionSchema.getField(field.name()).getDataType()));
+            }else {
+                log.warn("Field {} not exists in collection", field);
             }
         });

@@ -141,18 +134,4 @@ protected ByteBuffer parseBinaryVectorField(String vectors){
             throw new RuntimeException("parse binary vector field error: " + e.getMessage() + vectors);
         }
     }
-
-    public List<JSONObject> convertRecordWithDynamicSchema(SinkRecord sr, CollectionSchema collectionSchema) {
-        List<InsertParam.Field> fields = convertRecord(sr, collectionSchema);
-        List<JSONObject> jsonObjects = new ArrayList<>();
-        int rows = fields.get(0).getValues().size();
-        for (int i = 0; i < rows; i++) {
-            JSONObject jsonObject = new JSONObject();
-            for (InsertParam.Field field : fields) {
-                jsonObject.put(field.getName(), field.getValues().get(i));
-            }
-            jsonObjects.add(jsonObject);
-        }
-        return jsonObjects;
-    }
 }
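
After this change the converter emits one fastjson `JSONObject` per record, keyed by the collection's field names, and logs a warning for any field the collection does not define. The toy sketch below reproduces the `Struct` branch of that logic with a plain `Set` standing in for the `CollectionSchema` lookup; the field names and schema are made up for illustration, and the real converter additionally casts each value to the field's `DataType` via `castValueToType`.

```java
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ConverterSketch {
    public static void main(String[] args) {
        // A made-up Kafka Connect record schema and value.
        Schema schema = SchemaBuilder.struct()
                .field("id", Schema.INT64_SCHEMA)
                .field("title", Schema.STRING_SCHEMA)
                .field("extra", Schema.STRING_SCHEMA)   // not present in the collection
                .build();
        Struct value = new Struct(schema)
                .put("id", 42L)
                .put("title", "hello")
                .put("extra", "dropped");

        // Stand-in for collectionSchema.getField(name) != null.
        Set<String> collectionFields = new HashSet<>(Arrays.asList("id", "title"));

        JSONObject row = new JSONObject();
        value.schema().fields().forEach(field -> {
            if (collectionFields.contains(field.name())) {
                // The real converter also casts the value to the field's DataType here.
                row.put(field.name(), value.get(field.name()));
            } else {
                System.out.println("Field " + field.name() + " not in collection, skipping");
            }
        });
        System.out.println(row.toJSONString()); // contains only id and title
    }
}
```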
