Skip to content

Commit b424265

Browse files
authored
Merge pull request #32 from zilliztech/dev
dev
2 parents 6c3b88f + 6de5774 commit b424265

22 files changed

+588
-427
lines changed

pom.xml

Lines changed: 27 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -23,79 +23,50 @@
2323
</repositories>
2424

2525
<dependencies>
26-
<dependency>
27-
<groupId>org.apache.kafka</groupId>
28-
<artifactId>connect-api</artifactId>
29-
<version>3.6.0</version>
30-
</dependency>
31-
<dependency>
32-
<groupId>org.slf4j</groupId>
33-
<artifactId>slf4j-api</artifactId>
34-
<version>2.0.7</version>
35-
</dependency>
36-
<dependency>
37-
<groupId>ch.qos.logback</groupId>
38-
<artifactId>logback-classic</artifactId>
39-
<version>1.4.14</version> <!-- Check for the latest version -->
40-
</dependency>
41-
<dependency>
42-
<groupId>io.milvus</groupId>
43-
<artifactId>milvus-sdk-java</artifactId>
44-
<version>2.5.1</version>
45-
</dependency>
46-
<dependency>
47-
<groupId>junit</groupId>
48-
<artifactId>junit</artifactId>
49-
<version>4.13.2</version>
50-
<scope>test</scope>
51-
</dependency>
52-
<!-- Update dependencies to resolve CVE issues -->
5326
<dependency>
5427
<groupId>org.apache.kafka</groupId>
55-
<artifactId>connect-json</artifactId>
56-
<version>3.7.0</version>
28+
<artifactId>connect-api</artifactId>
29+
<version>3.9.0</version>
5730
</dependency>
5831
<dependency>
59-
<groupId>org.apache.commons</groupId>
60-
<artifactId>commons-text</artifactId>
61-
<version>1.10.0</version>
32+
<groupId>com.konghq</groupId>
33+
<artifactId>unirest-java</artifactId>
34+
<version>3.14.5</version>
6235
</dependency>
36+
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
6337
<dependency>
64-
<groupId>com.squareup.okio</groupId>
65-
<artifactId>okio</artifactId>
66-
<version>3.6.0</version>
67-
</dependency>
68-
<dependency>
69-
<groupId>io.netty</groupId>
70-
<artifactId>netty-common</artifactId>
71-
<version>4.1.115.Final</version>
72-
</dependency>
73-
<dependency>
74-
<groupId>io.netty</groupId>
75-
<artifactId>netty-codec-http2</artifactId>
76-
<version>4.1.108.Final</version>
38+
<groupId>org.projectlombok</groupId>
39+
<artifactId>lombok</artifactId>
40+
<version>1.18.36</version>
7741
</dependency>
42+
7843
<dependency>
79-
<groupId>org.codehaus.plexus</groupId>
80-
<artifactId>plexus-utils</artifactId>
81-
<version>4.0.0</version>
44+
<groupId>com.google.code.gson</groupId>
45+
<artifactId>gson</artifactId>
46+
<version>2.11.0</version>
8247
</dependency>
8348
<dependency>
84-
<groupId>com.fasterxml.jackson.core</groupId>
85-
<artifactId>jackson-databind</artifactId>
86-
<version>2.15.3</version>
49+
<groupId>org.slf4j</groupId>
50+
<artifactId>slf4j-api</artifactId>
51+
<version>2.0.7</version>
8752
</dependency>
8853
<dependency>
89-
<groupId>com.google.guava</groupId>
90-
<artifactId>guava</artifactId>
91-
<version>32.1.3-jre</version>
54+
<groupId>ch.qos.logback</groupId>
55+
<artifactId>logback-classic</artifactId>
56+
<version>1.4.14</version> <!-- Check for the latest version -->
9257
</dependency>
9358
<dependency>
9459
<groupId>org.mockito</groupId>
9560
<artifactId>mockito-core</artifactId>
9661
<version>5.6.0</version>
9762
<scope>test</scope>
9863
</dependency>
64+
<dependency>
65+
<groupId>org.junit.jupiter</groupId>
66+
<artifactId>junit-jupiter</artifactId>
67+
<version>5.8.1</version>
68+
<scope>test</scope>
69+
</dependency>
9970
</dependencies>
10071

