Skip to content

Commit 62e767f

Browse files
committed
async binaries lo_unlink
* Added new binaries_lo_unlink_queue table to temporary store large object OIDs. * ExecutorService in BinaryDaoJdbc is used to asynchronously delete rows from binaries_lo_unlink_queue table. Table uses before delete trigger to call lo_unlink.
1 parent 232a148 commit 62e767f

File tree

9 files changed

+124
-11
lines changed

9 files changed

+124
-11
lines changed

dsf-fhir/dsf-fhir-server/src/main/java/dev/dsf/fhir/dao/BinaryDao.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,8 @@ public interface BinaryDao extends ResourceDao<Binary>
1515

1616
Optional<Binary> readVersion(UUID uuid, long version, RangeRequest rangeRequest)
1717
throws SQLException, ResourceDeletedException;
18+
19+
void startLargeObjectUnlink();
20+
21+
void stopLargeObjectUnlinker();
1822
}

dsf-fhir/dsf-fhir-server/src/main/java/dev/dsf/fhir/dao/jdbc/BinaryDaoJdbc.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
import java.util.List;
1010
import java.util.Optional;
1111
import java.util.UUID;
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
14+
import java.util.concurrent.TimeUnit;
1215

1316
import javax.sql.DataSource;
1417

@@ -34,6 +37,8 @@ public class BinaryDaoJdbc extends AbstractResourceDaoJdbc<Binary> implements Bi
3437

3538
private final String selectUpdateUser;
3639

40+
private final ExecutorService loUnlinker;
41+
3742
public BinaryDaoJdbc(DataSource dataSource, DataSource permanentDeleteDataSource, FhirContext fhirContext,
3843
String selectUpdateUser)
3944
{
@@ -44,6 +49,16 @@ public BinaryDaoJdbc(DataSource dataSource, DataSource permanentDeleteDataSource
4449
List.of());
4550

4651
this.selectUpdateUser = selectUpdateUser;
52+
53+
loUnlinker = Executors.newFixedThreadPool(1, r -> new Thread(r, "binaries-large-object-unlinker"));
54+
}
55+
56+
@Override
57+
public void afterPropertiesSet() throws Exception
58+
{
59+
super.afterPropertiesSet();
60+
61+
startLargeObjectUnlink();
4762
}
4863

4964
@Override
@@ -199,4 +214,49 @@ public Optional<Binary> readVersion(UUID uuid, long version, RangeRequest rangeR
199214

200215
return binary;
201216
}
217+
218+
@Override
219+
public void startLargeObjectUnlink()
220+
{
221+
loUnlinker.submit(this::doLargeObjectUnlink);
222+
}
223+
224+
private void doLargeObjectUnlink()
225+
{
226+
logger.debug("Deleting entries from binaries_lo_unlink_queue");
227+
228+
try (Connection connection = getPermanentDeleteDataSource().getConnection();
229+
PreparedStatement statement = connection.prepareStatement("DELETE FROM binaries_lo_unlink_queue"))
230+
{
231+
statement.execute();
232+
}
233+
catch (SQLException e)
234+
{
235+
logger.debug("Unable to delete entries from binaries_lo_unlink_queue table", e);
236+
logger.error("Unable to delete entries from binaries_lo_unlink_queue table: {} - {}", e.getClass().getName(),
237+
e.getMessage());
238+
}
239+
}
240+
241+
@Override
242+
public void stopLargeObjectUnlinker()
243+
{
244+
startLargeObjectUnlink();
245+
246+
logger.debug("Shutting down binaries-large-object-unlinker executor ...");
247+
248+
loUnlinker.shutdown();
249+
250+
try
251+
{
252+
if (!loUnlinker.awaitTermination(60, TimeUnit.SECONDS))
253+
{
254+
loUnlinker.shutdownNow();
255+
}
256+
}
257+
catch (InterruptedException ex)
258+
{
259+
loUnlinker.shutdownNow();
260+
}
261+
}
202262
}

dsf-fhir/dsf-fhir-server/src/main/java/dev/dsf/fhir/spring/config/DaoConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public ActivityDefinitionDao activityDefinitionDao()
121121
return new ActivityDefinitionDaoJdbc(dataSource(), permanentDeleteDataSource(), fhirConfig.fhirContext());
122122
}
123123

