Skip to content

Commit d79c7e9

Browse files
feat: sql audit store (#720)
1 parent 0573a33 commit d79c7e9

File tree

48 files changed

+3387
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+3387
-0
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
dependencies {
2+
api(project(":core:flamingock-core"))
3+
api(project(":core:target-systems:sql-target-system"))
4+
implementation(project(":utils:sql-util"))
5+
6+
testImplementation("mysql:mysql-connector-java:8.0.33")
7+
testImplementation("com.microsoft.sqlserver:mssql-jdbc:12.4.2.jre8")
8+
testImplementation("com.oracle.database.jdbc:ojdbc8:21.9.0.0")
9+
testImplementation("org.postgresql:postgresql:42.7.3")
10+
testImplementation("org.mariadb.jdbc:mariadb-java-client:3.3.2")
11+
testImplementation("org.testcontainers:mysql:1.21.3")
12+
testImplementation("org.testcontainers:mssqlserver:1.21.3")
13+
testImplementation("org.testcontainers:oracle-xe:1.21.3")
14+
testImplementation("org.testcontainers:postgresql:1.21.3")
15+
testImplementation("org.testcontainers:mariadb:1.21.3")
16+
testImplementation(project(":utils:test-util"))
17+
testImplementation("com.zaxxer:HikariCP:3.4.5")
18+
testImplementation("org.testcontainers:junit-jupiter:1.21.3")
19+
testImplementation("com.h2database:h2:2.2.224")
20+
testImplementation("org.mockito:mockito-inline:4.11.0")
21+
testImplementation("org.xerial:sqlite-jdbc:3.41.2.1")
22+
}
23+
24+
description = "SQL audit store implementation for distributed change auditing"
25+
26+
java {
27+
toolchain {
28+
languageVersion.set(JavaLanguageVersion.of(8))
29+
}
30+
}
31+
32+
configurations.testImplementation {
33+
extendsFrom(configurations.compileOnly.get())
34+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2025 Flamingock (https://www.flamingock.io)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.flamingock.community.sql.driver;
17+
18+
import io.flamingock.internal.core.store.CommunityAuditStore;
19+
import io.flamingock.internal.core.store.audit.community.CommunityAuditPersistence;
20+
import io.flamingock.internal.core.store.lock.community.CommunityLockService;
21+
import io.flamingock.internal.common.core.context.ContextResolver;
22+
import io.flamingock.internal.core.configuration.community.CommunityConfigurable;
23+
import io.flamingock.internal.util.constants.CommunityPersistenceConstants;
24+
import io.flamingock.internal.util.id.RunnerId;
25+
import io.flamingock.community.sql.internal.SqlAuditPersistence;
26+
import io.flamingock.community.sql.internal.SqlLockService;
27+
28+
import javax.sql.DataSource;
29+
30+
public class SqlAuditStore implements CommunityAuditStore {
31+
32+
private final DataSource dataSource;
33+
private CommunityConfigurable communityConfiguration;
34+
private RunnerId runnerId;
35+
private SqlAuditPersistence persistence;
36+
private SqlLockService lockService;
37+
private String auditRepositoryName = CommunityPersistenceConstants.DEFAULT_AUDIT_STORE_NAME;
38+
private String lockRepositoryName = CommunityPersistenceConstants.DEFAULT_LOCK_STORE_NAME;
39+
private boolean autoCreate = true;
40+
41+
public SqlAuditStore(DataSource dataSource) {
42+
this.dataSource = dataSource;
43+
}
44+
45+
public SqlAuditStore withAuditRepositoryName(String auditRepositoryName) {
46+
this.auditRepositoryName = auditRepositoryName;
47+
return this;
48+
}
49+
50+
public SqlAuditStore withLockRepositoryName(String lockRepositoryName) {
51+
this.lockRepositoryName = lockRepositoryName;
52+
return this;
53+
}
54+
55+
public SqlAuditStore withAutoCreate(boolean autoCreate) {
56+
this.autoCreate = autoCreate;
57+
return this;
58+
}
59+
60+
@Override
61+
public void initialize(ContextResolver baseContext) {
62+
runnerId = baseContext.getRequiredDependencyValue(RunnerId.class);
63+
communityConfiguration = baseContext.getRequiredDependencyValue(CommunityConfigurable.class);
64+
}
65+
66+
@Override
67+
public synchronized CommunityAuditPersistence getPersistence() {
68+
if (persistence == null) {
69+
persistence = new SqlAuditPersistence(communityConfiguration, dataSource, auditRepositoryName, autoCreate);
70+
persistence.initialize(runnerId);
71+
}
72+
return persistence;
73+
}
74+
75+
76+
@Override
77+
public synchronized CommunityLockService getLockService() {
78+
if (lockService == null) {
79+
lockService = new SqlLockService(dataSource, lockRepositoryName);
80+
lockService.initialize(autoCreate);
81+
}
82+
return lockService;
83+
}
84+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2025 Flamingock (https://www.flamingock.io)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.flamingock.community.sql.internal;
17+
18+
import io.flamingock.internal.core.configuration.community.CommunityConfigurable;
19+
import io.flamingock.internal.core.store.audit.community.AbstractCommunityAuditPersistence;
20+
import io.flamingock.internal.common.core.audit.AuditEntry;
21+
import io.flamingock.internal.util.Result;
22+
import io.flamingock.internal.util.id.RunnerId;
23+
24+
import javax.sql.DataSource;
25+
import java.util.List;
26+
27+
public class SqlAuditPersistence extends AbstractCommunityAuditPersistence {
28+
29+
private final DataSource dataSource;
30+
private final String auditRepositoryName;
31+
private final boolean autoCreate;
32+
private SqlAuditor auditor;
33+
34+
public SqlAuditPersistence(CommunityConfigurable localConfiguration,
35+
DataSource dataSource,
36+
String auditRepositoryName,
37+
boolean autoCreate) {
38+
super(localConfiguration);
39+
this.dataSource = dataSource;
40+
this.auditRepositoryName = auditRepositoryName;
41+
this.autoCreate = autoCreate;
42+
}
43+
44+
@Override
45+
protected void doInitialize(RunnerId runnerId) {
46+
auditor = new SqlAuditor(dataSource, auditRepositoryName, autoCreate);
47+
auditor.initialize();
48+
}
49+
50+
@Override
51+
public List<AuditEntry> getAuditHistory() {
52+
return auditor.getAuditHistory();
53+
}
54+
55+
@Override
56+
public Result writeEntry(AuditEntry auditEntry) {
57+
return auditor.writeEntry(auditEntry);
58+
}
59+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright 2025 Flamingock (https://www.flamingock.io)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.flamingock.community.sql.internal;
17+
18+
import io.flamingock.internal.common.core.audit.AuditEntry;
19+
import io.flamingock.internal.common.core.audit.AuditReader;
20+
import io.flamingock.internal.common.core.audit.AuditTxType;
21+
import io.flamingock.internal.core.store.audit.LifecycleAuditWriter;
22+
import io.flamingock.internal.util.Result;
23+
24+
import javax.sql.DataSource;
25+
import java.sql.*;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
29+
public class SqlAuditor implements LifecycleAuditWriter, AuditReader {
30+
31+
private final DataSource dataSource;
32+
private final String auditTableName;
33+
private final boolean autoCreate;
34+
private final SqlAuditorDialectHelper dialectHelper;
35+
36+
public SqlAuditor(DataSource dataSource, String auditTableName, boolean autoCreate) {
37+
this.dataSource = dataSource;
38+
this.auditTableName = auditTableName;
39+
this.autoCreate = autoCreate;
40+
this.dialectHelper = new SqlAuditorDialectHelper(dataSource);
41+
}
42+
43+
public void initialize() {
44+
if (autoCreate) {
45+
try (Connection conn = dataSource.getConnection();
46+
Statement stmt = conn.createStatement()) {
47+
stmt.executeUpdate(dialectHelper.getCreateTableSqlString(auditTableName));
48+
} catch (SQLException e) {
49+
throw new RuntimeException("Failed to initialize audit table", e);
50+
}
51+
}
52+
}
53+
54+
@Override
55+
public Result writeEntry(AuditEntry auditEntry) {
56+
try (Connection conn = dataSource.getConnection();
57+
PreparedStatement ps = conn.prepareStatement(
58+
dialectHelper.getInsertSqlString(auditTableName))) {
59+
ps.setString(1, auditEntry.getExecutionId());
60+
ps.setString(2, auditEntry.getStageId());
61+
ps.setString(3, auditEntry.getTaskId());
62+
ps.setString(4, auditEntry.getAuthor());
63+
ps.setTimestamp(5, Timestamp.valueOf(auditEntry.getCreatedAt()));
64+
ps.setString(6, auditEntry.getState() != null ? auditEntry.getState().name() : null);
65+
ps.setString(7, auditEntry.getClassName());
66+
ps.setString(8, auditEntry.getMethodName());
67+
ps.setString(9, auditEntry.getMetadata() != null ? auditEntry.getMetadata().toString() : null);
68+
ps.setLong(10, auditEntry.getExecutionMillis());
69+
ps.setString(11, auditEntry.getExecutionHostname());
70+
ps.setString(12, auditEntry.getErrorTrace());
71+
ps.setString(13, auditEntry.getType() != null ? auditEntry.getType().name() : null);
72+
ps.setString(14, auditEntry.getTxType() != null ? auditEntry.getTxType().name() : null);
73+
ps.setString(15, auditEntry.getTargetSystemId());
74+
ps.setString(16, auditEntry.getOrder());
75+
ps.setString(17, auditEntry.getRecoveryStrategy() != null ? auditEntry.getRecoveryStrategy().name() : null);
76+
ps.setObject(18, auditEntry.getTransactionFlag());
77+
ps.setObject(19, auditEntry.getSystemChange());
78+
ps.executeUpdate();
79+
return Result.OK();
80+
} catch (SQLException e) {
81+
return new Result.Error(e);
82+
}
83+
}
84+
85+
@Override
86+
public List<AuditEntry> getAuditHistory() {
87+
List<AuditEntry> entries = new ArrayList<>();
88+
try (Connection conn = dataSource.getConnection();
89+
Statement stmt = conn.createStatement();
90+
ResultSet rs = stmt.executeQuery(dialectHelper.getSelectHistorySqlString(auditTableName))) {
91+
while (rs.next()) {
92+
AuditEntry entry = new AuditEntry(
93+
rs.getString("execution_id"),
94+
rs.getString("stage_id"),
95+
rs.getString("task_id"),
96+
rs.getString("author"),
97+
rs.getTimestamp("created_at").toLocalDateTime(),
98+
rs.getString("state") != null ? AuditEntry.Status.valueOf(rs.getString("state")) : null,
99+
rs.getString("type") != null ? AuditEntry.ExecutionType.valueOf(rs.getString("type")) : null,
100+
rs.getString("class_name"),
101+
rs.getString("method_name"),
102+
rs.getLong("execution_millis"),
103+
rs.getString("execution_hostname"),
104+
rs.getString("metadata"),
105+
rs.getBoolean("system_change"),
106+
rs.getString("error_trace"),
107+
AuditTxType.fromString(rs.getString("tx_type")),
108+
rs.getString("target_system_id"),
109+
rs.getString("order_col"),
110+
rs.getString("recovery_strategy") != null ? io.flamingock.api.RecoveryStrategy.valueOf(rs.getString("recovery_strategy")) : null,
111+
rs.getObject("transaction_flag") != null ? rs.getBoolean("transaction_flag") : null
112+
);
113+
entries.add(entry);
114+
}
115+
} catch (SQLException e) {
116+
throw new RuntimeException("Failed to read audit history", e);
117+
}
118+
return entries;
119+
}
120+
}

0 commit comments

Comments
 (0)