10172
<build>
@@ -170,7 +141,7 @@
170141
<plugin>
171142
<groupId>org.owasp</groupId>
172143
<artifactId>dependency-check-maven</artifactId>
173-
<version>8.4.2</version>
144+
<version>11.1.0</version>
174145
<executions>
175146
<execution>
176147
<goals>
@@ -180,6 +151,7 @@
180151
</executions>
181152
<configuration>
182153
<assemblyAnalyzerEnabled>false</assemblyAnalyzerEnabled>
154+
<!-- The NVD API key is a credential and must not be committed. It is read from the
     NVD_API_KEY environment variable; any key previously checked in should be revoked. -->
<nvdApiKey>${env.NVD_API_KEY}</nvdApiKey>
183155
</configuration>
184156
</plugin>
185157
</plugins>

src/main/java/com/milvus/io/kafka/MilvusSinkConnectorConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
public class MilvusSinkConnectorConfig extends AbstractConfig {
1010
protected static final String URL = "public.endpoint";
1111
protected static final String TOKEN = "token";
12+
protected static final String DATABASE_NAME = "database.name";
1213
protected static final String COLLECTION_NAME = "collection.name";
1314

1415
public MilvusSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
@@ -23,6 +24,7 @@ public static ConfigDef conf() {
2324
return new ConfigDef()
2425
.define(URL, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, "Public Endpoint")
2526
.define(TOKEN, ConfigDef.Type.PASSWORD, "db_admin:****", ConfigDef.Importance.HIGH, "Token to connect milvus")
27+
.define(DATABASE_NAME, ConfigDef.Type.STRING, "default", ConfigDef.Importance.MEDIUM, "Database name to save the topic messages")
2628
.define(COLLECTION_NAME, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, "Collection name to save the topic messages");
2729
}
2830

@@ -34,6 +36,10 @@ public Password getToken() {
3436
return getPassword(TOKEN);
3537
}
3638

39+
public String getDatabaseName() {
40+
return getString(DATABASE_NAME);
41+
}
42+
3743
public String getCollectionName() {
3844
return getString(COLLECTION_NAME);
3945
}

src/main/java/com/milvus/io/kafka/MilvusSinkTask.java

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,14 @@
22

33
import com.google.gson.JsonObject;
44
import static com.milvus.io.kafka.MilvusSinkConnectorConfig.TOKEN;
5+
import com.milvus.io.kafka.client.MilvusRestClient;
6+
import com.milvus.io.kafka.client.request.UpsertReq;
7+
import com.milvus.io.kafka.client.response.DescribeCollectionResp;
8+
import com.milvus.io.kafka.client.response.GetLoadStateResp;
59
import com.milvus.io.kafka.helper.MilvusClientHelper;
610
import com.milvus.io.kafka.utils.DataConverter;
711
import com.milvus.io.kafka.utils.Utils;
812
import com.milvus.io.kafka.utils.VersionUtil;
9-
import io.milvus.v2.client.MilvusClientV2;
10-
import io.milvus.v2.service.collection.request.DescribeCollectionReq;
11-
import io.milvus.v2.service.collection.request.GetLoadStateReq;
12-
import io.milvus.v2.service.collection.request.HasCollectionReq;
13-
import io.milvus.v2.service.collection.response.DescribeCollectionResp;
14-
import io.milvus.v2.service.vector.request.InsertReq;
15-
import io.milvus.v2.service.vector.request.UpsertReq;
1613
import org.apache.kafka.connect.sink.SinkRecord;
1714
import org.apache.kafka.connect.sink.SinkTask;
1815
import org.slf4j.Logger;
@@ -22,12 +19,13 @@
2219
import java.util.Collection;
2320
import java.util.List;
2421
import java.util.Map;
22+
import java.util.Objects;
2523

2624
public class MilvusSinkTask extends SinkTask {
2725

2826
private static final Logger log = LoggerFactory.getLogger(MilvusSinkTask.class);
2927
private MilvusSinkConnectorConfig config;
30-
private MilvusClientV2 myMilvusClient;
28+
private MilvusRestClient myMilvusClient;
3129
private DataConverter converter;
3230
private DescribeCollectionResp response;
3331

@@ -42,7 +40,7 @@ public void start(Map<String, String> props) {
4240
}
4341

4442
// make visible for test
45-
protected void start(Map<String, String> props, MilvusClientV2 milvusClient) {
43+
protected void start(Map<String, String> props, MilvusRestClient milvusClient) {
4644
log.info("Starting MilvusSinkTask.");
4745
props.put(TOKEN, Utils.encryptToken(props.get(TOKEN)));
4846
this.config = new MilvusSinkConnectorConfig(props);
@@ -54,16 +52,17 @@ protected void start(Map<String, String> props, MilvusClientV2 milvusClient) {
5452

5553
private void preValidate() {
5654
// check if the collection exists
57-
if (!myMilvusClient.hasCollection(HasCollectionReq.builder().collectionName(config.getCollectionName()).build())) {
55+
if (!myMilvusClient.hasCollection(config.getCollectionName())) {
5856
log.error("Collection not exist");
5957
throw new RuntimeException("Collection not exist" + config.getCollectionName());
6058
}
6159
// check if the collection is loaded
62-
if (!myMilvusClient.getLoadState(GetLoadStateReq.builder().collectionName(config.getCollectionName()).build())) {
60+
GetLoadStateResp getLoadStateResp = myMilvusClient.getLoadState(config.getCollectionName());
61+
if (!Objects.equals(getLoadStateResp.getLoadState(), "LoadStateLoaded")) {
6362
log.error("Collection not loaded");
6463
throw new RuntimeException("Collection not loaded" + config.getCollectionName());
6564
}
66-
this.response = myMilvusClient.describeCollection(DescribeCollectionReq.builder().collectionName(config.getCollectionName()).build());
65+
this.response = myMilvusClient.describeCollection(config.getCollectionName());
6766
}
6867

6968
@Override
@@ -83,7 +82,7 @@ public void put(Collection<SinkRecord> records) {
8382
continue;
8483
}
8584
try {
86-
JsonObject data = converter.convertRecord(record, response.getCollectionSchema());
85+
JsonObject data = converter.convertRecord(record, response);
8786
datas.add(data);
8887
} catch (Exception e) {
8988
log.error("Failed to convert record to JSONObject, skip it", e);
@@ -98,24 +97,12 @@ public void put(Collection<SinkRecord> records) {
9897
.build();
9998
log.info("Upserting data to collection: {} with datas: {}", config.getCollectionName(), datas);
10099
myMilvusClient.upsert(upsertReq);
101-
} else {
102-
InsertReq insertReq = InsertReq.builder()
103-
.collectionName(config.getCollectionName())
104-
.data(datas)
105-
.build();
106-
log.info("Inserting data to collection: {} with fields: {}", config.getCollectionName(), datas.get(0).keySet());
107-
myMilvusClient.insert(insertReq);
108100
}
109101

110102
}
111103

112104
@Override
113105
public void stop() {
114106
log.info("Stopping Milvus client.");
115-
try {
116-
myMilvusClient.close(3);
117-
} catch (InterruptedException e) {
118-
throw new RuntimeException(e);
119-
}
120107
}
121108
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package com.milvus.io.kafka.client;
2+
3+
import com.google.gson.reflect.TypeToken;
4+
import com.milvus.io.kafka.client.common.JsonUtils;
5+
import com.milvus.io.kafka.client.request.UpsertReq;
6+
import com.milvus.io.kafka.client.response.DescribeCollectionResp;
7+
import com.milvus.io.kafka.client.response.GetLoadStateResp;
8+
import com.milvus.io.kafka.client.response.HasCollectionResp;
9+
import com.milvus.io.kafka.client.response.RestfulResponse;
10+
import kong.unirest.HttpResponse;
11+
import kong.unirest.Unirest;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
18+
public class MilvusRestClient {
19+
private static final Logger log = LoggerFactory.getLogger(MilvusRestClient.class);
20+
21+
private final String url;
22+
private final String token;
23+
private final String database;
24+
25+
public MilvusRestClient(String url, String token, String database) {
26+
this.url = url;
27+
this.token = token;
28+
this.database = database;
29+
}
30+
31+
public Boolean hasCollection(String collectionName) {
32+
String endpoint = url + "/v2/vectordb/collections/has";
33+
Map<String, Object> params = createBaseParams(collectionName);
34+
35+
String response = doPost(endpoint, params);
36+
HasCollectionResp restfulResponse = JsonUtils.fromJson(response, new TypeToken<HasCollectionResp>() {}.getType());
37+
return restfulResponse.getHas();
38+
}
39+
40+
public DescribeCollectionResp describeCollection(String collectionName) {
41+
String endpoint = url + "/v2/vectordb/collections/describe";
42+
Map<String, Object> params = createBaseParams(collectionName);
43+
44+
String response = doPost(endpoint, params);
45+
return JsonUtils.fromJson(response, new TypeToken<DescribeCollectionResp>() {}.getType());
46+
}
47+
48+
public GetLoadStateResp getLoadState(String collectionName) {
49+
String endpoint = url + "/v2/vectordb/collections/get_load_state";
50+
Map<String, Object> params = createBaseParams(collectionName);
51+
52+
String response = doPost(endpoint, params);
53+
return JsonUtils.fromJson(response, new TypeToken<GetLoadStateResp>() {}.getType());
54+
}
55+
56+
public void upsert(UpsertReq upsertReq) {
57+
upsertReq.setDbName(database);
58+
String endpoint = url + "/v2/vectordb/entities/upsert";
59+
doPost(endpoint, upsertReq);
60+
}
61+
62+
private String doPost(String endpoint, Object params) {
63+
try {
64+
HttpResponse<String> response = Unirest.post(endpoint)
65+
.header("Authorization", "Bearer " + token)
66+
.header("Content-Type", "application/json")
67+
.body(JsonUtils.toJson(params))
68+
.asString();
69+
70+
if (response.getStatus() != 200) {
71+
log.error("HTTP Error {}: {}", response.getStatus(), response.getStatusText());
72+
throw new RuntimeException("Failed to call Milvus server");
73+
}
74+
75+
RestfulResponse<Object> restfulResponse = JsonUtils.fromJson(response.getBody(), new TypeToken<RestfulResponse<Object>>() {}.getType());
76+
77+
if (restfulResponse.getCode() != 0) {
78+
log.error("Milvus API Error: {}", restfulResponse.getMessage());
79+
throw new RuntimeException("Milvus server returned an error: " + restfulResponse.getMessage());
80+
}
81+
82+
return JsonUtils.toJson(restfulResponse.getData());
83+
} catch (Exception e) {
84+
log.error("Error calling Milvus server at {}: {}", endpoint, e.getMessage());
85+
throw new RuntimeException("Failed to call Milvus server", e);
86+
}
87+
}
88+
89+
private Map<String, Object> createBaseParams(String collectionName) {
90+
Map<String, Object> params = new HashMap<>();
91+
params.put("dbName", database);
92+
params.put("collectionName", collectionName);
93+
return params;
94+
}
95+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package com.milvus.io.kafka.client.common;
21+
22+
import lombok.Getter;
23+
24+
@Getter
25+
public enum ConsistencyLevel {
26+
STRONG("Strong", 0),
27+
SESSION("Session", 1),
28+
BOUNDED("Bounded", 2),
29+
EVENTUALLY("Eventually", 3),
30+
;
31+
private final String name;
32+
private final int code;
33+
34+
ConsistencyLevel(String name, int code) {
35+
this.name = name;
36+
this.code = code;
37+
}
38+
}

0 commit comments

Comments
 (0)