Skip to content

Commit cd8c551

Browse files
committed
kudu add predicate info
1 parent 1d73263 commit cd8c551

File tree

7 files changed

+250
-156
lines changed

7 files changed

+250
-156
lines changed

docs/kuduSide.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ create table sideTable(
4747
```
4848

4949
## 2.支持版本
50-
kudu 1.9.0+cdh6.2.0
50+
kudu 1.10.0+cdh6.2.0
5151

5252
## 3.表结构定义
5353

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java

Lines changed: 30 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33
import com.dtstack.flink.sql.side.AllReqRow;
44
import com.dtstack.flink.sql.side.FieldInfo;
55
import com.dtstack.flink.sql.side.JoinInfo;
6+
import com.dtstack.flink.sql.side.PredicateInfo;
67
import com.dtstack.flink.sql.side.SideTableInfo;
78
import com.dtstack.flink.sql.side.kudu.table.KuduSideTableInfo;
9+
import com.dtstack.flink.sql.side.kudu.utils.KuduUtil;
810
import org.apache.calcite.sql.JoinType;
911
import org.apache.commons.collections.CollectionUtils;
1012
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -16,14 +18,24 @@
1618
import org.apache.flink.util.Preconditions;
1719
import org.apache.kudu.ColumnSchema;
1820
import org.apache.kudu.Schema;
19-
import org.apache.kudu.Type;
20-
import org.apache.kudu.client.*;
21+
import org.apache.kudu.client.KuduClient;
22+
import org.apache.kudu.client.KuduException;
23+
import org.apache.kudu.client.KuduPredicate;
24+
import org.apache.kudu.client.KuduScanner;
25+
import org.apache.kudu.client.KuduTable;
26+
import org.apache.kudu.client.PartialRow;
27+
import org.apache.kudu.client.RowResult;
28+
import org.apache.kudu.client.RowResultIterator;
2129
import org.slf4j.Logger;
2230
import org.slf4j.LoggerFactory;
2331

2432
import java.sql.SQLException;
2533
import java.sql.Timestamp;
26-
import java.util.*;
34+
import java.util.Arrays;
35+
import java.util.Calendar;
36+
import java.util.HashMap;
37+
import java.util.List;
38+
import java.util.Map;
2739
import java.util.concurrent.atomic.AtomicReference;
2840

2941
public class KuduAllReqRow extends AllReqRow {
@@ -158,7 +170,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) {
158170
String sideFieldName = sideFieldName1.trim();
159171
ColumnSchema columnSchema = table.getSchema().getColumn(sideFieldName);
160172
if (null != columnSchema) {
161-
setMapValue(columnSchema.getType(), oneRow, sideFieldName, result);
173+
KuduUtil.setMapValue(columnSchema.getType(), oneRow, sideFieldName, result);
162174
}
163175
}
164176
String cacheKey = buildKey(oneRow, sideInfo.getEqualFieldList());
@@ -244,8 +256,7 @@ private KuduScanner getConn(KuduSideTableInfo tableInfo) {
244256
* @param tableInfo AsyncKuduScanner的配置信息
245257
* @return
246258
*/
247-
private KuduScanner buildScanner(KuduScanner.KuduScannerBuilder builder, Schema schema, KuduSideTableInfo
248-
tableInfo) {
259+
private KuduScanner buildScanner(KuduScanner.KuduScannerBuilder builder, Schema schema, KuduSideTableInfo tableInfo) {
249260
Integer batchSizeBytes = tableInfo.getBatchSizeBytes();
250261
Long limitNum = tableInfo.getLimitNum();
251262
Boolean isFaultTolerant = tableInfo.getFaultTolerant();
@@ -268,6 +279,17 @@ private KuduScanner buildScanner(KuduScanner.KuduScannerBuilder builder, Schema
268279
if (null != isFaultTolerant) {
269280
builder.setFaultTolerant(isFaultTolerant);
270281
}
282+
// 填充谓词信息
283+
List<PredicateInfo> predicateInfoes = sideInfo.getSideTableInfo().getPredicateInfoes();
284+
if (predicateInfoes.size() > 0) {
285+
predicateInfoes.stream().map(info -> {
286+
KuduPredicate kuduPredicate = KuduUtil.buildKuduPredicate(schema, info);
287+
if (null != kuduPredicate) {
288+
builder.addPredicate(kuduPredicate);
289+
}
290+
return info;
291+
}).count();
292+
}
271293

272294
if (null != lowerBoundPrimaryKey && null != upperBoundPrimaryKey && null != primaryKeys) {
273295
List<ColumnSchema> columnSchemas = schema.getPrimaryKeyColumns();
@@ -282,8 +304,8 @@ private KuduScanner buildScanner(KuduScanner.KuduScannerBuilder builder, Schema
282304
PartialRow upperPartialRow = schema.newPartialRow();
283305
for (int i = 0; i < primaryKey.length; i++) {
284306
Integer index = columnName.get(primaryKey[i]);
285-
primaryKeyRange(lowerPartialRow, columnSchemas.get(index).getType(), primaryKey[i], lowerBounds[i]);
286-
primaryKeyRange(upperPartialRow, columnSchemas.get(index).getType(), primaryKey[i], upperBounds[i]);
307+
KuduUtil.primaryKeyRange(lowerPartialRow, columnSchemas.get(index).getType(), primaryKey[i], lowerBounds[i]);
308+
KuduUtil.primaryKeyRange(upperPartialRow, columnSchemas.get(index).getType(), primaryKey[i], upperBounds[i]);
287309
}
288310
builder.lowerBound(lowerPartialRow);
289311
builder.exclusiveUpperBound(upperPartialRow);
@@ -296,80 +318,6 @@ private String[] splitString(String data) {
296318
return data.split(",");
297319
}
298320

299-
private void primaryKeyRange(PartialRow partialRow, Type type, String primaryKey, String value) {
300-
switch (type) {
301-
case STRING:
302-
partialRow.addString(primaryKey, value);
303-
break;
304-
case FLOAT:
305-
partialRow.addFloat(primaryKey, Float.valueOf(value));
306-
break;
307-
case INT8:
308-
partialRow.addByte(primaryKey, Byte.valueOf(value));
309-
break;
310-
case INT16:
311-
partialRow.addShort(primaryKey, Short.valueOf(value));
312-
break;
313-
case INT32:
314-
partialRow.addInt(primaryKey, Integer.valueOf(value));
315-
break;
316-
case INT64:
317-
partialRow.addLong(primaryKey, Long.valueOf(value));
318-
break;
319-
case DOUBLE:
320-
partialRow.addDouble(primaryKey, Double.valueOf(value));
321-
break;
322-
case BOOL:
323-
partialRow.addBoolean(primaryKey, Boolean.valueOf(value));
324-
break;
325-
case UNIXTIME_MICROS:
326-
partialRow.addTimestamp(primaryKey, Timestamp.valueOf(value));
327-
break;
328-
case BINARY:
329-
partialRow.addBinary(primaryKey, value.getBytes());
330-
break;
331-
default:
332-
throw new IllegalArgumentException("Illegal var type: " + type);
333-
}
334-
}
335-
336-
private void setMapValue(Type type, Map<String, Object> oneRow, String sideFieldName, RowResult result) {
337-
switch (type) {
338-
case STRING:
339-
oneRow.put(sideFieldName, result.getString(sideFieldName));
340-
break;
341-
case FLOAT:
342-
oneRow.put(sideFieldName, result.getFloat(sideFieldName));
343-
break;
344-
case INT8:
345-
oneRow.put(sideFieldName, result.getFloat(sideFieldName));
346-
break;
347-
case INT16:
348-
oneRow.put(sideFieldName, result.getShort(sideFieldName));
349-
break;
350-
case INT32:
351-
oneRow.put(sideFieldName, result.getInt(sideFieldName));
352-
break;
353-
case INT64:
354-
oneRow.put(sideFieldName, result.getLong(sideFieldName));
355-
break;
356-
case DOUBLE:
357-
oneRow.put(sideFieldName, result.getDouble(sideFieldName));
358-
break;
359-
case BOOL:
360-
oneRow.put(sideFieldName, result.getBoolean(sideFieldName));
361-
break;
362-
case UNIXTIME_MICROS:
363-
oneRow.put(sideFieldName, result.getTimestamp(sideFieldName));
364-
break;
365-
case BINARY:
366-
oneRow.put(sideFieldName, result.getBinary(sideFieldName));
367-
break;
368-
default:
369-
throw new IllegalArgumentException("Illegal var type: " + type);
370-
}
371-
}
372-
373321
@Override
374322
public void close() throws Exception {
375323
//公用一个client 如果每次刷新间隔时间较长可以每次获取一个

kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllSideInfo.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@
44
import com.dtstack.flink.sql.side.JoinInfo;
55
import com.dtstack.flink.sql.side.SideInfo;
66
import com.dtstack.flink.sql.side.SideTableInfo;
7-
import com.dtstack.flink.sql.side.kudu.table.KuduSideTableInfo;
87
import com.dtstack.flink.sql.util.ParseUtils;
9-
import org.apache.calcite.sql.SqlBasicCall;
10-
import org.apache.calcite.sql.SqlKind;
118
import org.apache.calcite.sql.SqlNode;
129
import org.apache.commons.collections.CollectionUtils;
1310
import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -25,11 +22,7 @@ public KuduAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInf
2522

2623
@Override
2724
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
28-
KuduSideTableInfo kuduSideTableInfo = (KuduSideTableInfo) sideTableInfo;
29-
30-
sqlCondition = "select ${selectField} from ${tableName} ";
31-
sqlCondition = sqlCondition.replace("${tableName}", kuduSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
32-
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
25+
// no use
3326
}
3427

3528
@Override

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java

Lines changed: 20 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.dtstack.flink.sql.side.*;
55
import com.dtstack.flink.sql.side.cache.CacheObj;
66
import com.dtstack.flink.sql.side.kudu.table.KuduSideTableInfo;
7+
import com.dtstack.flink.sql.side.kudu.utils.KuduUtil;
78
import com.stumbleupon.async.Callback;
89
import com.stumbleupon.async.Deferred;
910
import io.vertx.core.json.JsonArray;
@@ -17,7 +18,6 @@
1718
import org.apache.flink.util.Preconditions;
1819
import org.apache.kudu.ColumnSchema;
1920
import org.apache.kudu.Schema;
20-
import org.apache.kudu.Type;
2121
import org.apache.kudu.client.*;
2222
import org.slf4j.Logger;
2323
import org.slf4j.LoggerFactory;
@@ -126,25 +126,37 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
126126
connKuDu();
127127
JsonArray inputParams = new JsonArray();
128128
Schema schema = table.getSchema();
129-
130-
for (Integer conValIndex : sideInfo.getEqualValIndex()) {
131-
Object equalObj = input.getField(conValIndex);
129+
// @wenbaoup fix bug
130+
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
131+
Object equalObj = input.getField(sideInfo.getEqualValIndex().get(i));
132132
if (equalObj == null) {
133-
resultFuture.complete(null);
133+
dealMissKey(input, resultFuture);
134134
return;
135135
}
136136
//增加过滤条件
137-
scannerBuilder.addPredicate(KuduPredicate.newInListPredicate(schema.getColumn(sideInfo.getEqualFieldList().get(conValIndex)), Collections.singletonList(equalObj)));
137+
scannerBuilder.addPredicate(KuduPredicate.newInListPredicate(schema.getColumn(sideInfo.getEqualFieldList().get(i)), Collections.singletonList(equalObj)));
138138
inputParams.add(equalObj);
139139
}
140140

141+
// 填充谓词信息
142+
List<PredicateInfo> predicateInfoes = sideInfo.getSideTableInfo().getPredicateInfoes();
143+
if (predicateInfoes.size() > 0) {
144+
predicateInfoes.stream().map(info -> {
145+
KuduPredicate kuduPredicate = KuduUtil.buildKuduPredicate(schema, info);
146+
if (null != kuduPredicate) {
147+
scannerBuilder.addPredicate(kuduPredicate);
148+
}
149+
return info;
150+
}).count();
151+
}
152+
153+
141154
String key = buildCacheKey(inputParams);
142155

143156
if (openCache()) {
144157
//判断数据是否已经加载到缓存中
145158
CacheObj val = getFromCache(key);
146159
if (val != null) {
147-
148160
if (ECacheContentType.MissVal == val.getType()) {
149161
dealMissKey(input, resultFuture);
150162
return;
@@ -221,43 +233,6 @@ public void close() throws Exception {
221233
}
222234
}
223235

224-
private void setMapValue(Type type, Map<String, Object> oneRow, String sideFieldName, RowResult result) {
225-
switch (type) {
226-
case STRING:
227-
oneRow.put(sideFieldName, result.getString(sideFieldName));
228-
break;
229-
case FLOAT:
230-
oneRow.put(sideFieldName, result.getFloat(sideFieldName));
231-
break;
232-
case INT8:
233-
oneRow.put(sideFieldName, result.getFloat(sideFieldName));
234-
break;
235-
case INT16:
236-
oneRow.put(sideFieldName, result.getShort(sideFieldName));
237-
break;
238-
case INT32:
239-
oneRow.put(sideFieldName, result.getInt(sideFieldName));
240-
break;
241-
case INT64:
242-
oneRow.put(sideFieldName, result.getLong(sideFieldName));
243-
break;
244-
case DOUBLE:
245-
oneRow.put(sideFieldName, result.getDouble(sideFieldName));
246-
break;
247-
case BOOL:
248-
oneRow.put(sideFieldName, result.getBoolean(sideFieldName));
249-
break;
250-
case UNIXTIME_MICROS:
251-
oneRow.put(sideFieldName, result.getTimestamp(sideFieldName));
252-
break;
253-
case BINARY:
254-
oneRow.put(sideFieldName, result.getBinary(sideFieldName));
255-
break;
256-
default:
257-
throw new IllegalArgumentException("Illegal var type: " + type);
258-
}
259-
}
260-
261236
class GetListRowCB implements Callback<Deferred<List<Row>>, RowResultIterator> {
262237
private Row input;
263238
private List<Map<String, Object>> cacheContent;
@@ -287,7 +262,7 @@ public Deferred<List<Row>> call(RowResultIterator results) throws Exception {
287262
String sideFieldName = sideFieldName1.trim();
288263
ColumnSchema columnSchema = table.getSchema().getColumn(sideFieldName);
289264
if (null != columnSchema) {
290-
setMapValue(columnSchema.getType(), oneRow, sideFieldName, result);
265+
KuduUtil.setMapValue(columnSchema.getType(), oneRow, sideFieldName, result);
291266
}
292267
}
293268
Row row = fillData(input, oneRow);

kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncSideInfo.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,11 @@
44
import com.dtstack.flink.sql.side.JoinInfo;
55
import com.dtstack.flink.sql.side.SideInfo;
66
import com.dtstack.flink.sql.side.SideTableInfo;
7-
import com.dtstack.flink.sql.side.kudu.table.KuduSideTableInfo;
8-
import com.dtstack.flink.sql.util.ParseUtils;
97
import org.apache.calcite.sql.SqlBasicCall;
108
import org.apache.calcite.sql.SqlIdentifier;
119
import org.apache.calcite.sql.SqlKind;
1210
import org.apache.calcite.sql.SqlNode;
1311
import org.apache.flink.api.java.typeutils.RowTypeInfo;
14-
import com.google.common.collect.Lists;
1512

1613
import java.util.List;
1714

@@ -24,22 +21,6 @@ public KuduAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldI
2421

2522
@Override
2623
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
27-
KuduSideTableInfo kuduSideTableInfo = (KuduSideTableInfo) sideTableInfo;
28-
29-
String sideTableName = joinInfo.getSideTableName();
30-
31-
SqlNode conditionNode = joinInfo.getCondition();
32-
33-
List<SqlNode> sqlNodeList = Lists.newArrayList();
34-
ParseUtils.parseAnd(conditionNode, sqlNodeList);
35-
36-
for (SqlNode sqlNode : sqlNodeList) {
37-
dealOneEqualCon(sqlNode, sideTableName);
38-
}
39-
40-
sqlCondition = "select ${selectField} from ${tableName} ";
41-
sqlCondition = sqlCondition.replace("${tableName}", kuduSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
42-
System.out.println("---------side_exe_sql-----\n" + sqlCondition);
4324
}
4425

4526
@Override

0 commit comments

Comments
 (0)