Skip to content

Commit d5a76a2

Browse files
ARTEMIS-5829 Cleanup Postgres pg_largeobjects
When paging or large message is used, pg_largeobjects should be cleared. this is a feature from Postgres that the FileDriver is using on postgresql
1 parent 94b0da9 commit d5a76a2

File tree

7 files changed

+131
-9
lines changed

7 files changed

+131
-9
lines changed

artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/JDBCSequentialFile.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,13 @@ public class JDBCSequentialFile implements SequentialFile {
5858

5959
private AtomicBoolean isLoaded = new AtomicBoolean(false);
6060

61+
private volatile boolean deleted = false;
62+
6163
private long id = -1;
6264

65+
// used in postgres
66+
private long oid = -1;
67+
6368
private long readPosition = 0;
6469

6570
private long writePosition = 0;
@@ -172,9 +177,13 @@ public ByteBuffer map(int position, long size) throws IOException {
172177
public void delete() throws IOException, InterruptedException, ActiveMQException {
173178
try {
174179
synchronized (this) {
180+
if (deleted) {
181+
return;
182+
}
175183
if (load()) {
176184
dbDriver.deleteFile(this);
177185
}
186+
deleted = true;
178187
}
179188
} catch (SQLException e) {
180189
// file is already gone from a drop somewhere
@@ -473,6 +482,14 @@ public long getId() {
473482
return id;
474483
}
475484

485+
public long getOid() {
486+
return oid;
487+
}
488+
489+
public void setOid(long oid) {
490+
this.oid = oid;
491+
}
492+
476493
public void setId(long id) {
477494
this.id = id;
478495
}

artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresLargeObjectManager.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.postgresql.PGConnection;
2525
import org.postgresql.largeobject.LargeObject;
26+
import org.postgresql.largeobject.LargeObjectManager;
2627

2728
/**
2829
* Helper class for when the postresql driver is not directly availalbe.
@@ -70,6 +71,23 @@ public final Long createLO(Connection connection) throws SQLException {
7071
}
7172
}
7273

74+
public final void deleteLO(Connection connection, long oid) throws SQLException {
75+
Object largeObjectManager = getLargeObjectManager(connection);
76+
if (shouldUseReflection) {
77+
try {
78+
Method method = largeObjectManager.getClass().getMethod("delete", long.class);
79+
method.invoke(largeObjectManager, oid);
80+
} catch (Exception ex) {
81+
throw new SQLException("Couldn't access org.postgresql.largeobject.LargeObjectManager", ex);
82+
}
83+
} else {
84+
if (largeObjectManager != null) {
85+
((LargeObjectManager) largeObjectManager).delete(oid);
86+
}
87+
}
88+
89+
}
90+
7391
public Object open(Connection connection, long oid, int mode) throws SQLException {
7492
if (shouldUseReflection) {
7593
Object largeObjectManager = getLargeObjectManager(connection);

artemis-jdbc-store/src/main/java/org/apache/activemq/artemis/jdbc/store/file/PostgresSequentialSequentialFileDriver.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.activemq.artemis.jdbc.store.file;
1818

19+
import java.lang.invoke.MethodHandles;
1920
import java.nio.ByteBuffer;
2021
import java.sql.Connection;
2122
import java.sql.PreparedStatement;
@@ -25,10 +26,14 @@
2526

2627
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCConnectionProvider;
2728
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2831

2932
@SuppressWarnings("SynchronizeOnNonFinalField")
3033
public final class PostgresSequentialSequentialFileDriver extends JDBCSequentialFileFactoryDriver {
3134

35+
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
36+
3237
private static final String POSTGRES_OID_KEY = "POSTGRES_OID_KEY";
3338
private PostgresLargeObjectManager largeObjectManager;
3439

@@ -62,6 +67,11 @@ public void createFile(JDBCSequentialFile file) throws SQLException {
6267
try (PreparedStatement createFile = connection.prepareStatement(this.createFile, this.createFileAutogeneratedKeys)) {
6368
connection.setAutoCommit(false);
6469
Long oid = largeObjectManager.createLO(connection);
70+
file.setOid(oid);
71+
72+
if (logger.isDebugEnabled()) {
73+
logger.debug("Creating file {} with oid={}", file.getFileName(), oid);
74+
}
6575

6676
createFile.setString(1, file.getFileName());
6777
createFile.setString(2, file.getExtension());
@@ -89,7 +99,7 @@ public void loadFile(JDBCSequentialFile file) throws SQLException {
8999

90100
try (ResultSet rs = readLargeObject.executeQuery()) {
91101
if (rs.next()) {
92-
file.setWritePosition(getPostGresLargeObjectSize(file));
102+
file.setWritePosition(getPostgresLargeObjectSize(file));
93103
}
94104
connection.commit();
95105
} catch (SQLException e) {
@@ -152,6 +162,9 @@ public int readFromFile(JDBCSequentialFile file, ByteBuffer bytes) throws SQLExc
152162
}
153163

154164
private Long getOID(JDBCSequentialFile file) throws SQLException {
165+
if (file.getOid() != -1L) {
166+
return file.getOid();
167+
}
155168
Long oid = (Long) file.getMetaData(POSTGRES_OID_KEY);
156169
if (oid == null) {
157170
try (Connection connection = connectionProvider.getConnection()) {
@@ -170,13 +183,14 @@ private Long getOID(JDBCSequentialFile file) throws SQLException {
170183
}
171184
}
172185
}
173-
if ((Long) file.getMetaData(POSTGRES_OID_KEY) == 0) {
174-
System.out.println("FD");
186+
Long value = (Long) file.getMetaData(POSTGRES_OID_KEY);
187+
if (value != null) {
188+
file.setOid(value);
175189
}
176-
return (Long) file.getMetaData(POSTGRES_OID_KEY);
190+
return value;
177191
}
178192

179-
private int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
193+
private int getPostgresLargeObjectSize(JDBCSequentialFile file) throws SQLException {
180194
int size = 0;
181195
Long oid = getOID(file);
182196
if (oid != null) {
@@ -195,4 +209,22 @@ private int getPostGresLargeObjectSize(JDBCSequentialFile file) throws SQLExcept
195209
}
196210
return size;
197211
}
212+
213+
@Override
214+
public void deleteFile(JDBCSequentialFile file) throws SQLException {
215+
Long oid = getOID(file);
216+
if (logger.isDebugEnabled()) {
217+
logger.debug("Deleting file {} with oid={}", file.getFileName(), oid);
218+
}
219+
if (oid != null) {
220+
try (Connection connection = connectionProvider.getConnection()) {
221+
largeObjectManager.deleteLO(connection, oid);
222+
} catch (Exception e) {
223+
logger.warn(e.getMessage(), e);
224+
}
225+
}
226+
super.deleteFile(file);
227+
}
228+
229+
198230
}

tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/common/ParameterDBTestBase.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@
2828
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
2929
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter;
3030
import org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
31+
import org.apache.activemq.artemis.utils.Wait;
3132
import org.junit.jupiter.api.BeforeEach;
3233
import org.junit.jupiter.api.extension.ExtendWith;
3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
3536

37+
3638
@ExtendWith(ParameterizedTestExtension.class)
3739
public abstract class ParameterDBTestBase extends DBTestBase {
3840
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -47,6 +49,27 @@ protected final String getTestJDBCConnectionUrl() {
4749
return database.getJdbcURI();
4850
}
4951

52+
int postgres_loCount;
53+
54+
protected int countPostgresLargeObjects() {
55+
try (java.sql.Connection jdbcConnection = database.getConnection()) {
56+
ResultSet resultSet = jdbcConnection.createStatement().executeQuery("select count(*) from pg_largeobject");
57+
resultSet.next();
58+
return resultSet.getInt(1);
59+
} catch (Exception e) {
60+
// not throwing a SQLException as this method is used on Supplier<String>
61+
// however if an exception still happened, the test should receive an error
62+
throw new RuntimeException(e.getMessage(), e);
63+
}
64+
}
65+
public void prepareCheckPostgres() throws Exception {
66+
postgres_loCount = countPostgresLargeObjects();
67+
}
68+
69+
public void checkPostgres() throws Exception {
70+
Wait.assertTrue(() -> "There are still " + countPostgresLargeObjects() + " pg_largeObject records", () -> countPostgresLargeObjects() - postgres_loCount < 10, 60_000, 100);
71+
}
72+
5073

5174
@Parameter(index = 0)
5275
public Database database;
@@ -99,6 +122,15 @@ public Connection getConnection() throws Exception {
99122
}
100123

101124
public int dropDatabase() {
125+
if (database == Database.POSTGRES) {
126+
try (Connection connection = getConnection()) {
127+
logger.info("Removing large objects on postgres");
128+
connection.createStatement().execute("DELETE FROM pg_largeobject");
129+
} catch (Exception e) {
130+
logger.warn(e.getMessage(), e);
131+
}
132+
}
133+
102134
return switch (database) {
103135
case JOURNAL -> 0;
104136
case DERBY -> {

tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/largeMessages/RealServerDatabaseLargeMessageTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ public void testLargeMessage() throws Exception {
8989
}
9090

9191
public void testLargeMessage(String protocol) throws Exception {
92+
if (database == Database.POSTGRES) {
93+
prepareCheckPostgres();
94+
}
9295
logger.info("testLargeMessage({})", protocol);
9396
final String queueName = "QUEUE_" + RandomUtil.randomUUIDString() + "_" + protocol + "_" + database;
9497

@@ -157,6 +160,10 @@ public void testLargeMessage(String protocol) throws Exception {
157160
assertTrue(done.await(120, TimeUnit.SECONDS));
158161
assertEquals(0, errors.get());
159162

163+
if (database == Database.POSTGRES) {
164+
checkPostgres();
165+
}
166+
160167
}
161168

162169
}

tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,9 @@ public static Collection<Object[]> parameters() {
170170
List<Object[]> databases = new ArrayList<>();
171171
databases.add(new Object[] {Database.JOURNAL, true});
172172
databases.add(new Object[] {Database.JOURNAL, false});
173-
Database database = Database.randomDB();
174-
if (database != null) {
175-
databases.add(new Object[]{database, false});
173+
List<Database> selectedList = Database.selectedList();
174+
if (selectedList != null && !selectedList.isEmpty()) {
175+
selectedList.forEach(d -> databases.add(new Object[]{d, false}));
176176
}
177177
return databases;
178178
}

tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/RealServerDatabasePagingTest.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,13 @@ public class RealServerDatabasePagingTest extends ParameterDBTestBase {
5454

5555
private static final String TEST_NAME = "PGDB";
5656

57-
private static final int MAX_MESSAGES = Integer.parseInt(testProperty(TEST_NAME, "MAX_MESSAGES", "200"));
57+
private static final int MAX_MESSAGES = Integer.parseInt(testProperty(TEST_NAME, "MAX_MESSAGES", "1000"));
58+
private static final int MAX_LARGE_MESSAGES = Integer.parseInt(testProperty(TEST_NAME, "MAX_LARGE_MESSAGES", "200"));
5859

5960
private static final int SOAK_MAX_MESSAGES = Integer.parseInt(testProperty(TEST_NAME, "SOAK_MAX_MESSAGES", "100000"));
6061

6162
private static final int MESSAGE_SIZE = Integer.parseInt(testProperty(TEST_NAME, "MESSAGE_SIZE", "1000"));
63+
private static final int LARGE_MESSAGE_SIZE = Integer.parseInt(testProperty(TEST_NAME, "LARGE_MESSAGE_SIZE", "500000"));
6264
private static final int SOAK_MESSAGE_SIZE = Integer.parseInt(testProperty(TEST_NAME, "SOAK_MESSAGE_SIZE", "1000"));
6365

6466
private static final int COMMIT_INTERVAL = Integer.parseInt(testProperty(TEST_NAME, "COMMIT_INTERVAL", "1000"));
@@ -85,6 +87,12 @@ public void testPaging() throws Exception {
8587
}
8688

8789

90+
@TestTemplate
91+
public void testPagingWithLargeMessages() throws Exception {
92+
testPaging("CORE", MAX_LARGE_MESSAGES, LARGE_MESSAGE_SIZE);
93+
testPaging("AMQP", MAX_LARGE_MESSAGES, LARGE_MESSAGE_SIZE);
94+
}
95+
8896
@TestTemplate
8997
public void testSoakPaging() throws Exception {
9098
testPaging("AMQP", SOAK_MAX_MESSAGES, SOAK_MESSAGE_SIZE);
@@ -93,6 +101,10 @@ public void testSoakPaging() throws Exception {
93101
private void testPaging(String protocol, int messages, int messageSize) throws Exception {
94102
logger.info("performing paging test on protocol={} and db={}", protocol, database);
95103

104+
if (database == Database.POSTGRES) {
105+
prepareCheckPostgres();
106+
}
107+
96108
final String queueName = "QUEUE_" + RandomUtil.randomUUIDString() + "_" + protocol + "_" + database;
97109

98110
ConnectionFactory connectionFactory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
@@ -144,6 +156,10 @@ private void testPaging(String protocol, int messages, int messageSize) throws E
144156
assertNull(consumer.receiveNoWait());
145157
}
146158

159+
if (database == Database.POSTGRES) {
160+
checkPostgres();
161+
}
162+
147163

148164
}
149165

0 commit comments

Comments
 (0)