Skip to content

Commit 37ac101

Browse files
committed
feat: 增加opengauss支持
1 parent 5e85f93 commit 37ac101

File tree

7 files changed

+442
-0
lines changed

7 files changed

+442
-0
lines changed
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
package org.hswebframework.ezorm.rdb.supports.opengauss;
2+
3+
import lombok.AllArgsConstructor;
4+
import org.apache.commons.collections4.CollectionUtils;
5+
import org.hswebframework.ezorm.core.param.Term;
6+
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
7+
import org.hswebframework.ezorm.rdb.executor.SyncSqlExecutor;
8+
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
9+
import org.hswebframework.ezorm.rdb.mapping.defaults.SaveResult;
10+
import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
11+
import org.hswebframework.ezorm.rdb.metadata.RDBIndexMetadata;
12+
import org.hswebframework.ezorm.rdb.metadata.RDBTableMetadata;
13+
import org.hswebframework.ezorm.rdb.operator.builder.fragments.AppendableSqlFragments;
14+
import org.hswebframework.ezorm.rdb.operator.builder.fragments.EmptySqlFragments;
15+
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SimpleTermsFragmentBuilder;
16+
import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments;
17+
import org.hswebframework.ezorm.rdb.operator.builder.fragments.insert.BatchInsertSqlBuilder;
18+
import org.hswebframework.ezorm.rdb.operator.dml.insert.InsertColumn;
19+
import org.hswebframework.ezorm.rdb.operator.dml.insert.InsertOperatorParameter;
20+
import org.hswebframework.ezorm.rdb.operator.dml.upsert.*;
21+
import org.hswebframework.ezorm.rdb.utils.ExceptionUtils;
22+
import reactor.core.publisher.Mono;
23+
24+
import java.util.HashSet;
25+
import java.util.List;
26+
import java.util.Objects;
27+
import java.util.Set;
28+
import java.util.function.Supplier;
29+
import java.util.stream.Collectors;
30+
31+
@SuppressWarnings("all")
32+
public class OpengaussBatchUpsertOperator implements SaveOrUpdateOperator {
33+
34+
private RDBTableMetadata table;
35+
36+
private OpengaussUpsertBatchInsertSqlBuilder builder;
37+
38+
private SqlFragments prefix;
39+
40+
private SaveOrUpdateOperator fallback;
41+
42+
private Set<String> primaryColumns;
43+
44+
public OpengaussBatchUpsertOperator(RDBTableMetadata table) {
45+
this.table = table;
46+
this.fallback = new DefaultSaveOrUpdateOperator(table);
47+
this.builder = new OpengaussUpsertBatchInsertSqlBuilder(table);
48+
}
49+
50+
@Override
51+
public SaveResultOperator execute(UpsertOperatorParameter parameter) {
52+
if (getOrCreateOnConflict().isEmpty()) {
53+
return fallback.execute(parameter);
54+
}
55+
return new OpengaussSaveResultOperator(() -> builder.build(new OpengaussUpsertOperatorParameter(parameter)));
56+
}
57+
58+
SqlFragments getOrCreateOnConflict() {
59+
if (prefix == null) {
60+
prefix = createOnConflict();
61+
}
62+
return prefix;
63+
}
64+
65+
SqlFragments createOnConflict() {
66+
return SqlFragments.of("on duplicate key");
67+
}
68+
69+
class OpengaussUpsertOperatorParameter extends InsertOperatorParameter {
70+
71+
private boolean doNoThingOnConflict;
72+
73+
private List<Term> where;
74+
75+
public OpengaussUpsertOperatorParameter(UpsertOperatorParameter parameter) {
76+
doNoThingOnConflict = parameter.isDoNothingOnConflict();
77+
setColumns(parameter.toInsertColumns());
78+
setValues(parameter.getValues());
79+
where = parameter.getWhere();
80+
}
81+
82+
}
83+
84+
@AllArgsConstructor
85+
private class OpengaussSaveResultOperator implements SaveResultOperator {
86+
87+
Supplier<SqlRequest> sqlRequest;
88+
89+
@Override
90+
public SaveResult sync() {
91+
return ExceptionUtils.translation(() -> {
92+
SyncSqlExecutor sqlExecutor = table.findFeatureNow(SyncSqlExecutor.ID);
93+
int updated = sqlExecutor.update(sqlRequest.get());
94+
return SaveResult.of(0, updated);
95+
}, table);
96+
}
97+
98+
@Override
99+
public Mono<SaveResult> reactive() {
100+
return Mono
101+
.fromSupplier(sqlRequest)
102+
.as(table.findFeatureNow(ReactiveSqlExecutor.ID)::update)
103+
.map(i -> SaveResult.of(0, i))
104+
.as(ExceptionUtils.translation(table));
105+
}
106+
}
107+
108+
static SqlFragments UPDATE_SET = SqlFragments.of("update");
109+
110+
private class OpengaussUpsertBatchInsertSqlBuilder extends BatchInsertSqlBuilder {
111+
112+
113+
public OpengaussUpsertBatchInsertSqlBuilder(RDBTableMetadata table) {
114+
super(table);
115+
}
116+
117+
@Override
118+
protected boolean isPrimaryKey(RDBColumnMetadata col) {
119+
getOrCreateOnConflict();
120+
if (primaryColumns != null && primaryColumns.contains(col.getName())) {
121+
return true;
122+
}
123+
return super.isPrimaryKey(col);
124+
}
125+
126+
@Override
127+
protected int computeSqlSize(int columnSize, int valueSize) {
128+
return super.computeSqlSize(columnSize, valueSize) + columnSize * 3 + 2;
129+
}
130+
131+
@Override
132+
protected AppendableSqlFragments afterBuild(Set<InsertColumn> columns, InsertOperatorParameter parameter, AppendableSqlFragments sql) {
133+
sql.add(createOnConflict());
134+
135+
if (((OpengaussUpsertOperatorParameter) parameter).doNoThingOnConflict) {
136+
sql.addSql("nothing");
137+
return sql;
138+
}
139+
140+
141+
int index = 0;
142+
boolean more = false;
143+
for (InsertColumn column : columns) {
144+
145+
index++;
146+
if (column instanceof UpsertColumn && ((UpsertColumn) column).isUpdateIgnore()) {
147+
continue;
148+
}
149+
RDBColumnMetadata columnMetadata = table.getColumn(column.getColumn()).orElse(null);
150+
if (columnMetadata == null
151+
|| columnMetadata.isPrimaryKey()
152+
|| !columnMetadata.isUpdatable()
153+
|| !columnMetadata.isSaveable()) {
154+
155+
continue;
156+
}
157+
if (more) {
158+
sql.add(SqlFragments.COMMA);
159+
} else {
160+
sql.add(UPDATE_SET);
161+
}
162+
more = true;
163+
sql.addSql(columnMetadata.getQuoteName())
164+
.add(SqlFragments.EQUAL);
165+
166+
sql.addSql(
167+
"coalesce(", columnMetadata.getFullName("excluded"), ",", columnMetadata.getFullName(), ")"
168+
);
169+
// sql.addSql(columnMetadata.getFullName("excluded"));
170+
}
171+
if (!more) {
172+
sql.addSql("nothing");
173+
} else {
174+
// FIXME: 2021/4/15 实现类似 table._time>excluded._time的条件控制功能
175+
List<Term> where = ((OpengaussUpsertOperatorParameter) parameter).where;
176+
if (CollectionUtils.isNotEmpty(where)) {
177+
SqlFragments fragments = SimpleTermsFragmentBuilder.instance().createTermFragments(table, where);
178+
if (fragments.isNotEmpty()) {
179+
sql.add(SqlFragments.WHERE).addFragments(fragments);
180+
}
181+
}
182+
}
183+
return sql;
184+
}
185+
186+
}
187+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package org.hswebframework.ezorm.rdb.supports.opengauss;
2+
3+
import org.hswebframework.ezorm.rdb.metadata.dialect.Dialect;
4+
import org.hswebframework.ezorm.rdb.supports.postgres.PostgresqlDialect;
5+
6+
/**
7+
* @author zhouhao
8+
* @since 3.0
9+
*/
10+
public class OpengaussDialect extends PostgresqlDialect {
11+
12+
public static final Dialect global = new OpengaussDialect();
13+
14+
public OpengaussDialect() {
15+
super();
16+
}
17+
18+
@Override
19+
public String getId() {
20+
return "opengauss";
21+
}
22+
23+
@Override
24+
public String getName() {
25+
return "Opengauss";
26+
}
27+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package org.hswebframework.ezorm.rdb.supports.opengauss;
2+
3+
import org.hswebframework.ezorm.rdb.codec.EnumValueCodec;
4+
import org.hswebframework.ezorm.rdb.metadata.RDBSchemaMetadata;
5+
import org.hswebframework.ezorm.rdb.metadata.RDBTableMetadata;
6+
import org.hswebframework.ezorm.rdb.metadata.dialect.Dialect;
7+
import org.hswebframework.ezorm.rdb.operator.CompositeExceptionTranslation;
8+
import org.hswebframework.ezorm.rdb.supports.postgres.*;
9+
import org.hswebframework.ezorm.rdb.utils.FeatureUtils;
10+
11+
public class OpengaussSchemaMetadata extends RDBSchemaMetadata {
12+
13+
public OpengaussSchemaMetadata(String name) {
14+
super(name);
15+
addFeature(new PostgresqlPaginator());
16+
addFeature(PostgresqlDropIndexSqlBuilder.INSTANCE);
17+
addFeature(new PostgresqlAlterTableSqlBuilder());
18+
19+
addFeature(new PostgresqlTableMetadataParser(this));
20+
addFeature(new PostgresqlIndexMetadataParser(this));
21+
addFeature(Dialect.POSTGRES);
22+
23+
addFeature(new CompositeExceptionTranslation()
24+
.add(FeatureUtils.r2dbcIsAlive(), () -> PostgresqlR2DBCExceptionTranslation.of(this))
25+
);
26+
}
27+
28+
@Override
29+
public void addTable(RDBTableMetadata metadata) {
30+
metadata.addFeature(new OpengaussBatchUpsertOperator(metadata));
31+
super.addTable(metadata);
32+
}
33+
34+
@Override
35+
public RDBTableMetadata newTable(String name) {
36+
RDBTableMetadata metadata = super.newTable(name);
37+
metadata.addFeature(new OpengaussBatchUpsertOperator(metadata));
38+
metadata.setOnColumnAdded(column->{
39+
if(column.getValueCodec() instanceof EnumValueCodec &&((EnumValueCodec) column.getValueCodec()).isToMask()){
40+
column.addFeature(PostgresqlEnumInFragmentBuilder.in);
41+
column.addFeature(PostgresqlEnumInFragmentBuilder.notIn);
42+
}
43+
});
44+
return metadata;
45+
}
46+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.hswebframework.ezorm.rdb.supports.opengauss;
2+
3+
import org.hswebframework.ezorm.rdb.TestSyncSqlExecutor;
4+
import org.hswebframework.ezorm.rdb.executor.SyncSqlExecutor;
5+
import org.hswebframework.ezorm.rdb.metadata.RDBSchemaMetadata;
6+
import org.hswebframework.ezorm.rdb.metadata.dialect.Dialect;
7+
import org.hswebframework.ezorm.rdb.supports.BasicCommonTests;
8+
import org.hswebframework.ezorm.rdb.supports.postgres.PostgresqlConnectionProvider;
9+
import org.hswebframework.ezorm.rdb.supports.postgres.PostgresqlSchemaMetadata;
10+
11+
public class OpengaussBasicTest extends BasicCommonTests {
12+
@Override
13+
protected RDBSchemaMetadata getSchema() {
14+
return new OpengaussSchemaMetadata("gaussdb");
15+
}
16+
17+
@Override
18+
protected Dialect getDialect() {
19+
return new OpengaussDialect();
20+
}
21+
22+
@Override
23+
protected SyncSqlExecutor getSqlExecutor() {
24+
return new TestSyncSqlExecutor(new OpengaussConnectionProvider());
25+
}
26+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package org.hswebframework.ezorm.rdb.supports.opengauss;
2+
3+
import lombok.SneakyThrows;
4+
import org.hswebframework.ezorm.rdb.ConnectionProvider;
5+
import org.hswebframework.ezorm.rdb.Containers;
6+
import org.junit.Assert;
7+
import org.postgresql.Driver;
8+
import org.slf4j.LoggerFactory;
9+
import org.testcontainers.containers.GenericContainer;
10+
import org.testcontainers.containers.output.Slf4jLogConsumer;
11+
import org.testcontainers.containers.wait.strategy.Wait;
12+
import org.testcontainers.utility.DockerImageName;
13+
14+
import java.sql.Connection;
15+
import java.sql.DriverManager;
16+
17+
public class OpengaussConnectionProvider implements ConnectionProvider {
18+
19+
static int port;
20+
21+
static void load(){}
22+
static {
23+
Assert.assertTrue(Driver.isRegistered());
24+
GenericContainer<?> container =
25+
new GenericContainer<>(
26+
DockerImageName.parse("enmotech/opengauss-lite:5.0.1"))
27+
.withEnv("TZ", "Asia/Shanghai")
28+
.withEnv("GS_PASSWORD", "Admin@1234Hs")
29+
.withExposedPorts(5432)
30+
.withPrivilegedMode(true)
31+
.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger(OpengaussConnectionProvider.class)))
32+
.waitingFor(Wait.forListeningPort());;
33+
34+
container.waitingFor(Wait.forListeningPort());
35+
container.start();
36+
port = container.getMappedPort(5432);
37+
try {
38+
Thread.sleep(5000);
39+
} catch (InterruptedException ignore) {
40+
41+
}
42+
}
43+
44+
@SneakyThrows
45+
public Connection getConnection() {
46+
47+
String username = System.getProperty("gauss.username", "gaussdb");
48+
String password = System.getProperty("gauss.password", "Admin@1234Hs");
49+
String url = System.getProperty("gauss.url", "127.0.0.1:" + port);
50+
String db = System.getProperty("gauss.db", "postgres");
51+
return DriverManager.getConnection("jdbc:postgresql://" + url + "/" + db, username, password);
52+
53+
}
54+
55+
@Override
56+
@SneakyThrows
57+
public void releaseConnect(Connection connection) {
58+
connection.close();
59+
}
60+
}

0 commit comments

Comments
 (0)