Skip to content

Commit 641fdb4

Browse files
committed
feat: update sql dialect helpers for sqlserver, postgresql and oracle. tests updated
1 parent a841a97 commit 641fdb4

File tree

41 files changed

+1886
-403
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1886
-403
lines changed

community/flamingock-auditstore-sql/build.gradle.kts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,14 @@ dependencies {
44
implementation(project(":utils:sql-util"))
55

66
testImplementation("mysql:mysql-connector-java:8.0.33")
7-
testImplementation(project(":utils:test-util"))
7+
testImplementation("com.microsoft.sqlserver:mssql-jdbc:12.4.2.jre8")
8+
testImplementation("com.oracle.database.jdbc:ojdbc8:21.9.0.0")
9+
testImplementation("org.postgresql:postgresql:42.7.3")
810
testImplementation("org.testcontainers:mysql:1.21.3")
11+
testImplementation("org.testcontainers:mssqlserver:1.21.3")
12+
testImplementation("org.testcontainers:oracle-xe:1.21.3")
13+
testImplementation("org.testcontainers:postgresql:1.21.3")
14+
testImplementation(project(":utils:test-util"))
915
testImplementation("com.zaxxer:HikariCP:3.4.5")
1016
testImplementation("org.testcontainers:junit-jupiter:1.21.3")
1117
testImplementation("com.h2database:h2:2.2.224")
@@ -22,4 +28,4 @@ java {
2228

2329
configurations.testImplementation {
2430
extendsFrom(configurations.compileOnly.get())
25-
}
31+
}

community/flamingock-auditstore-sql/src/main/java/io/flamingock/community/sql/internal/SqlAuditorDialectHelper.java

Lines changed: 181 additions & 120 deletions
Large diffs are not rendered by default.

community/flamingock-auditstore-sql/src/main/java/io/flamingock/community/sql/internal/SqlLockDialectHelper.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,17 @@ public String getCreateTableSqlString(String tableName) {
8080
}
8181

8282
public String getSelectLockSqlString(String tableName) {
83-
return String.format("SELECT `key`, status, owner, expires_at FROM %s WHERE `key` = ?", tableName);
83+
switch (sqlDialect) {
84+
case POSTGRESQL:
85+
return String.format("SELECT \"key\", status, owner, expires_at FROM %s WHERE \"key\" = ?", tableName);
86+
case SQLSERVER:
87+
case SYBASE:
88+
return String.format("SELECT [key], status, owner, expires_at FROM %s WITH (UPDLOCK, ROWLOCK) WHERE [key] = ?", tableName);
89+
case ORACLE:
90+
return String.format("SELECT \"key\", status, owner, expires_at FROM %s WHERE \"key\" = ? FOR UPDATE", tableName);
91+
default:
92+
return String.format("SELECT `key`, status, owner, expires_at FROM %s WHERE `key` = ?", tableName);
93+
}
8494
}
8595

8696
public String getInsertOrUpdateLockSqlString(String tableName) {
@@ -104,11 +114,14 @@ public String getInsertOrUpdateLockSqlString(String tableName) {
104114
case SQLSERVER:
105115
case SYBASE:
106116
return String.format(
107-
"MERGE INTO %s AS target USING (SELECT ? AS [key], ? AS status, ? AS owner, ? AS expires_at) AS source " +
108-
"ON (target.[key] = source.[key]) " +
109-
"WHEN MATCHED THEN UPDATE SET status = source.status, owner = source.owner, expires_at = source.expires_at " +
110-
"WHEN NOT MATCHED THEN INSERT ([key], status, owner, expires_at) VALUES (source.[key], source.status, source.owner, source.expires_at);",
111-
tableName);
117+
"BEGIN TRANSACTION; " +
118+
"UPDATE %s SET status = ?, owner = ?, expires_at = ? WHERE [key] = ?; " +
119+
"IF @@ROWCOUNT = 0 " +
120+
"BEGIN " +
121+
"INSERT INTO %s ([key], status, owner, expires_at) VALUES (?, ?, ?, ?) " +
122+
"END; " +
123+
"COMMIT TRANSACTION;",
124+
tableName, tableName);
112125
case ORACLE:
113126
return String.format(
114127
"MERGE INTO %s t USING (SELECT ? AS \"key\", ? AS status, ? AS owner, ? AS expires_at FROM dual) s " +
@@ -141,5 +154,8 @@ public String getInsertOrUpdateLockSqlString(String tableName) {
141154
public String getDeleteLockSqlString(String tableName) {
142155
return String.format("DELETE FROM %s WHERE `key` = ?", tableName);
143156
}
144-
}
145157

158+
public SqlDialect getSqlDialect() {
159+
return sqlDialect;
160+
}
161+
}

community/flamingock-auditstore-sql/src/main/java/io/flamingock/community/sql/internal/SqlLockService.java

Lines changed: 66 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package io.flamingock.community.sql.internal;
1717

18+
import io.flamingock.internal.common.sql.SqlDialect;
1819
import io.flamingock.internal.core.store.lock.community.CommunityLockService;
1920
import io.flamingock.internal.core.store.lock.community.CommunityLockEntry;
2021
import io.flamingock.internal.core.store.lock.LockAcquisition;
@@ -59,15 +60,28 @@ public void initialize(boolean autoCreate) {
5960
public LockAcquisition upsert(LockKey key, RunnerId owner, long leaseMillis) {
6061
String keyStr = key.toString();
6162
LocalDateTime expiresAt = LocalDateTime.now().plusNanos(leaseMillis * 1_000_000);
63+
6264
try (Connection conn = dataSource.getConnection()) {
63-
CommunityLockEntry existing = getLockEntry(conn, keyStr);
64-
if (existing == null ||
65-
owner.toString().equals(existing.getOwner()) ||
66-
LocalDateTime.now().isAfter(existing.getExpiresAt())) {
67-
upsertLockEntry(conn, keyStr, owner.toString(), expiresAt);
68-
} else {
69-
throw new LockServiceException("upsert", keyStr,
70-
"Still locked by " + existing.getOwner() + " until " + existing.getExpiresAt());
65+
conn.setAutoCommit(false);
66+
try {
67+
CommunityLockEntry existing = getLockEntry(conn, keyStr);
68+
if (existing == null ||
69+
owner.toString().equals(existing.getOwner()) ||
70+
LocalDateTime.now().isAfter(existing.getExpiresAt())) {
71+
upsertLockEntry(conn, keyStr, owner.toString(), expiresAt);
72+
if (dialectHelper.getSqlDialect() != SqlDialect.SQLSERVER && dialectHelper.getSqlDialect() != SqlDialect.SYBASE) {
73+
conn.commit();
74+
}
75+
} else {
76+
conn.rollback();
77+
throw new LockServiceException("upsert", keyStr,
78+
"Still locked by " + existing.getOwner() + " until " + existing.getExpiresAt());
79+
}
80+
} catch (Exception e) {
81+
conn.rollback();
82+
throw e;
83+
} finally {
84+
conn.setAutoCommit(true);
7185
}
7286
} catch (SQLException e) {
7387
throw new LockServiceException("upsert", keyStr, e.getMessage());
@@ -79,13 +93,26 @@ public LockAcquisition upsert(LockKey key, RunnerId owner, long leaseMillis) {
7993
public LockAcquisition extendLock(LockKey key, RunnerId owner, long leaseMillis) throws LockServiceException {
8094
String keyStr = key.toString();
8195
LocalDateTime expiresAt = LocalDateTime.now().plusNanos(leaseMillis * 1_000_000);
96+
8297
try (Connection conn = dataSource.getConnection()) {
83-
CommunityLockEntry existing = getLockEntry(conn, keyStr);
84-
if (existing != null && owner.toString().equals(existing.getOwner())) {
85-
upsertLockEntry(conn, keyStr, owner.toString(), expiresAt);
86-
} else {
87-
throw new LockServiceException("extendLock", keyStr,
88-
"Lock belongs to " + (existing != null ? existing.getOwner() : "none"));
98+
conn.setAutoCommit(false);
99+
try {
100+
CommunityLockEntry existing = getLockEntry(conn, keyStr);
101+
if (existing != null && owner.toString().equals(existing.getOwner())) {
102+
upsertLockEntry(conn, keyStr, owner.toString(), expiresAt);
103+
if (dialectHelper.getSqlDialect() != SqlDialect.SQLSERVER && dialectHelper.getSqlDialect() != SqlDialect.SYBASE) {
104+
conn.commit();
105+
}
106+
} else {
107+
conn.rollback();
108+
throw new LockServiceException("extendLock", keyStr,
109+
"Lock belongs to " + (existing != null ? existing.getOwner() : "none"));
110+
}
111+
} catch (Exception e) {
112+
conn.rollback();
113+
throw e;
114+
} finally {
115+
conn.setAutoCommit(true);
89116
}
90117
} catch (SQLException e) {
91118
throw new LockServiceException("extendLock", keyStr, e.getMessage());
@@ -139,7 +166,7 @@ private CommunityLockEntry getLockEntry(Connection conn, String key) throws SQLE
139166
try (ResultSet rs = ps.executeQuery()) {
140167
if (rs.next()) {
141168
return new CommunityLockEntry(
142-
rs.getString("key"),
169+
rs.getString(1), // key column
143170
LockStatus.valueOf(rs.getString("status")),
144171
rs.getString("owner"),
145172
rs.getTimestamp("expires_at").toLocalDateTime()
@@ -151,13 +178,30 @@ private CommunityLockEntry getLockEntry(Connection conn, String key) throws SQLE
151178
}
152179

153180
private void upsertLockEntry(Connection conn, String key, String owner, LocalDateTime expiresAt) throws SQLException {
154-
try (PreparedStatement ps = conn.prepareStatement(
155-
dialectHelper.getInsertOrUpdateLockSqlString(lockRepositoryName))) {
156-
ps.setString(1, key);
157-
ps.setString(2, LockStatus.LOCK_HELD.name());
158-
ps.setString(3, owner);
159-
ps.setTimestamp(4, Timestamp.valueOf(expiresAt));
160-
ps.executeUpdate();
181+
String sql = dialectHelper.getInsertOrUpdateLockSqlString(lockRepositoryName);
182+
183+
if (dialectHelper.getSqlDialect() == SqlDialect.SQLSERVER || dialectHelper.getSqlDialect() == SqlDialect.SYBASE) {
184+
// For SQL Server, the SQL already contains transaction management
185+
try (Statement stmt = conn.createStatement()) {
186+
String formattedSql = sql.replace("?", "'" + LockStatus.LOCK_HELD.name() + "'")
187+
.replaceFirst("'[^']*'", "'" + LockStatus.LOCK_HELD.name() + "'")
188+
.replaceFirst("'[^']*'", "'" + owner + "'")
189+
.replaceFirst("'[^']*'", "'" + Timestamp.valueOf(expiresAt) + "'")
190+
.replaceFirst("'[^']*'", "'" + key + "'")
191+
.replaceFirst("'[^']*'", "'" + key + "'")
192+
.replaceFirst("'[^']*'", "'" + LockStatus.LOCK_HELD.name() + "'")
193+
.replaceFirst("'[^']*'", "'" + owner + "'")
194+
.replaceFirst("'[^']*'", "'" + Timestamp.valueOf(expiresAt) + "'");
195+
stmt.execute(formattedSql);
196+
}
197+
} else {
198+
try (PreparedStatement ps = conn.prepareStatement(sql)) {
199+
ps.setString(1, key);
200+
ps.setString(2, LockStatus.LOCK_HELD.name());
201+
ps.setString(3, owner);
202+
ps.setTimestamp(4, Timestamp.valueOf(expiresAt));
203+
ps.executeUpdate();
204+
}
161205
}
162206
}
163207
}

0 commit comments

Comments
 (0)