Skip to content

Commit 415bae3

Browse files
authored
feat: add SQL Utils to manage dialects in the audit marker (#719)
1 parent 71764cb commit 415bae3

File tree

16 files changed

+290
-50
lines changed

16 files changed

+290
-50
lines changed

core/target-systems/sql-target-system/build.gradle.kts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import org.jetbrains.kotlin.gradle.utils.extendsFrom
33
dependencies {
44
//Flamingock
55
api(project(":core:flamingock-core"))
6+
implementation(project(":utils:sql-util"))
67

78
//Test
89
testImplementation("org.testcontainers:mysql:1.19.0")
@@ -24,4 +25,4 @@ java {
2425

2526
configurations.testImplementation {
2627
extendsFrom(configurations.compileOnly.get())
27-
}
28+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright 2025 Flamingock (https://www.flamingock.io)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.flamingock.targetsystem.sql;
17+
18+
import io.flamingock.internal.common.sql.AbstractSqlDialectHelper;
19+
import io.flamingock.internal.common.sql.SqlDialect;
20+
21+
import javax.sql.DataSource;
22+
23+
public final class SqlAuditMarkerDialectHelper extends AbstractSqlDialectHelper {
24+
25+
public SqlAuditMarkerDialectHelper(DataSource dataSource) {
26+
super(dataSource);
27+
}
28+
29+
public SqlAuditMarkerDialectHelper(SqlDialect dialect) {
30+
super(dialect);
31+
}
32+
33+
public String getListAllSqlString(String tableName) {
34+
return String.format("SELECT task_id, operation FROM %s", tableName);
35+
}
36+
37+
public String getClearMarkSqlString(String tableName) {
38+
return String.format("DELETE FROM %s WHERE task_id = ?", tableName);
39+
}
40+
41+
public String getMarkSqlString(String tableName) {
42+
switch (sqlDialect) {
43+
case MYSQL:
44+
case MARIADB:
45+
return String.format(
46+
"INSERT INTO %s (task_id, operation) VALUES (?, ?) ON DUPLICATE KEY UPDATE operation = VALUES(operation)",
47+
tableName);
48+
case POSTGRESQL:
49+
return String.format(
50+
"INSERT INTO %s (task_id, operation) VALUES (?, ?) ON CONFLICT (task_id) DO UPDATE SET operation = EXCLUDED.operation",
51+
tableName);
52+
case SQLITE:
53+
return String.format(
54+
"INSERT OR REPLACE INTO %s (task_id, operation) VALUES (?, ?)",
55+
tableName);
56+
case SQLSERVER:
57+
case SYBASE:
58+
return String.format(
59+
"MERGE INTO %s AS target USING (SELECT ? AS task_id, ? AS operation) AS source ON (target.task_id = source.task_id) " +
60+
"WHEN MATCHED THEN UPDATE SET operation = source.operation WHEN NOT MATCHED THEN INSERT (task_id, operation) VALUES (source.task_id, source.operation);",
61+
tableName);
62+
case ORACLE:
63+
return String.format(
64+
"MERGE INTO %s t USING (SELECT ? AS task_id, ? AS operation FROM dual) s ON (t.task_id = s.task_id) " +
65+
"WHEN MATCHED THEN UPDATE SET t.operation = s.operation WHEN NOT MATCHED THEN INSERT (task_id, operation) VALUES (s.task_id, s.operation)",
66+
tableName);
67+
case DB2:
68+
return String.format(
69+
"MERGE INTO %s USING (SELECT ? AS task_id, ? AS operation FROM SYSIBM.SYSDUMMY1) AS src ON (%s.task_id = src.task_id) " +
70+
"WHEN MATCHED THEN UPDATE SET operation = src.operation WHEN NOT MATCHED THEN INSERT (task_id, operation) VALUES (src.task_id, src.operation)",
71+
tableName, tableName);
72+
case FIREBIRD:
73+
return String.format(
74+
"UPDATE OR INSERT INTO %s (task_id, operation) VALUES (?, ?) MATCHING (task_id)",
75+
tableName);
76+
case H2:
77+
case HSQLDB:
78+
case DERBY:
79+
return String.format(
80+
"MERGE INTO %s (task_id, operation) KEY (task_id) VALUES (?, ?)",
81+
tableName);
82+
case INFORMIX:
83+
return String.format(
84+
"INSERT INTO %s (task_id, operation) VALUES (?, ?) ON DUPLICATE KEY UPDATE operation = ?",
85+
tableName);
86+
default:
87+
throw new UnsupportedOperationException("Dialect not supported for upsert: " + sqlDialect.name());
88+
}
89+
}
90+
91+
public String getCreateTableSqlString(String tableName) {
92+
switch (sqlDialect) {
93+
case MYSQL:
94+
case MARIADB:
95+
case POSTGRESQL:
96+
case SQLITE:
97+
case H2:
98+
case HSQLDB:
99+
case DERBY:
100+
case SQLSERVER:
101+
case SYBASE:
102+
case FIREBIRD:
103+
case INFORMIX:
104+
return String.format(
105+
"CREATE TABLE %s (task_id VARCHAR(255) PRIMARY KEY, operation VARCHAR(50) NOT NULL)",
106+
tableName);
107+
case ORACLE:
108+
return String.format(
109+
"CREATE TABLE %s (task_id VARCHAR2(255) PRIMARY KEY, operation VARCHAR2(50) NOT NULL)",
110+
tableName);
111+
case DB2:
112+
return String.format(
113+
"CREATE TABLE %s (task_id VARCHAR(255) NOT NULL PRIMARY KEY, operation VARCHAR(50) NOT NULL)",
114+
tableName);
115+
default:
116+
throw new UnsupportedOperationException("Dialect not supported for CREATE TABLE: " + sqlDialect.name());
117+
}
118+
}
119+
}

core/target-systems/sql-target-system/src/main/java/io/flamingock/targetsystem/mysql/SqlTargetSystem.java renamed to core/target-systems/sql-target-system/src/main/java/io/flamingock/targetsystem/sql/SqlTargetSystem.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.flamingock.targetsystem.mysql;
16+
package io.flamingock.targetsystem.sql;
1717

1818
import io.flamingock.internal.common.core.context.ContextResolver;
1919
import io.flamingock.internal.common.core.error.FlamingockException;

core/target-systems/sql-target-system/src/main/java/io/flamingock/targetsystem/mysql/SqlTargetSystemAuditMarker.java renamed to core/target-systems/sql-target-system/src/main/java/io/flamingock/targetsystem/sql/SqlTargetSystemAuditMarker.java

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.flamingock.targetsystem.mysql;
16+
package io.flamingock.targetsystem.sql;
1717

1818
import io.flamingock.internal.common.core.error.FlamingockException;
19+
import io.flamingock.internal.common.sql.SqlDialect;
1920
import io.flamingock.internal.core.transaction.TransactionManager;
2021
import io.flamingock.internal.core.store.audit.domain.AuditContextBundle;
2122
import io.flamingock.internal.core.targets.mark.TargetSystemAuditMark;
@@ -37,22 +38,25 @@ public class SqlTargetSystemAuditMarker implements TargetSystemAuditMarker {
3738
private final String tableName;
3839
private final DataSource dataSource;
3940
private final TransactionManager<Connection> txManager;
41+
private final SqlAuditMarkerDialectHelper dialectHelper;
4042

4143
public static Builder builder(DataSource dataSource, TransactionManager<Connection> txManager) {
4244
return new Builder(dataSource, txManager);
4345
}
4446

45-
public SqlTargetSystemAuditMarker(DataSource dataSource,
47+
private SqlTargetSystemAuditMarker(DataSource dataSource,
4648
String tableName,
47-
TransactionManager<Connection> txManager) {
49+
TransactionManager<Connection> txManager,
50+
SqlAuditMarkerDialectHelper dialectHelper) {
4851
this.dataSource = dataSource;
4952
this.tableName = tableName;
5053
this.txManager = txManager;
54+
this.dialectHelper = dialectHelper;
5155
}
5256

5357
@Override
5458
public Set<TargetSystemAuditMark> listAll() {
55-
String sql = String.format("SELECT task_id, operation FROM %s", tableName);
59+
String sql = dialectHelper.getListAllSqlString(tableName);
5660

5761
try (Connection connection = dataSource.getConnection();
5862
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
@@ -73,7 +77,7 @@ public Set<TargetSystemAuditMark> listAll() {
7377

7478
@Override
7579
public void clearMark(String changeId) {
76-
String sql = String.format("DELETE FROM %s WHERE task_id = ?", tableName);
80+
String sql = dialectHelper.getClearMarkSqlString(tableName);
7781
try (Connection connection = dataSource.getConnection();
7882
PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
7983
preparedStatement.setString(1, changeId);
@@ -85,9 +89,7 @@ public void clearMark(String changeId) {
8589

8690
@Override
8791
public void mark(TargetSystemAuditMark auditMark) {
88-
String sql = String.format(
89-
"INSERT INTO %s (task_id, operation) VALUES (?, ?) " +
90-
"ON DUPLICATE KEY UPDATE operation = VALUES(operation)", tableName);
92+
String sql = dialectHelper.getMarkSqlString(tableName);
9193
Connection connection = txManager.getSessionOrThrow(auditMark.getTaskId());
9294
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
9395
preparedStatement.setString(1, auditMark.getTaskId());
@@ -104,12 +106,14 @@ public void mark(TargetSystemAuditMark auditMark) {
104106
public static class Builder {
105107
private final DataSource dataSource;
106108
private final TransactionManager<Connection> txManager;
109+
private SqlAuditMarkerDialectHelper dialectHelper;
107110
private String tableName = "FLAMINGOCK_ONGOING_TASKS";
108111
private boolean autoCreate = true;
109112

110113
public Builder(DataSource dataSource, TransactionManager<Connection> txManager) {
111114
this.dataSource = dataSource;
112115
this.txManager = txManager;
116+
this.dialectHelper = new SqlAuditMarkerDialectHelper(dataSource);
113117
}
114118

115119
public Builder withTableName(String tableName) {
@@ -122,11 +126,16 @@ public Builder withAutoCreate(boolean autoCreate) {
122126
return this;
123127
}
124128

129+
public Builder withSqlDialect(SqlDialect sqlDialect) {
130+
this.dialectHelper = new SqlAuditMarkerDialectHelper(sqlDialect);
131+
return this;
132+
}
133+
125134
public SqlTargetSystemAuditMarker build() {
126135
if (autoCreate) {
127136
createTableIfNotExists();
128137
}
129-
return new SqlTargetSystemAuditMarker(dataSource, tableName, txManager);
138+
return new SqlTargetSystemAuditMarker(dataSource, tableName, txManager, dialectHelper);
130139
}
131140

132141
private void createTableIfNotExists() {
@@ -136,16 +145,12 @@ private void createTableIfNotExists() {
136145
DatabaseMetaData meta = connection.getMetaData();
137146
ResultSet resultSet = meta.getTables(null, null, tableName, new String[]{"TABLE"});
138147
if (!resultSet.next()) {
139-
String createTableSql = String.format(
140-
"CREATE TABLE %s (" +
141-
"task_id VARCHAR(255) PRIMARY KEY, " +
142-
"operation VARCHAR(50) NOT NULL" +
143-
")", tableName);
148+
String createTableSql = dialectHelper.getCreateTableSqlString(tableName);
144149
statement.executeUpdate(createTableSql);
145150
}
146151
} catch (SQLException ex) {
147152
throw new FlamingockException(ex);
148153
}
149154
}
150155
}
151-
}
156+
}

core/target-systems/sql-target-system/src/main/java/io/flamingock/targetsystem/mysql/SqlTxWrapper.java renamed to core/target-systems/sql-target-system/src/main/java/io/flamingock/targetsystem/sql/SqlTxWrapper.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.flamingock.targetsystem.mysql;
16+
package io.flamingock.targetsystem.sql;
1717

1818
import io.flamingock.internal.common.core.context.Dependency;
1919
import io.flamingock.internal.common.core.error.DatabaseTransactionException;
@@ -42,21 +42,21 @@ public SqlTxWrapper(TransactionManager<Connection> txManager) {
4242
@Override
4343
public <T> T wrapInTransaction(ExecutionRuntime executionRuntime, Function<ExecutionRuntime, T> operation) {
4444
LocalDateTime transactionStart = LocalDateTime.now();
45-
45+
4646
try (Connection connection = txManager.startSession(executionRuntime.getSessionId())) {
4747
boolean originalAutoCommit = connection.getAutoCommit();
4848
String isolationLevel = getIsolationLevelName(connection.getTransactionIsolation());
4949
String connectionInfo = getConnectionInfo(connection);
50-
50+
5151
logger.debug("Starting SQL transaction [isolation={} connection={}]", isolationLevel, connectionInfo);
52-
52+
5353
try {
5454
connection.setAutoCommit(false);
5555
executionRuntime.addDependency(new Dependency(connection));
56-
56+
5757
T result = operation.apply(executionRuntime);
5858
Duration transactionDuration = Duration.between(transactionStart, LocalDateTime.now());
59-
59+
6060
if (result instanceof FailedStep) {
6161
logger.info("Rolling back transaction due to failed step [duration={}]", formatDuration(transactionDuration));
6262
connection.rollback();
@@ -67,24 +67,24 @@ public <T> T wrapInTransaction(ExecutionRuntime executionRuntime, Function<Execu
6767
logger.debug("Transaction commit completed successfully [duration={}]", formatDuration(transactionDuration));
6868
}
6969
return result;
70-
70+
7171
} catch (Exception e) {
7272
Duration failureDuration = Duration.between(transactionStart, LocalDateTime.now());
7373
logger.debug("Transaction failed, attempting rollback [duration={} error={}]",
7474
formatDuration(failureDuration), e.getMessage());
75-
75+
7676
DatabaseTransactionException.RollbackStatus rollbackStatus;
7777
try {
7878
connection.rollback();
7979
rollbackStatus = DatabaseTransactionException.RollbackStatus.SUCCESS;
80-
logger.info("Transaction rollback completed successfully after failure [duration={}]",
80+
logger.info("Transaction rollback completed successfully after failure [duration={}]",
8181
formatDuration(failureDuration));
8282
} catch (SQLException rollbackEx) {
8383
rollbackStatus = DatabaseTransactionException.RollbackStatus.FAILED;
8484
logger.debug("Transaction rollback failed [duration={} rollback_error={}]",
8585
formatDuration(failureDuration), rollbackEx.getMessage(), rollbackEx);
8686
}
87-
87+
8888
throw new DatabaseTransactionException(
8989
"SQL transaction failed during operation execution",
9090
DatabaseTransactionException.TransactionState.FAILED,
@@ -96,7 +96,7 @@ public <T> T wrapInTransaction(ExecutionRuntime executionRuntime, Function<Execu
9696
connectionInfo,
9797
e
9898
);
99-
99+
100100
} finally {
101101
try {
102102
connection.setAutoCommit(originalAutoCommit);
@@ -121,7 +121,7 @@ public <T> T wrapInTransaction(ExecutionRuntime executionRuntime, Function<Execu
121121
);
122122
}
123123
}
124-
124+
125125
private String getIsolationLevelName(int isolationLevel) {
126126
switch (isolationLevel) {
127127
case Connection.TRANSACTION_READ_UNCOMMITTED: return "READ_UNCOMMITTED";
@@ -131,16 +131,16 @@ private String getIsolationLevelName(int isolationLevel) {
131131
default: return "UNKNOWN(" + isolationLevel + ")";
132132
}
133133
}
134-
134+
135135
private String getConnectionInfo(Connection connection) {
136136
try {
137-
return String.format("%s@%s", connection.getMetaData().getUserName(),
137+
return String.format("%s@%s", connection.getMetaData().getUserName(),
138138
connection.getMetaData().getURL());
139139
} catch (SQLException e) {
140140
return "connection_info_unavailable";
141141
}
142142
}
143-
143+
144144
private String formatDuration(Duration duration) {
145145
long millis = duration.toMillis();
146146
if (millis < 1000) {
@@ -152,4 +152,4 @@ private String formatDuration(Duration duration) {
152152
}
153153
}
154154

155-
}
155+
}

core/target-systems/sql-target-system/src/test/java/io/flamingock/targetsystem/mysql/MySQLTestHelper.java renamed to core/target-systems/sql-target-system/src/test/java/io/flamingock/targetsystem/sql/MySQLTestHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.flamingock.targetsystem.mysql;
16+
package io.flamingock.targetsystem.sql;
1717

1818
import org.junit.jupiter.api.Assertions;
1919

@@ -117,4 +117,4 @@ private void createOngoingTasksTableIfNotExists(Connection connection) throws SQ
117117
}
118118
}
119119
}
120-
}
120+
}

core/target-systems/sql-target-system/src/test/java/io/flamingock/targetsystem/mysql/PipelineTestHelper.java renamed to core/target-systems/sql-target-system/src/test/java/io/flamingock/targetsystem/sql/PipelineTestHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package io.flamingock.targetsystem.mysql;
16+
package io.flamingock.targetsystem.sql;
1717

1818
import io.flamingock.api.StageType;
1919
import io.flamingock.api.annotations.TargetSystem;

0 commit comments

Comments
 (0)