124-
@Bean
124+
@Bean(destroyMethod = "stopLargeObjectUnlinker")
125125
public BinaryDao binaryDao()
126126
{
127127
return new BinaryDaoJdbc(dataSource(), permanentDeleteDataSource(), fhirConfig.fhirContext(),

dsf-fhir/dsf-fhir-server/src/main/java/dev/dsf/fhir/webservice/impl/BinaryServiceImpl.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -167,18 +167,19 @@ protected MediaType getMediaTypeForRead(UriInfo uri, HttpHeaders headers)
167167
return parameterConverter.getMediaTypeIfSupported(uri, headers).orElseGet(() -> getMediaType(headers));
168168
}
169169

170-
// @Override
171-
// protected MediaType getMediaTypeForRead(UriInfo uri, HttpHeaders headers)
172-
// {
173-
// if (uri.getQueryParameters().containsKey(Constants.PARAM_FORMAT))
174-
// return super.getMediaTypeForRead(uri, headers);
175-
// else
176-
// return getMediaType(headers);
177-
// }
178-
//
179170
private MediaType getMediaType(HttpHeaders headers)
180171
{
181172
List<MediaType> types = headers.getAcceptableMediaTypes();
182173
return types == null ? null : types.get(0);
183174
}
175+
176+
@Override
177+
public Response deletePermanently(String deletePath, String id, UriInfo uri, HttpHeaders headers)
178+
{
179+
Response response = super.deletePermanently(deletePath, id, uri, headers);
180+
181+
dao.startLargeObjectUnlink();
182+
183+
return response;
184+
}
184185
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
4+
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.6.xsd">
5+
6+
<changeSet author="hhund" id="db.binaries_lo_unlink_queue.changelog-2.0.0">
7+
<createTable tableName="binaries_lo_unlink_queue">
8+
<column name="binary_oid" type="oid">
9+
<constraints nullable="false" />
10+
</column>
11+
<column name="queued_at" type="TIMESTAMP" defaultValue="now()" />
12+
</createTable>
13+
14+
<addPrimaryKey tableName="binaries_lo_unlink_queue" columnNames="binary_oid" />
15+
16+
<sql dbms="postgresql">
17+
ALTER TABLE binaries_lo_unlink_queue OWNER TO ${db.liquibase_user};
18+
GRANT ALL ON TABLE binaries_lo_unlink_queue TO ${db.liquibase_user};
19+
GRANT SELECT, INSERT, DELETE ON TABLE binaries_lo_unlink_queue TO ${db.server_permanent_delete_users_group};
20+
CREATE INDEX binary_oid_index ON binaries_lo_unlink_queue USING btree (binary_oid);
21+
</sql>
22+
</changeSet>
23+
24+
<changeSet author="hhund" id="db.binaries_lo_unlink_queue.changelog-2.0.0.on_binaries_lo_unlink_queue_delete_function" runOnChange="true">
25+
<sqlFile dbms="postgresql" relativeToChangelogFile="true" path="trigger_functions/on_binaries_lo_unlink_queue_delete.sql" splitStatements="false" />
26+
</changeSet>
27+
28+
<changeSet author="hhund" id="db.binaries_lo_unlink_queue.changelog-2.0.0.on_delete_trigger">
29+
<sql dbms="postgresql">
30+
CREATE TRIGGER binaries_lo_unlink_queue_delete BEFORE DELETE ON binaries_lo_unlink_queue FOR EACH ROW EXECUTE PROCEDURE on_binaries_lo_unlink_queue_delete();
31+
</sql>
32+
</changeSet>
33+
</databaseChangeLog>

dsf-fhir/dsf-fhir-server/src/main/resources/fhir/db/db.changelog.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,5 @@
4444
<include file="fhir/db/db.delete_duplicate_resources.changelog-1.6.1.xml" />
4545

4646
<include file="fhir/db/db.binaries.changelog-2.0.0.xml" />
47+
<include file="fhir/db/db.binaries_lo_unlink_queue.changelog-2.0.0.xml" />
4748
</databaseChangeLog>
Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
CREATE OR REPLACE FUNCTION on_binaries_delete() RETURNS TRIGGER AS $$
22
BEGIN
33
PERFORM on_resources_delete(OLD.binary_id);
4-
PERFORM lo_unlink(OLD.binary_oid);
4+
5+
IF (OLD.binary_oid IS NOT NULL) THEN
6+
INSERT INTO binaries_lo_unlink_queue (binary_oid) VALUES (OLD.binary_oid) ON CONFLICT DO NOTHING;
7+
END IF;
8+
59
RETURN OLD;
610
END;
711
$$ LANGUAGE PLPGSQL
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
CREATE OR REPLACE FUNCTION on_binaries_lo_unlink_queue_delete() RETURNS TRIGGER AS $$
2+
BEGIN
3+
PERFORM lo_unlink(OLD.binary_oid);
4+
RETURN OLD;
5+
END;
6+
$$ LANGUAGE PLPGSQL

dsf-fhir/dsf-fhir-server/src/test/java/dev/dsf/fhir/integration/BinaryIntegrationTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.io.IOException;
1313
import java.io.InputStream;
1414
import java.nio.charset.StandardCharsets;
15+
import java.time.Duration;
1516
import java.util.Arrays;
1617
import java.util.Date;
1718
import java.util.List;
@@ -2991,6 +2992,9 @@ public void testCreateReadDelete4GiB() throws Exception
29912992
long t1pd = System.currentTimeMillis();
29922993

29932994
logger.info("Permanent delete call finished in {} ms", (t1pd - t0pd));
2995+
2996+
// wait for binaries_lo_unlink_queue delete to finish
2997+
Thread.sleep(Duration.ofSeconds(2));
29942998
}
29952999

29963000
@Test

0 commit comments

Comments
 (0)