Skip to content

Commit 125483b

Browse files
authored
feat: optimize insert in java sdk (#3525)
1 parent bb6bc09 commit 125483b

File tree

20 files changed

+749
-760
lines changed

20 files changed

+749
-760
lines changed

java/openmldb-common/src/main/java/com/_4paradigm/openmldb/common/codec/FlexibleRowBuilder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ public boolean setNULL(int idx) {
213213
}
214214
Type.DataType type = metaData.getSchema().get(idx).getDataType();
215215
if (type == Type.DataType.kVarchar || type == Type.DataType.kString) {
216+
if (settedValue.at(idx)) {
217+
return false;
218+
}
216219
if (idx != metaData.getStrIdxList().get(curStrIdx)) {
217220
if (stringValueCache == null) {
218221
stringValueCache = new TreeMap<>();

java/openmldb-jdbc/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@
6161
<artifactId>snappy-java</artifactId>
6262
<version>1.1.7.2</version>
6363
</dependency>
64+
<dependency>
65+
<groupId>com.github.ben-manes.caffeine</groupId>
66+
<artifactId>caffeine</artifactId>
67+
<version>2.9.3</version>
68+
</dependency>
6469

6570
<!-- https://mvnrepository.com/artifact/org.testng/testng -->
6671
<dependency>

java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/jdbc/SQLInsertMetaData.java

Lines changed: 19 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,53 +18,34 @@
1818

1919
import static com._4paradigm.openmldb.sdk.impl.Util.sqlTypeToString;
2020

21-
import com._4paradigm.openmldb.DataType;
22-
import com._4paradigm.openmldb.Schema;
23-
import com._4paradigm.openmldb.common.Pair;
24-
import com._4paradigm.openmldb.sdk.Common;
21+
import com._4paradigm.openmldb.sdk.Schema;
2522

2623
import java.sql.ResultSetMetaData;
2724
import java.sql.SQLException;
2825
import java.util.List;
2926

3027
public class SQLInsertMetaData implements ResultSetMetaData {
3128

32-
private final List<DataType> schema;
33-
private final Schema realSchema;
34-
private final List<Pair<Long, Integer>> idx;
29+
private final Schema schema;
30+
private final List<Integer> holeIdx;
3531

36-
public SQLInsertMetaData(List<DataType> schema,
37-
Schema realSchema,
38-
List<Pair<Long, Integer>> idx) {
32+
public SQLInsertMetaData(Schema schema, List<Integer> holeIdx) {
3933
this.schema = schema;
40-
this.realSchema = realSchema;
41-
this.idx = idx;
34+
this.holeIdx = holeIdx;
4235
}
4336

44-
private void checkSchemaNull() throws SQLException {
45-
if (schema == null) {
46-
throw new SQLException("schema is null");
47-
}
48-
}
49-
50-
private void checkIdx(int i) throws SQLException {
51-
if (i <= 0) {
37+
private void check(int i) throws SQLException {
38+
if (i < 0) {
5239
throw new SQLException("index underflow");
5340
}
54-
if (i > schema.size()) {
41+
if (i >= holeIdx.size()) {
5542
throw new SQLException("index overflow");
5643
}
5744
}
5845

59-
public void check(int i) throws SQLException {
60-
checkIdx(i);
61-
checkSchemaNull();
62-
}
63-
6446
@Override
6547
public int getColumnCount() throws SQLException {
66-
checkSchemaNull();
67-
return schema.size();
48+
return holeIdx.size();
6849
}
6950

7051
@Override
@@ -93,9 +74,10 @@ public boolean isCurrency(int i) throws SQLException {
9374

9475
@Override
9576
public int isNullable(int i) throws SQLException {
96-
check(i);
97-
Long index = idx.get(i - 1).getKey();
98-
if (realSchema.IsColumnNotNull(index)) {
77+
int realIdx = i - 1;
78+
check(realIdx);
79+
boolean nullable = schema.isNullable(holeIdx.get(realIdx));
80+
if (!nullable) {
9981
return columnNoNulls;
10082
} else {
10183
return columnNullable;
@@ -122,9 +104,9 @@ public String getColumnLabel(int i) throws SQLException {
122104

123105
@Override
124106
public String getColumnName(int i) throws SQLException {
125-
check(i);
126-
Long index = idx.get(i - 1).getKey();
127-
return realSchema.GetColumnName(index);
107+
int realIdx = i - 1;
108+
check(realIdx);
109+
return schema.getColumnName(holeIdx.get(realIdx));
128110
}
129111

130112
@Override
@@ -159,9 +141,9 @@ public String getCatalogName(int i) throws SQLException {
159141

160142
@Override
161143
public int getColumnType(int i) throws SQLException {
162-
check(i);
163-
Long index = idx.get(i - 1).getKey();
164-
return Common.type2SqlType(realSchema.GetColumnType(index));
144+
int realIdx = i - 1;
145+
check(realIdx);
146+
return schema.getColumnType(holeIdx.get(realIdx));
165147
}
166148

167149
@Override

java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/Common.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,12 @@ public static ProcedureInfo convertProcedureInfo(com._4paradigm.openmldb.Procedu
171171
spInfo.setDbName(procedureInfo.GetDbName());
172172
spInfo.setProName(procedureInfo.GetSpName());
173173
spInfo.setSql(procedureInfo.GetSql());
174-
spInfo.setInputSchema(convertSchema(procedureInfo.GetInputSchema()));
175-
spInfo.setOutputSchema(convertSchema(procedureInfo.GetOutputSchema()));
174+
com._4paradigm.openmldb.Schema inputSchema = procedureInfo.GetInputSchema();
175+
spInfo.setInputSchema(convertSchema(inputSchema));
176+
inputSchema.delete();
177+
com._4paradigm.openmldb.Schema outputSchema = procedureInfo.GetOutputSchema();
178+
spInfo.setOutputSchema(convertSchema(outputSchema));
179+
outputSchema.delete();
176180
spInfo.setMainTable(procedureInfo.GetMainTable());
177181
spInfo.setInputTables(procedureInfo.GetTables());
178182
spInfo.setInputDbs(procedureInfo.GetDbs());

java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/QueryFuture.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ public java.sql.ResultSet get() throws InterruptedException, ExecutionException
7474
if (resultSet != null) {
7575
resultSet.delete();
7676
}
77+
queryFuture.delete();
78+
queryFuture = null;
7779
logger.error("call procedure failed: {}", msg);
7880
throw new ExecutionException(new SqlException("call procedure failed: " + msg));
7981
}

java/openmldb-jdbc/src/main/java/com/_4paradigm/openmldb/sdk/SqlExecutor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,13 @@ public interface SqlExecutor {
4848
@Deprecated
4949
java.sql.ResultSet executeSQL(String db, String sql);
5050

51+
@Deprecated
5152
SQLInsertRow getInsertRow(String db, String sql);
5253

54+
@Deprecated
5355
SQLInsertRows getInsertRows(String db, String sql);
5456

57+
@Deprecated
5558
ResultSet executeSQLRequest(String db, String sql, SQLRequestRow row);
5659

5760
Statement getStatement();
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package com._4paradigm.openmldb.sdk.impl;
2+
3+
import com._4paradigm.openmldb.common.zk.ZKClient;
4+
import com._4paradigm.openmldb.proto.NS;
5+
import com._4paradigm.openmldb.sdk.SqlException;
6+
import com.github.benmanes.caffeine.cache.Cache;
7+
import com.github.benmanes.caffeine.cache.Caffeine;
8+
import org.apache.curator.framework.recipes.cache.NodeCache;
9+
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
10+
11+
import java.util.*;
12+
import java.util.concurrent.TimeUnit;
13+
14+
public class InsertPreparedStatementCache {
15+
16+
private Cache<AbstractMap.SimpleImmutableEntry<String, String>, InsertPreparedStatementMeta> cache;
17+
18+
private ZKClient zkClient;
19+
private NodeCache nodeCache;
20+
private String tablePath;
21+
22+
public InsertPreparedStatementCache(int cacheSize, ZKClient zkClient) throws SqlException {
23+
cache = Caffeine.newBuilder().maximumSize(cacheSize).build();
24+
this.zkClient = zkClient;
25+
if (zkClient != null) {
26+
tablePath = zkClient.getConfig().getNamespace() + "/table/db_table_data";
27+
nodeCache = new NodeCache(zkClient.getClient(), zkClient.getConfig().getNamespace() + "/table/notify");
28+
try {
29+
nodeCache.start();
30+
nodeCache.getListenable().addListener(new NodeCacheListener() {
31+
@Override
32+
public void nodeChanged() throws Exception {
33+
checkAndInvalid();
34+
}
35+
});
36+
} catch (Exception e) {
37+
throw new SqlException("NodeCache exception: " + e.getMessage());
38+
}
39+
}
40+
}
41+
42+
public InsertPreparedStatementMeta get(String db, String sql) {
43+
return cache.getIfPresent(new AbstractMap.SimpleImmutableEntry<>(db, sql));
44+
}
45+
46+
public void put(String db, String sql, InsertPreparedStatementMeta meta) {
47+
cache.put(new AbstractMap.SimpleImmutableEntry<>(db, sql), meta);
48+
}
49+
50+
public void checkAndInvalid() throws Exception {
51+
if (!zkClient.checkExists(tablePath)) {
52+
return;
53+
}
54+
List<String> children = zkClient.getChildren(tablePath);
55+
Map<AbstractMap.SimpleImmutableEntry<String, String>, InsertPreparedStatementMeta> view = cache.asMap();
56+
Map<AbstractMap.SimpleImmutableEntry<String, String>, Integer> tableMap = new HashMap<>();
57+
for (String path : children) {
58+
byte[] bytes = zkClient.getClient().getData().forPath(tablePath + "/" + path);
59+
NS.TableInfo tableInfo = NS.TableInfo.parseFrom(bytes);
60+
tableMap.put(new AbstractMap.SimpleImmutableEntry<>(tableInfo.getDb(), tableInfo.getName()), tableInfo.getTid());
61+
}
62+
Iterator<Map.Entry<AbstractMap.SimpleImmutableEntry<String, String>, InsertPreparedStatementMeta>> iterator
63+
= view.entrySet().iterator();
64+
while (iterator.hasNext()) {
65+
Map.Entry<AbstractMap.SimpleImmutableEntry<String, String>, InsertPreparedStatementMeta> entry = iterator.next();
66+
String db = entry.getKey().getKey();
67+
InsertPreparedStatementMeta meta = entry.getValue();
68+
String name = meta.getName();
69+
Integer tid = tableMap.get(new AbstractMap.SimpleImmutableEntry<>(db, name));
70+
if (tid != null && tid != meta.getTid()) {
71+
cache.invalidate(entry.getKey());
72+
}
73+
}
74+
}
75+
}

0 commit comments

Comments
 (0)