Skip to content

Commit 0ed5a0f

Browse files
committed
[Improve] jdbc-datastream-connector filterFunction improvements
1 parent dbc31b8 commit 0ed5a0f

File tree

13 files changed

+146
-149
lines changed

13 files changed

+146
-149
lines changed

streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
*/
1717
package org.apache.streampark.flink.quickstart.connector;
1818

19-
import org.apache.streampark.flink.connector.function.SQLQueryFunction;
20-
import org.apache.streampark.flink.connector.function.SQLResultFunction;
19+
import org.apache.streampark.flink.connector.function.QueryFunction;
20+
import org.apache.streampark.flink.connector.function.ResultFunction;
2121
import org.apache.streampark.flink.connector.jdbc.source.JdbcJavaSource;
2222
import org.apache.streampark.flink.core.StreamEnvConfig;
2323
import org.apache.streampark.flink.core.scala.StreamingContext;
@@ -40,7 +40,7 @@ public static void main(String[] args) {
4040
// 读取MySQL数据源
4141
new JdbcJavaSource<>(context, Order.class)
4242
.getDataStream(
43-
(SQLQueryFunction<Order>)
43+
(QueryFunction<Order>)
4444
lastOne -> {
4545
// 5秒抽取一次
4646
Thread.sleep(3000);
@@ -52,7 +52,7 @@ public static void main(String[] args) {
5252
+ "order by timestamp asc ",
5353
lastOffset);
5454
},
55-
(SQLResultFunction<Order>)
55+
(ResultFunction<Order>)
5656
map -> {
5757
List<Order> result = new ArrayList<>();
5858
map.forEach(

streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/RunningFunction.java renamed to streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/FilterFunction.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,8 @@
2020
import java.io.Serializable;
2121

2222
@FunctionalInterface
23-
public interface RunningFunction extends Serializable {
23+
public interface FilterFunction<T> extends Serializable {
2424

25-
/**
26-
* Is it running...
27-
*
28-
* @return Boolean: isRunning
29-
*/
30-
Boolean running();
25+
/** filter function */
26+
Boolean filter(T t);
3127
}

streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/SQLQueryFunction.java renamed to streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/QueryFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.io.Serializable;
2121

2222
@FunctionalInterface
23-
public interface SQLQueryFunction<T> extends Serializable {
23+
public interface QueryFunction<T> extends Serializable {
2424
/**
2525
* Get the SQL to query
2626
*

streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/SQLResultFunction.java renamed to streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/ResultFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.Map;
2222

2323
@FunctionalInterface
24-
public interface SQLResultFunction<T> extends Serializable {
24+
public interface ResultFunction<T> extends Serializable {
2525
/**
2626
* The result of the search is returned as a Map, and the user can convert it into an object.
2727
*

streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.streampark.flink.connector.hbase.source;
1919

2020
import org.apache.streampark.common.util.ConfigUtils;
21-
import org.apache.streampark.flink.connector.function.RunningFunction;
21+
import org.apache.streampark.flink.connector.function.FilterFunction;
2222
import org.apache.streampark.flink.connector.hbase.function.HBaseQueryFunction;
2323
import org.apache.streampark.flink.connector.hbase.function.HBaseResultFunction;
2424
import org.apache.streampark.flink.connector.hbase.internal.HBaseSourceFunction;
@@ -64,7 +64,7 @@ public DataStreamSource<T> getDataStream(
6464
public DataStreamSource<T> getDataStream(
6565
HBaseQueryFunction<T> queryFunction,
6666
HBaseResultFunction<T> resultFunction,
67-
RunningFunction runningFunc) {
67+
FilterFunction<T> filterFunction) {
6868

6969
if (queryFunction == null) {
7070
throw new NullPointerException("HBaseJavaSource error: query function cannot be null");
@@ -79,7 +79,7 @@ public DataStreamSource<T> getDataStream(
7979

8080
HBaseSourceFunction<T> sourceFunction =
8181
new HBaseSourceFunction<>(
82-
property, queryFunction, resultFunction, runningFunc, typeInformation);
82+
property, queryFunction, resultFunction, filterFunction, typeInformation);
8383
return context.getJavaEnv().addSource(sourceFunction);
8484
}
8585
}

streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala

Lines changed: 49 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.connector.hbase.internal
2020
import org.apache.streampark.common.enums.ApiType
2121
import org.apache.streampark.common.enums.ApiType.ApiType
2222
import org.apache.streampark.common.util.Logger
23-
import org.apache.streampark.flink.connector.function.RunningFunction
23+
import org.apache.streampark.flink.connector.function.FilterFunction
2424
import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
2525
import org.apache.streampark.flink.connector.hbase.function.{HBaseQueryFunction, HBaseResultFunction}
2626
import org.apache.streampark.flink.util.FlinkUtils
@@ -47,8 +47,12 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
4747
with Logger {
4848

4949
@volatile private[this] var running = true
50-
private[this] var scalaRunningFunc: Unit => Boolean = _
51-
private[this] var javaRunningFunc: RunningFunction = _
50+
private[this] var scalaFilterFunc: R => Boolean = (_: R) => true
51+
private[this] var javaFilterFunc: FilterFunction[R] = new FilterFunction[R] {
52+
53+
/** filter function */
54+
override def filter(t: R): lang.Boolean = true
55+
}
5256

5357
@transient private[this] var table: Table = _
5458

@@ -69,32 +73,29 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
6973
prop: Properties,
7074
queryFunc: R => HBaseQuery,
7175
resultFunc: Result => R,
72-
runningFunc: Unit => Boolean) = {
76+
filter: R => Boolean) = {
7377

7478
this(ApiType.scala, prop)
7579
this.scalaQueryFunc = queryFunc
7680
this.scalaResultFunc = resultFunc
77-
this.scalaRunningFunc = if (runningFunc == null) _ => true else runningFunc
78-
81+
if (filter != null) {
82+
this.scalaFilterFunc = filter
83+
}
7984
}
8085

8186
// for JAVA
8287
def this(
8388
prop: Properties,
8489
queryFunc: HBaseQueryFunction[R],
8590
resultFunc: HBaseResultFunction[R],
86-
runningFunc: RunningFunction) {
91+
filter: FilterFunction[R]) {
8792

8893
this(ApiType.java, prop)
8994
this.javaQueryFunc = queryFunc
9095
this.javaResultFunc = resultFunc
91-
this.javaRunningFunc =
92-
if (runningFunc != null) runningFunc
93-
else
94-
new RunningFunction {
95-
override def running(): lang.Boolean = true
96-
}
97-
96+
if (filter != null) {
97+
this.javaFilterFunc = filter
98+
}
9899
}
99100

100101
@throws[Exception]
@@ -106,40 +107,42 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
106107
while (this.running) {
107108
apiType match {
108109
case ApiType.scala =>
109-
if (scalaRunningFunc()) {
110-
ctx.getCheckpointLock.synchronized {
111-
// Returns the query object of the last (or recovered from checkpoint) query to the user, and the user constructs the conditions for the next query based on this.
112-
query = scalaQueryFunc(last)
113-
require(
114-
query != null && query.getTable != null,
115-
"[StreamPark] HBaseSource query and query's param table must not be null ")
116-
table = query.getTable(prop)
117-
table
118-
.getScanner(query)
119-
.foreach(
120-
x => {
121-
last = scalaResultFunc(x)
122-
ctx.collectWithTimestamp(last, System.currentTimeMillis())
123-
})
124-
}
110+
ctx.getCheckpointLock.synchronized {
111+
// Returns the query object of the last (or recovered from checkpoint) query to the user, and the user constructs the conditions for the next query based on this.
112+
query = scalaQueryFunc(last)
113+
require(
114+
query != null && query.getTable != null,
115+
"[StreamPark] HBaseSource query and query's param table must not be null ")
116+
table = query.getTable(prop)
117+
table
118+
.getScanner(query)
119+
.foreach(
120+
x => {
121+
val r = scalaResultFunc(x)
122+
if (scalaFilterFunc(r)) {
123+
last = r
124+
ctx.collectWithTimestamp(r, System.currentTimeMillis())
125+
}
126+
})
125127
}
126128
case ApiType.java =>
127-
if (javaRunningFunc.running()) {
128-
ctx.getCheckpointLock.synchronized {
129-
// Returns the query object of the last (or recovered from checkpoint) query to the user, and the user constructs the conditions for the next query based on this.
130-
query = javaQueryFunc.query(last)
131-
require(
132-
query != null && query.getTable != null,
133-
"[StreamPark] HBaseSource query and query's param table must not be null ")
134-
table = query.getTable(prop)
135-
table
136-
.getScanner(query)
137-
.foreach(
138-
x => {
139-
last = javaResultFunc.result(x)
140-
ctx.collectWithTimestamp(last, System.currentTimeMillis())
141-
})
142-
}
129+
ctx.getCheckpointLock.synchronized {
130+
// Returns the query object of the last (or recovered from checkpoint) query to the user, and the user constructs the conditions for the next query based on this.
131+
query = javaQueryFunc.query(last)
132+
require(
133+
query != null && query.getTable != null,
134+
"[StreamPark] HBaseSource query and query's param table must not be null ")
135+
table = query.getTable(prop)
136+
table
137+
.getScanner(query)
138+
.foreach(
139+
x => {
140+
val r = javaResultFunc.result(x)
141+
if (javaFilterFunc.filter(r)) {
142+
last = r
143+
ctx.collectWithTimestamp(r, System.currentTimeMillis())
144+
}
145+
})
143146
}
144147
}
145148
}

streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class HBaseSource(
5151
def getDataStream[R: TypeInformation](
5252
query: R => HBaseQuery,
5353
func: Result => R,
54-
running: Unit => Boolean): DataStream[R] = {
54+
running: R => Boolean): DataStream[R] = {
5555

5656
if (query == null) {
5757
throw new NullPointerException("getDataStream error, SQLQueryFunction must not be null")

streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
package org.apache.streampark.flink.connector.jdbc.source;
1919

2020
import org.apache.streampark.common.util.ConfigUtils;
21-
import org.apache.streampark.flink.connector.function.RunningFunction;
22-
import org.apache.streampark.flink.connector.function.SQLQueryFunction;
23-
import org.apache.streampark.flink.connector.function.SQLResultFunction;
21+
import org.apache.streampark.flink.connector.function.FilterFunction;
22+
import org.apache.streampark.flink.connector.function.QueryFunction;
23+
import org.apache.streampark.flink.connector.function.ResultFunction;
2424
import org.apache.streampark.flink.connector.jdbc.internal.JdbcSourceFunction;
2525
import org.apache.streampark.flink.core.scala.StreamingContext;
2626

@@ -59,30 +59,28 @@ public JdbcJavaSource<T> alias(String alias) {
5959
}
6060

6161
public DataStreamSource<T> getDataStream(
62-
SQLQueryFunction<T> queryFunction, SQLResultFunction<T> resultFunction) {
62+
QueryFunction<T> queryFunction, ResultFunction<T> resultFunction) {
6363
return getDataStream(queryFunction, resultFunction, null);
6464
}
6565

6666
public DataStreamSource<T> getDataStream(
67-
SQLQueryFunction<T> queryFunction,
68-
SQLResultFunction<T> resultFunction,
69-
RunningFunction runningFunc) {
67+
QueryFunction<T> queryFunction, ResultFunction<T> resultFunction, FilterFunction<T> filter) {
7068

7169
if (queryFunction == null) {
7270
throw new NullPointerException(
73-
"JdbcJavaSource getDataStream error: SQLQueryFunction must not be null");
71+
"JdbcJavaSource getDataStream error: QueryFunction must not be null");
7472
}
7573
if (resultFunction == null) {
7674
throw new NullPointerException(
77-
"JdbcJavaSource getDataStream error: SQLResultFunction must not be null");
75+
"JdbcJavaSource getDataStream error: ResultFunction must not be null");
7876
}
7977

8078
if (this.jdbc == null) {
8179
this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(), alias);
8280
}
8381

8482
JdbcSourceFunction<T> sourceFunction =
85-
new JdbcSourceFunction<>(jdbc, queryFunction, resultFunction, runningFunc, typeInformation);
83+
new JdbcSourceFunction<T>(jdbc, queryFunction, resultFunction, filter, typeInformation);
8684
return context.getJavaEnv().addSource(sourceFunction);
8785
}
8886
}

0 commit comments

Comments (0)