Skip to content

Commit 5135d15

Browse files
authored
[core] Support lock for postgres catalog (#5005)
1 parent bd9317e commit 5135d15

File tree

4 files changed

+241
-1
lines changed

4 files changed

+241
-1
lines changed

paimon-core/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,20 @@ under the License.
233233
<scope>test</scope>
234234
</dependency>
235235

236+
<dependency>
237+
<groupId>org.testcontainers</groupId>
238+
<artifactId>postgresql</artifactId>
239+
<version>${testcontainers.version}</version>
240+
<scope>test</scope>
241+
</dependency>
242+
243+
<dependency>
244+
<groupId>org.postgresql</groupId>
245+
<artifactId>postgresql</artifactId>
246+
<version>42.7.3</version>
247+
</dependency>
248+
249+
236250
</dependencies>
237251

238252
<build>

paimon-core/src/main/java/org/apache/paimon/jdbc/DistributedLockDialectFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ static JdbcDistributedLockDialect create(String protocol) {
2626
return new SqlLiteDistributedLockDialect();
2727
case MYSQL:
2828
return new MysqlDistributedLockDialect();
29+
case POSTGRESQL:
30+
return new PostgresqlDistributedLockDialect();
2931
default:
3032
throw new UnsupportedOperationException(
3133
String.format("Distributed locks based on %s are not supported", protocol));
@@ -36,6 +38,7 @@ static JdbcDistributedLockDialect create(String protocol) {
3638
enum JdbcProtocol {
3739
SQLITE,
3840
MARIADB,
39-
MYSQL
41+
MYSQL,
42+
POSTGRESQL
4043
}
4144
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.jdbc;
20+
21+
/** Distributed lock implementation based on postgres table. */
22+
public class PostgresqlDistributedLockDialect extends AbstractDistributedLockDialect {
23+
24+
@Override
25+
public String getCreateTableSql() {
26+
return "CREATE TABLE "
27+
+ JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
28+
+ "("
29+
+ JdbcUtils.LOCK_ID
30+
+ " VARCHAR(%s) NOT NULL,"
31+
+ JdbcUtils.ACQUIRED_AT
32+
+ " TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,"
33+
+ JdbcUtils.EXPIRE_TIME
34+
+ " BIGINT DEFAULT 0 NOT NULL,"
35+
+ "PRIMARY KEY ("
36+
+ JdbcUtils.LOCK_ID
37+
+ ")"
38+
+ ")";
39+
}
40+
41+
@Override
42+
public String getLockAcquireSql() {
43+
return "INSERT INTO "
44+
+ JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
45+
+ " ("
46+
+ JdbcUtils.LOCK_ID
47+
+ ","
48+
+ JdbcUtils.EXPIRE_TIME
49+
+ ") VALUES (?,?)";
50+
}
51+
52+
@Override
53+
public String getReleaseLockSql() {
54+
return "DELETE FROM "
55+
+ JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
56+
+ " WHERE "
57+
+ JdbcUtils.LOCK_ID
58+
+ " = ?";
59+
}
60+
61+
@Override
62+
public String getTryReleaseTimedOutLock() {
63+
return "DELETE FROM "
64+
+ JdbcUtils.DISTRIBUTED_LOCKS_TABLE_NAME
65+
+ " WHERE EXTRACT(EPOCH FROM AGE(NOW(), "
66+
+ JdbcUtils.ACQUIRED_AT
67+
+ ")) >"
68+
+ JdbcUtils.EXPIRE_TIME
69+
+ " and "
70+
+ JdbcUtils.LOCK_ID
71+
+ " = ?";
72+
}
73+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.jdbc;
20+
21+
import org.apache.paimon.catalog.Catalog;
22+
import org.apache.paimon.catalog.CatalogContext;
23+
import org.apache.paimon.fs.FileIO;
24+
import org.apache.paimon.fs.Path;
25+
import org.apache.paimon.options.CatalogOptions;
26+
import org.apache.paimon.options.Options;
27+
28+
import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
29+
30+
import org.junit.jupiter.api.AfterAll;
31+
import org.junit.jupiter.api.BeforeAll;
32+
import org.junit.jupiter.api.BeforeEach;
33+
import org.junit.jupiter.api.Test;
34+
import org.junit.jupiter.api.io.TempDir;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
import org.testcontainers.containers.PostgreSQLContainer;
38+
import org.testcontainers.containers.output.Slf4jLogConsumer;
39+
40+
import java.sql.SQLException;
41+
import java.util.Map;
42+
43+
import static org.assertj.core.api.Assertions.assertThat;
44+
45+
/*
46+
* Licensed to the Apache Software Foundation (ASF) under one
47+
* or more contributor license agreements. See the NOTICE file
48+
* distributed with this work for additional information
49+
* regarding copyright ownership. The ASF licenses this file
50+
* to you under the Apache License, Version 2.0 (the
51+
* "License"); you may not use this file except in compliance
52+
* with the License. You may obtain a copy of the License at
53+
*
54+
* http://www.apache.org/licenses/LICENSE-2.0
55+
*
56+
* Unless required by applicable law or agreed to in writing, software
57+
* distributed under the License is distributed on an "AS IS" BASIS,
58+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
59+
* See the License for the specific language governing permissions and
60+
* limitations under the License.
61+
*/
62+
63+
/** Tests for {@link JdbcCatalog} with Postgres. */
64+
public class PostgresqlCatalogTest {
65+
private static final Logger LOG = LoggerFactory.getLogger(PostgresqlCatalogTest.class);
66+
67+
public static final String DEFAULT_DB = "postgres";
68+
69+
private static final String USER = "paimonuser";
70+
private static final String PASSWORD = "paimonpw";
71+
72+
@TempDir java.nio.file.Path tempFile;
73+
protected String warehouse;
74+
protected FileIO fileIO;
75+
protected Catalog catalog;
76+
77+
protected static final PostgreSQLContainer<?> POSTGRES_CONTAINER =
78+
new PostgreSQLContainer<>("postgres:13-alpine")
79+
.withDatabaseName(DEFAULT_DB)
80+
.withUsername(USER)
81+
.withPassword(PASSWORD)
82+
.withLogConsumer(new Slf4jLogConsumer(LOG));
83+
84+
@BeforeAll
85+
protected static void start() {
86+
LOG.info("Starting containers...");
87+
POSTGRES_CONTAINER.start();
88+
LOG.info("Containers are started.");
89+
}
90+
91+
@AfterAll
92+
public static void stopContainers() {
93+
LOG.info("Stopping containers...");
94+
POSTGRES_CONTAINER.stop();
95+
LOG.info("Containers are stopped.");
96+
}
97+
98+
@BeforeEach
99+
public void setUp() throws Exception {
100+
warehouse = tempFile.toUri().toString();
101+
Options catalogOptions = new Options();
102+
catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
103+
CatalogContext catalogContext = CatalogContext.create(catalogOptions);
104+
fileIO = FileIO.get(new Path(warehouse), catalogContext);
105+
catalog = initCatalog(Maps.newHashMap());
106+
}
107+
108+
private JdbcCatalog initCatalog(Map<String, String> props) {
109+
LOG.info("Init catalog {}", POSTGRES_CONTAINER.getJdbcUrl());
110+
111+
Map<String, String> properties = Maps.newHashMap();
112+
properties.put(CatalogOptions.URI.key(), POSTGRES_CONTAINER.getJdbcUrl());
113+
114+
properties.put(JdbcCatalog.PROPERTY_PREFIX + "user", USER);
115+
properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", PASSWORD);
116+
properties.put(CatalogOptions.WAREHOUSE.key(), warehouse);
117+
properties.put(CatalogOptions.LOCK_ENABLED.key(), "true");
118+
properties.put(CatalogOptions.LOCK_TYPE.key(), "jdbc");
119+
properties.putAll(props);
120+
JdbcCatalog catalog =
121+
new JdbcCatalog(
122+
fileIO,
123+
"test-jdbc-postgres-catalog",
124+
Options.fromMap(properties),
125+
warehouse);
126+
assertThat(catalog.warehouse()).isEqualTo(warehouse);
127+
return catalog;
128+
}
129+
130+
@Test
131+
public void testAcquireLockFail() throws SQLException, InterruptedException {
132+
String lockId = "jdbc.testDb.testTable";
133+
assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 3000))
134+
.isTrue();
135+
assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 3000))
136+
.isFalse();
137+
JdbcUtils.release(((JdbcCatalog) catalog).getConnections(), lockId);
138+
}
139+
140+
@Test
141+
public void testCleanTimeoutLockAndAcquireLock() throws SQLException, InterruptedException {
142+
String lockId = "jdbc.testDb.testTable";
143+
assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 1000))
144+
.isTrue();
145+
Thread.sleep(2000);
146+
assertThat(JdbcUtils.acquire(((JdbcCatalog) catalog).getConnections(), lockId, 1000))
147+
.isTrue();
148+
JdbcUtils.release(((JdbcCatalog) catalog).getConnections(), lockId);
149+
}
150+
}

0 commit comments

Comments
 (0)