Skip to content

Commit a841a97

Browse files
committed
feat: add sql dialect helpers in sql audit store
1 parent ac2780a commit a841a97

File tree

6 files changed

+393
-30
lines changed

6 files changed

+393
-30
lines changed

community/flamingock-auditstore-sql/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
dependencies {
22
api(project(":core:flamingock-core"))
33
api(project(":core:target-systems:sql-target-system"))
4+
implementation(project(":utils:sql-util"))
45

56
testImplementation("mysql:mysql-connector-java:8.0.33")
67
testImplementation(project(":utils:test-util"))

community/flamingock-auditstore-sql/src/main/java/io/flamingock/community/sql/internal/SqlAuditor.java

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.flamingock.internal.common.core.audit.AuditEntry;
1919
import io.flamingock.internal.common.core.audit.AuditReader;
20+
import io.flamingock.internal.common.core.audit.AuditTxType;
2021
import io.flamingock.internal.core.store.audit.LifecycleAuditWriter;
2122
import io.flamingock.internal.util.Result;
2223

@@ -30,27 +31,20 @@ public class SqlAuditor implements LifecycleAuditWriter, AuditReader {
3031
private final DataSource dataSource;
3132
private final String auditTableName;
3233
private final boolean autoCreate;
34+
private final SqlAuditorDialectHelper dialectHelper;
3335

3436
public SqlAuditor(DataSource dataSource, String auditTableName, boolean autoCreate) {
3537
this.dataSource = dataSource;
3638
this.auditTableName = auditTableName;
3739
this.autoCreate = autoCreate;
40+
this.dialectHelper = new SqlAuditorDialectHelper(dataSource);
3841
}
3942

4043
public void initialize() {
4144
if (autoCreate) {
4245
try (Connection conn = dataSource.getConnection();
4346
Statement stmt = conn.createStatement()) {
44-
stmt.executeUpdate(
45-
"CREATE TABLE IF NOT EXISTS " + auditTableName + " (" +
46-
"id BIGINT AUTO_INCREMENT PRIMARY KEY," +
47-
"execution_id VARCHAR(255)," +
48-
"author VARCHAR(255)," +
49-
"task_id VARCHAR(255)," +
50-
"state VARCHAR(255)," +
51-
"created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP" +
52-
")"
53-
);
47+
stmt.executeUpdate(dialectHelper.getCreateTableSqlString(auditTableName));
5448
} catch (SQLException e) {
5549
throw new RuntimeException("Failed to initialize audit table", e);
5650
}
@@ -61,12 +55,26 @@ public void initialize() {
6155
public Result writeEntry(AuditEntry auditEntry) {
6256
try (Connection conn = dataSource.getConnection();
6357
PreparedStatement ps = conn.prepareStatement(
64-
"INSERT INTO " + auditTableName + " (execution_id, author, task_id, state, created_at) VALUES (?, ?, ?, ?, ?)")) {
58+
dialectHelper.getInsertSqlString(auditTableName))) {
6559
ps.setString(1, auditEntry.getExecutionId());
66-
ps.setString(2, auditEntry.getAuthor());
60+
ps.setString(2, auditEntry.getStageId());
6761
ps.setString(3, auditEntry.getTaskId());
68-
ps.setString(4, auditEntry.getState().name());
62+
ps.setString(4, auditEntry.getAuthor());
6963
ps.setTimestamp(5, Timestamp.valueOf(auditEntry.getCreatedAt()));
64+
ps.setString(6, auditEntry.getState() != null ? auditEntry.getState().name() : null);
65+
ps.setString(7, auditEntry.getClassName());
66+
ps.setString(8, auditEntry.getMethodName());
67+
ps.setString(9, auditEntry.getMetadata() != null ? auditEntry.getMetadata().toString() : null);
68+
ps.setLong(10, auditEntry.getExecutionMillis());
69+
ps.setString(11, auditEntry.getExecutionHostname());
70+
ps.setString(12, auditEntry.getErrorTrace());
71+
ps.setString(13, auditEntry.getType() != null ? auditEntry.getType().name() : null);
72+
ps.setString(14, auditEntry.getTxType() != null ? auditEntry.getTxType().name() : null);
73+
ps.setString(15, auditEntry.getTargetSystemId());
74+
ps.setString(16, auditEntry.getOrder());
75+
ps.setString(17, auditEntry.getRecoveryStrategy() != null ? auditEntry.getRecoveryStrategy().name() : null);
76+
ps.setObject(18, auditEntry.getTransactionFlag());
77+
ps.setObject(19, auditEntry.getSystemChange());
7078
ps.executeUpdate();
7179
return Result.OK();
7280
} catch (SQLException e) {
@@ -79,16 +87,28 @@ public List<AuditEntry> getAuditHistory() {
7987
List<AuditEntry> entries = new ArrayList<>();
8088
try (Connection conn = dataSource.getConnection();
8189
Statement stmt = conn.createStatement();
82-
ResultSet rs = stmt.executeQuery("SELECT execution_id, author, task_id, state, created_at FROM " + auditTableName + " ORDER BY created_at DESC")) {
90+
ResultSet rs = stmt.executeQuery(dialectHelper.getSelectHistorySqlString(auditTableName))) {
8391
while (rs.next()) {
8492
AuditEntry entry = new AuditEntry(
8593
rs.getString("execution_id"),
86-
null,
94+
rs.getString("stage_id"),
8795
rs.getString("task_id"),
8896
rs.getString("author"),
8997
rs.getTimestamp("created_at").toLocalDateTime(),
90-
AuditEntry.Status.valueOf(rs.getString("state")),
91-
null, null, null, 0L, null, null, false, null, null
98+
rs.getString("state") != null ? AuditEntry.Status.valueOf(rs.getString("state")) : null,
99+
rs.getString("type") != null ? AuditEntry.ExecutionType.valueOf(rs.getString("type")) : null,
100+
rs.getString("class_name"),
101+
rs.getString("method_name"),
102+
rs.getLong("execution_millis"),
103+
rs.getString("execution_hostname"),
104+
rs.getString("metadata"),
105+
rs.getBoolean("system_change"),
106+
rs.getString("error_trace"),
107+
AuditTxType.fromString(rs.getString("tx_type")),
108+
rs.getString("target_system_id"),
109+
rs.getString("order_col"),
110+
rs.getString("recovery_strategy") != null ? io.flamingock.api.RecoveryStrategy.valueOf(rs.getString("recovery_strategy")) : null,
111+
rs.getObject("transaction_flag") != null ? rs.getBoolean("transaction_flag") : null
92112
);
93113
entries.add(entry);
94114
}
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Copyright 2025 Flamingock (https://www.flamingock.io)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.flamingock.community.sql.internal;
17+
18+
import io.flamingock.internal.common.sql.AbstractSqlDialectHelper;
19+
import io.flamingock.internal.common.sql.SqlDialect;
20+
21+
import javax.sql.DataSource;
22+
23+
public final class SqlAuditorDialectHelper extends AbstractSqlDialectHelper {
24+
25+
public SqlAuditorDialectHelper(DataSource dataSource) {
26+
super(dataSource);
27+
}
28+
29+
public SqlAuditorDialectHelper(SqlDialect dialect) {
30+
super(dialect);
31+
}
32+
33+
private static final String COMMON_COLUMNS =
34+
"execution_id VARCHAR(255), " +
35+
"stage_id VARCHAR(255), " +
36+
"task_id VARCHAR(255), " +
37+
"author VARCHAR(255), " +
38+
"created_at %s, " +
39+
"state VARCHAR(255), " +
40+
"class_name VARCHAR(255), " +
41+
"method_name VARCHAR(255), " +
42+
"metadata %s, " +
43+
"execution_millis BIGINT, " +
44+
"execution_hostname VARCHAR(255), " +
45+
"error_trace %s, " +
46+
"type VARCHAR(50), " +
47+
"tx_type VARCHAR(50), " +
48+
"target_system_id VARCHAR(255), " +
49+
"order_col VARCHAR(50), " +
50+
"recovery_strategy VARCHAR(50), " +
51+
"transaction_flag %s, " +
52+
"system_change %s";
53+
54+
private String getCreatedAtType() {
55+
switch (sqlDialect) {
56+
case SQLSERVER:
57+
case SYBASE:
58+
return "DATETIME DEFAULT GETDATE()";
59+
case INFORMIX:
60+
return "DATETIME YEAR TO SECOND DEFAULT CURRENT YEAR TO SECOND";
61+
case ORACLE:
62+
case POSTGRESQL:
63+
case MYSQL:
64+
case MARIADB:
65+
case SQLITE:
66+
case H2:
67+
case HSQLDB:
68+
case DERBY:
69+
case FIREBIRD:
70+
return "TIMESTAMP DEFAULT CURRENT_TIMESTAMP";
71+
case DB2:
72+
return "TIMESTAMP DEFAULT CURRENT TIMESTAMP";
73+
default:
74+
return "TIMESTAMP";
75+
}
76+
}
77+
78+
private String getMetadataType() {
79+
switch (sqlDialect) {
80+
case ORACLE:
81+
case DB2:
82+
return "CLOB";
83+
case FIREBIRD:
84+
return "BLOB SUB_TYPE TEXT";
85+
default:
86+
return "TEXT";
87+
}
88+
}
89+
90+
private String getErrorTraceType() {
91+
switch (sqlDialect) {
92+
case ORACLE:
93+
case DB2:
94+
return "CLOB";
95+
case FIREBIRD:
96+
return "BLOB SUB_TYPE TEXT";
97+
default:
98+
return "TEXT";
99+
}
100+
}
101+
102+
private String getBooleanType() {
103+
switch (sqlDialect) {
104+
case SQLSERVER:
105+
case SYBASE:
106+
return "BIT";
107+
case ORACLE:
108+
case DB2:
109+
case FIREBIRD:
110+
case INFORMIX:
111+
return "SMALLINT";
112+
default:
113+
return "BOOLEAN";
114+
}
115+
}
116+
117+
public String getCreateTableSqlString(String tableName) {
118+
String columns = String.format(COMMON_COLUMNS,
119+
getCreatedAtType(),
120+
getMetadataType(),
121+
getErrorTraceType(),
122+
getBooleanType(),
123+
getBooleanType());
124+
125+
switch (sqlDialect) {
126+
case MYSQL:
127+
case MARIADB:
128+
return String.format(
129+
"CREATE TABLE IF NOT EXISTS %s (" +
130+
"id BIGINT AUTO_INCREMENT PRIMARY KEY, " +
131+
columns +
132+
")", tableName);
133+
case POSTGRESQL:
134+
return String.format(
135+
"CREATE TABLE IF NOT EXISTS %s (" +
136+
"id SERIAL PRIMARY KEY, " +
137+
columns +
138+
")", tableName);
139+
case SQLITE:
140+
case H2:
141+
case HSQLDB:
142+
case DERBY:
143+
return String.format(
144+
"CREATE TABLE IF NOT EXISTS %s (" +
145+
"id INTEGER PRIMARY KEY AUTOINCREMENT, " +
146+
columns +
147+
")", tableName);
148+
case SQLSERVER:
149+
case SYBASE:
150+
return String.format(
151+
"IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='%s' AND xtype='U') " +
152+
"CREATE TABLE %s (" +
153+
"id BIGINT IDENTITY(1,1) PRIMARY KEY, " +
154+
columns +
155+
")", tableName, tableName);
156+
case ORACLE:
157+
return String.format(
158+
"BEGIN EXECUTE IMMEDIATE 'CREATE TABLE %s (" +
159+
"id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, " +
160+
columns +
161+
")'; EXCEPTION WHEN OTHERS THEN IF SQLCODE != -955 THEN RAISE; END IF; END;", tableName);
162+
case DB2:
163+
return String.format(
164+
"CREATE TABLE %s (" +
165+
"id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, " +
166+
columns +
167+
")", tableName);
168+
case FIREBIRD:
169+
return String.format(
170+
"CREATE TABLE %s (" +
171+
"id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, " +
172+
columns +
173+
")", tableName);
174+
case INFORMIX:
175+
return String.format(
176+
"CREATE TABLE %s (" +
177+
"id SERIAL PRIMARY KEY, " +
178+
columns +
179+
")", tableName);
180+
default:
181+
throw new UnsupportedOperationException("Dialect not supported for CREATE TABLE: " + sqlDialect.name());
182+
}
183+
}
184+
185+
public String getInsertSqlString(String tableName) {
186+
return String.format(
187+
"INSERT INTO %s (" +
188+
"execution_id, stage_id, task_id, author, created_at, state, class_name, method_name, metadata, " +
189+
"execution_millis, execution_hostname, error_trace, type, tx_type, target_system_id, order_col, " +
190+
"recovery_strategy, transaction_flag, system_change" +
191+
") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
192+
tableName);
193+
}
194+
195+
public String getSelectHistorySqlString(String tableName) {
196+
return String.format(
197+
"SELECT execution_id, stage_id, task_id, author, created_at, state, class_name, method_name, metadata, " +
198+
"execution_millis, execution_hostname, error_trace, type, tx_type, target_system_id, order_col, " +
199+
"recovery_strategy, transaction_flag, system_change " +
200+
"FROM %s ORDER BY created_at DESC",
201+
tableName);
202+
}
203+
}

0 commit comments

Comments
 (0)