Skip to content

Commit 20189b4

Browse files
authored
Merge pull request #1 from codingapi/dev
add Listener order
2 parents 7f747ec + f8ce2e5 commit 20189b4

File tree

12 files changed

+94
-10
lines changed

12 files changed

+94
-10
lines changed

README.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,12 @@ import com.codingapi.dbstream.interceptor.SQLExecuteState;
106106
import com.codingapi.dbstream.listener.SQLExecuteListener;
107107

108108
public class MySQLListener implements SQLExecuteListener {
109+
110+
@Override
111+
public int order() {
112+
return 0;
113+
}
114+
109115
@Override
110116
public void before(SQLExecuteState executeState) {
111117
System.out.println("执行前 - SQL: " + executeState.getSql());
@@ -259,8 +265,9 @@ mvn clean test -P travis
259265
- 手动事务模式下,事件会在 `commit()` 时批量推送
260266
- 事务回滚时,相关事件会被丢弃
261267

262-
3. **数据表限制**
263-
- 执行数据拦截事件的分析,要求表必须存在主键的定义
268+
3. **使用场景限制**
269+
- 数据库表必须有主键的定义,在DELETE事件需要明确主键信息,主键物理表不存在时可通过外部key文件配置的方式添加。
270+
- 若INSERT INTO SELECT 语句中,采用主键自增模式,受限于JDBC的支持将无法解析到自增ID,建议修改单条保存或修改ID为手动传递。
264271

265272
4. **元数据缓存**
266273
- 数据库元数据会在首次连接时自动扫描并缓存

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<modelVersion>4.0.0</modelVersion>
55
<groupId>com.codingapi.dbstream</groupId>
66
<artifactId>dbstream-driver</artifactId>
7-
<version>1.0.6</version>
7+
<version>1.0.7</version>
88

99
<url>https://github.com/codingapi/dbstream-driver</url>
1010
<name>dbstream-driver</name>

src/main/java/com/codingapi/dbstream/interceptor/SQLRunningContext.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
import lombok.Getter;
55

66
import java.sql.SQLException;
7+
import java.util.Comparator;
78
import java.util.List;
89
import java.util.concurrent.CopyOnWriteArrayList;
910

10-
public class SQLRunningContext implements SQLExecuteListener {
11+
public class SQLRunningContext {
1112

1213
@Getter
1314
private final static SQLRunningContext instance = new SQLRunningContext();
@@ -23,10 +24,11 @@ public void addListener(SQLExecuteListener listener) {
2324
if (listener != null) {
2425
listeners.add(listener);
2526
}
27+
28+
listeners.sort(Comparator.comparingInt(SQLExecuteListener::order));
2629
}
2730

2831

29-
@Override
3032
public void after(SQLExecuteState executeState, Object result) throws SQLException {
3133
executeState.setResult(result);
3234
executeState.after();
@@ -36,7 +38,6 @@ public void after(SQLExecuteState executeState, Object result) throws SQLExcepti
3638
}
3739

3840

39-
@Override
4041
public void before(SQLExecuteState executeState) throws SQLException {
4142
executeState.begin();
4243
for (SQLExecuteListener listener : listeners) {

src/main/java/com/codingapi/dbstream/listener/SQLDeleteExecuteListener.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ public class SQLDeleteExecuteListener implements SQLExecuteListener {
1616

1717
private final static ThreadLocal<DeleteDBEventParser> threadLocal = new ThreadLocal<>();
1818

19+
@Override
20+
public int order() {
21+
return 100;
22+
}
23+
1924
@Override
2025
public void before(SQLExecuteState executeState) throws SQLException {
2126
String sql = executeState.getSql();

src/main/java/com/codingapi/dbstream/listener/SQLExecuteListener.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66

77
public interface SQLExecuteListener {
88

9+
/**
10+
* 执行顺序,越小越靠前
11+
* @return index
12+
*/
13+
int order();
14+
915
/**
1016
* before sql execute
1117
*

src/main/java/com/codingapi/dbstream/listener/SQLInsertExecuteListener.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ public class SQLInsertExecuteListener implements SQLExecuteListener {
1616

1717
private final static ThreadLocal<InsertDBEventParser> threadLocal = new ThreadLocal<>();
1818

19+
@Override
20+
public int order() {
21+
return 100;
22+
}
23+
1924
@Override
2025
public void before(SQLExecuteState executeState) throws SQLException {
2126
String sql = executeState.getSql();

src/main/java/com/codingapi/dbstream/listener/SQLUpdateExecuteListener.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ public class SQLUpdateExecuteListener implements SQLExecuteListener {
1616

1717
private final static ThreadLocal<UpdateDBEventParser> threadLocal = new ThreadLocal<>();
1818

19+
@Override
20+
public int order() {
21+
return 100;
22+
}
23+
1924
@Override
2025
public void before(SQLExecuteState executeState) throws SQLException {
2126
String sql = executeState.getSql();

src/main/java/com/codingapi/dbstream/stream/DBEvent.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,14 @@ void setTransactionKey(String transactionKey) {
8181
}
8282

8383
public void addPrimaryKey(String primaryKey) {
84-
this.primaryKeys.add(primaryKey);
84+
if(!this.primaryKeys.contains(primaryKey)) {
85+
this.primaryKeys.add(primaryKey);
86+
}
87+
}
88+
89+
90+
public boolean hasPrimaryKeys(){
91+
return this.primaryKeys!=null && !this.primaryKeys.isEmpty();
8592
}
8693

8794
}

src/main/java/com/codingapi/dbstream/stream/DefaultDBEventPusher.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ public class DefaultDBEventPusher implements DBEventPusher {
66

77
@Override
88
public void push(List<DBEvent> events) {
9-
System.out.println(events);
9+
System.out.println("<=== DBStream DBEvent Total "+events.size()+" ===> ");
10+
for(DBEvent event:events){
11+
System.out.println(event);
12+
}
1013
}
1114
}

src/test/java/com/example/dbstream/listener/MySQLListener.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,11 @@
99
@Slf4j
1010
public class MySQLListener implements SQLExecuteListener {
1111

12+
@Override
13+
public int order() {
14+
return 0;
15+
}
16+
1217
@Override
1318
public void after(SQLExecuteState executeState, Object result) throws SQLException {
1419
log.info("after sql:{},params:{},execute timestamp:{}", executeState.getSql(), executeState.getListParams(), executeState.getExecuteTimestamp());

0 commit comments

Comments
 (0)