Skip to content

Commit 395219a

Browse files
committed
feat: sql audit store for informix
1 parent f69dc74 commit 395219a

File tree

10 files changed

+329
-68
lines changed

10 files changed

+329
-68
lines changed

community/flamingock-auditstore-sql/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ dependencies {
1919
testImplementation("com.h2database:h2:2.2.224")
2020
testImplementation("org.mockito:mockito-inline:4.11.0")
2121
testImplementation("org.xerial:sqlite-jdbc:3.41.2.1")
22+
testImplementation("com.ibm.informix:jdbc:4.50.10")
2223
}
2324

2425
description = "SQL audit store implementation for distributed change auditing"

community/flamingock-auditstore-sql/src/main/java/io/flamingock/community/sql/internal/SqlAuditor.java

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.flamingock.internal.common.core.audit.AuditEntry;
1919
import io.flamingock.internal.common.core.audit.AuditReader;
2020
import io.flamingock.internal.common.core.audit.AuditTxType;
21+
import io.flamingock.internal.common.sql.SqlDialect;
2122
import io.flamingock.internal.core.store.audit.LifecycleAuditWriter;
2223
import io.flamingock.internal.util.Result;
2324

@@ -53,35 +54,53 @@ public void initialize() {
5354

5455
@Override
5556
public Result writeEntry(AuditEntry auditEntry) {
56-
try (Connection conn = dataSource.getConnection();
57-
PreparedStatement ps = conn.prepareStatement(
58-
dialectHelper.getInsertSqlString(auditTableName))) {
59-
ps.setString(1, auditEntry.getExecutionId());
60-
ps.setString(2, auditEntry.getStageId());
61-
ps.setString(3, auditEntry.getTaskId());
62-
ps.setString(4, auditEntry.getAuthor());
63-
ps.setTimestamp(5, Timestamp.valueOf(auditEntry.getCreatedAt()));
64-
ps.setString(6, auditEntry.getState() != null ? auditEntry.getState().name() : null);
65-
ps.setString(7, auditEntry.getClassName());
66-
ps.setString(8, auditEntry.getMethodName());
67-
ps.setString(9, auditEntry.getMetadata() != null ? auditEntry.getMetadata().toString() : null);
68-
ps.setLong(10, auditEntry.getExecutionMillis());
69-
ps.setString(11, auditEntry.getExecutionHostname());
70-
ps.setString(12, auditEntry.getErrorTrace());
71-
ps.setString(13, auditEntry.getType() != null ? auditEntry.getType().name() : null);
72-
ps.setString(14, auditEntry.getTxType() != null ? auditEntry.getTxType().name() : null);
73-
ps.setString(15, auditEntry.getTargetSystemId());
74-
ps.setString(16, auditEntry.getOrder());
75-
ps.setString(17, auditEntry.getRecoveryStrategy() != null ? auditEntry.getRecoveryStrategy().name() : null);
76-
ps.setObject(18, auditEntry.getTransactionFlag());
77-
ps.setObject(19, auditEntry.getSystemChange());
78-
ps.executeUpdate();
57+
Connection conn = null;
58+
try {
59+
conn = dataSource.getConnection();
60+
61+
// For Informix, ensure autoCommit is enabled for audit writes
62+
if (dialectHelper.getSqlDialect() == SqlDialect.INFORMIX) {
63+
conn.setAutoCommit(true);
64+
}
65+
66+
try (PreparedStatement ps = conn.prepareStatement(
67+
dialectHelper.getInsertSqlString(auditTableName))) {
68+
ps.setString(1, auditEntry.getExecutionId());
69+
ps.setString(2, auditEntry.getStageId());
70+
ps.setString(3, auditEntry.getTaskId());
71+
ps.setString(4, auditEntry.getAuthor());
72+
ps.setTimestamp(5, Timestamp.valueOf(auditEntry.getCreatedAt()));
73+
ps.setString(6, auditEntry.getState() != null ? auditEntry.getState().name() : null);
74+
ps.setString(7, auditEntry.getClassName());
75+
ps.setString(8, auditEntry.getMethodName());
76+
ps.setString(9, auditEntry.getMetadata() != null ? auditEntry.getMetadata().toString() : null);
77+
ps.setLong(10, auditEntry.getExecutionMillis());
78+
ps.setString(11, auditEntry.getExecutionHostname());
79+
ps.setString(12, auditEntry.getErrorTrace());
80+
ps.setString(13, auditEntry.getType() != null ? auditEntry.getType().name() : null);
81+
ps.setString(14, auditEntry.getTxType() != null ? auditEntry.getTxType().name() : null);
82+
ps.setString(15, auditEntry.getTargetSystemId());
83+
ps.setString(16, auditEntry.getOrder());
84+
ps.setString(17, auditEntry.getRecoveryStrategy() != null ? auditEntry.getRecoveryStrategy().name() : null);
85+
ps.setObject(18, auditEntry.getTransactionFlag());
86+
ps.setObject(19, auditEntry.getSystemChange());
87+
ps.executeUpdate();
88+
}
7989
return Result.OK();
8090
} catch (SQLException e) {
8191
return new Result.Error(e);
92+
} finally {
93+
if (conn != null) {
94+
try {
95+
conn.close();
96+
} catch (SQLException e) {
97+
// Log but don't throw
98+
}
99+
}
82100
}
83101
}
84102

103+
85104
@Override
86105
public List<AuditEntry> getAuditHistory() {
87106
List<AuditEntry> entries = new ArrayList<>();

community/flamingock-auditstore-sql/src/main/java/io/flamingock/community/sql/internal/SqlAuditorDialectHelper.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ public String getCreateTableSqlString(String tableName) {
3737
case H2:
3838
case HSQLDB:
3939
case FIREBIRD:
40-
case INFORMIX:
4140
return String.format(
4241
"CREATE TABLE IF NOT EXISTS %s (" +
4342
"id %s PRIMARY KEY, " +
@@ -189,6 +188,32 @@ public String getCreateTableSqlString(String tableName) {
189188
"transaction_flag INTEGER, " +
190189
"system_change INTEGER" +
191190
")", tableName);
191+
case INFORMIX:
192+
return String.format(
193+
"CREATE TABLE IF NOT EXISTS %s (" +
194+
"id SERIAL8 PRIMARY KEY, " +
195+
"execution_id VARCHAR(100), " +
196+
"stage_id VARCHAR(100), " +
197+
"task_id VARCHAR(100), " +
198+
"author VARCHAR(100), " +
199+
"created_at DATETIME YEAR TO FRACTION(3) DEFAULT CURRENT YEAR TO FRACTION(3), " +
200+
"state VARCHAR(50), " +
201+
"class_name VARCHAR(200), " +
202+
"method_name VARCHAR(100), " +
203+
"metadata LVARCHAR(8000), " +
204+
"execution_millis BIGINT, " +
205+
"execution_hostname VARCHAR(100), " +
206+
"error_trace LVARCHAR(8000), " +
207+
"type VARCHAR(50), " +
208+
"tx_type VARCHAR(50), " +
209+
"target_system_id VARCHAR(100), " +
210+
"order_col VARCHAR(50), " +
211+
"recovery_strategy VARCHAR(50), " +
212+
"transaction_flag BOOLEAN, " +
213+
"system_change BOOLEAN" +
214+
")", tableName);
215+
216+
192217
default:
193218
throw new UnsupportedOperationException("Dialect not supported for CREATE TABLE: " + sqlDialect.name());
194219
}

community/flamingock-auditstore-sql/src/main/java/io/flamingock/community/sql/internal/SqlLockDialectHelper.java

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ public String getCreateTableSqlString(String tableName) {
5050
case H2:
5151
case HSQLDB:
5252
case FIREBIRD:
53-
case INFORMIX:
5453
return String.format(
5554
"CREATE TABLE IF NOT EXISTS %s (" +
5655
"`key` VARCHAR(255) PRIMARY KEY," +
@@ -86,6 +85,14 @@ public String getCreateTableSqlString(String tableName) {
8685
"owner VARCHAR(255), " +
8786
"expires_at TIMESTAMP)'; " +
8887
"END", tableName);
88+
case INFORMIX:
89+
return String.format(
90+
"CREATE TABLE %s (" +
91+
"lock_key VARCHAR(255) PRIMARY KEY, " +
92+
"status VARCHAR(32), " +
93+
"owner VARCHAR(255), " +
94+
"expires_at DATETIME YEAR TO FRACTION(3)" +
95+
")", tableName);
8996
default:
9097
throw new UnsupportedOperationException("Dialect not supported for CREATE TABLE: " + sqlDialect.name());
9198
}
@@ -103,6 +110,8 @@ public String getSelectLockSqlString(String tableName) {
103110
return String.format("SELECT [key], status, owner, expires_at FROM %s WITH (UPDLOCK, ROWLOCK) WHERE [key] = ?", tableName);
104111
case ORACLE:
105112
return String.format("SELECT \"key\", status, owner, expires_at FROM %s WHERE \"key\" = ? FOR UPDATE", tableName);
113+
case INFORMIX:
114+
return String.format("SELECT lock_key, status, owner, expires_at FROM %s WHERE lock_key = ?", tableName);
106115
default:
107116
return String.format("SELECT `key`, status, owner, expires_at FROM %s WHERE `key` = ?", tableName);
108117
}
@@ -112,7 +121,6 @@ public String getInsertOrUpdateLockSqlString(String tableName) {
112121
switch (sqlDialect) {
113122
case MYSQL:
114123
case MARIADB:
115-
case INFORMIX:
116124
return String.format(
117125
"INSERT INTO %s (`key`, status, owner, expires_at) VALUES (?, ?, ?, ?) " +
118126
"ON DUPLICATE KEY UPDATE status = VALUES(status), owner = VALUES(owner), expires_at = VALUES(expires_at)",
@@ -161,6 +169,15 @@ public String getInsertOrUpdateLockSqlString(String tableName) {
161169
return String.format(
162170
"UPDATE OR INSERT INTO %s (`key`, status, owner, expires_at) VALUES (?, ?, ?, ?) MATCHING (`key`)",
163171
tableName);
172+
case INFORMIX:
173+
// Informix doesn't support ON DUPLICATE KEY UPDATE
174+
// Use a procedural approach similar to SQL Server
175+
return String.format(
176+
"UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?; " +
177+
"INSERT INTO %s (lock_key, status, owner, expires_at) " +
178+
"SELECT ?, ?, ?, ? FROM sysmaster:sysdual " +
179+
"WHERE NOT EXISTS (SELECT 1 FROM %s WHERE lock_key = ?)",
180+
tableName, tableName, tableName);
164181
default:
165182
throw new UnsupportedOperationException("Dialect not supported for upsert: " + sqlDialect.name());
166183
}
@@ -170,7 +187,7 @@ public String getDeleteLockSqlString(String tableName) {
170187
if (Objects.requireNonNull(sqlDialect) == SqlDialect.POSTGRESQL) {
171188
return String.format("DELETE FROM %s WHERE \"key\" = ?", tableName);
172189
}
173-
if (sqlDialect == SqlDialect.DB2) {
190+
if (sqlDialect == SqlDialect.INFORMIX || sqlDialect == SqlDialect.DB2) {
174191
return String.format("DELETE FROM %s WHERE lock_key = ?", tableName);
175192
}
176193
return String.format("DELETE FROM %s WHERE `key` = ?", tableName);
@@ -196,13 +213,38 @@ public void upsertLockEntry(Connection conn, String tableName, String key, Strin
196213
try (PreparedStatement insert = conn.prepareStatement(
197214
"INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) {
198215
insert.setString(1, key);
199-
// Use "LOCKED" string to avoid using a non-existing enum constant (previous "ACQUIRED" caused failures)
200216
insert.setString(2, LockStatus.LOCK_HELD.name());
201217
insert.setString(3, owner);
202218
insert.setTimestamp(4, Timestamp.valueOf(expiresAt));
203219
insert.executeUpdate();
204-
return;
205220
}
221+
return;
222+
}
223+
224+
if (getSqlDialect() == SqlDialect.INFORMIX) {
225+
// Try UPDATE first
226+
try (PreparedStatement update = conn.prepareStatement(
227+
"UPDATE " + tableName + " SET status = ?, owner = ?, expires_at = ? WHERE lock_key = ?")) {
228+
update.setString(1, LockStatus.LOCK_HELD.name());
229+
update.setString(2, owner);
230+
update.setTimestamp(3, Timestamp.valueOf(expiresAt));
231+
update.setString(4, key);
232+
int updated = update.executeUpdate();
233+
if (updated > 0) {
234+
return;
235+
}
236+
}
237+
238+
// If no row updated, try INSERT
239+
try (PreparedStatement insert = conn.prepareStatement(
240+
"INSERT INTO " + tableName + " (lock_key, status, owner, expires_at) VALUES (?, ?, ?, ?)")) {
241+
insert.setString(1, key);
242+
insert.setString(2, LockStatus.LOCK_HELD.name());
243+
insert.setString(3, owner);
244+
insert.setTimestamp(4, Timestamp.valueOf(expiresAt));
245+
insert.executeUpdate();
246+
}
247+
return;
206248
}
207249

208250
if (getSqlDialect() == SqlDialect.SQLSERVER || getSqlDialect() == SqlDialect.SYBASE) {
@@ -219,18 +261,20 @@ public void upsertLockEntry(Connection conn, String tableName, String key, Strin
219261
.replaceFirst("\\?", "'" + Timestamp.valueOf(expiresAt) + "'");
220262
stmt.execute(formattedSql);
221263
}
222-
} else {
223-
try (PreparedStatement ps = conn.prepareStatement(sql)) {
224-
// For DB2 we use lock_key but callers pass key as first parameter - that's correct
225-
ps.setString(1, key);
226-
ps.setString(2, LockStatus.LOCK_HELD.name());
227-
ps.setString(3, owner);
228-
ps.setTimestamp(4, Timestamp.valueOf(expiresAt));
229-
ps.executeUpdate();
230-
}
264+
return;
265+
}
266+
267+
// Default case for other dialects
268+
try (PreparedStatement ps = conn.prepareStatement(sql)) {
269+
ps.setString(1, key);
270+
ps.setString(2, LockStatus.LOCK_HELD.name());
271+
ps.setString(3, owner);
272+
ps.setTimestamp(4, Timestamp.valueOf(expiresAt));
273+
ps.executeUpdate();
231274
}
232275
}
233276

277+
234278
public SqlDialect getSqlDialect() {
235279
return sqlDialect;
236280
}

0 commit comments

Comments
 (0)