Skip to content

Commit d5583a0

Browse files
committed
Merge remote-tracking branch
'origin/issue/296_Optimize_Binary_Resource_Handling' into develop_2
2 parents a9954f6 + 116ec70 commit d5583a0

File tree

13 files changed

+247
-31
lines changed

13 files changed

+247
-31
lines changed

dsf-bpe/dsf-bpe-server/src/main/java/dev/dsf/bpe/subscription/ConcurrentSubscriptionHandlerFactory.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package dev.dsf.bpe.subscription;
22

33
import java.util.Objects;
4-
import java.util.concurrent.BlockingQueue;
54
import java.util.concurrent.LinkedBlockingQueue;
65
import java.util.concurrent.ThreadPoolExecutor;
76
import java.util.concurrent.TimeUnit;
@@ -19,8 +18,6 @@ public class ConcurrentSubscriptionHandlerFactory<R extends Resource>
1918
private static final Logger logger = LoggerFactory.getLogger(ConcurrentSubscriptionHandlerFactory.class);
2019

2120
private final SubscriptionHandlerFactory<R> delegate;
22-
23-
private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
2421
private final ThreadPoolExecutor executor;
2522

2623
/**
@@ -34,11 +31,11 @@ public ConcurrentSubscriptionHandlerFactory(int corePoolSize, SubscriptionHandle
3431
if (corePoolSize <= 0)
3532
throw new IllegalArgumentException("corePoolSize <= 0");
3633

37-
executor = new ThreadPoolExecutor(corePoolSize, corePoolSize, 30, TimeUnit.MINUTES, queue,
34+
this.delegate = delegate;
35+
36+
executor = new ThreadPoolExecutor(corePoolSize, corePoolSize, 30, TimeUnit.MINUTES, new LinkedBlockingQueue<>(),
3837
(r, executor) -> logger.error("Unable to handle Task - execution rejected"));
3938
executor.allowCoreThreadTimeOut(true);
40-
41-
this.delegate = delegate;
4239
}
4340

4441
@Override

dsf-fhir/dsf-fhir-server/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,11 @@
184184
<version>${crypto-utils.version}</version>
185185
</dependency>
186186

187+
<dependency>
188+
<groupId>commons-io</groupId>
189+
<artifactId>commons-io</artifactId>
190+
</dependency>
191+
187192
<dependency>
188193
<groupId>dev.dsf</groupId>
189194
<artifactId>dsf-common-jetty</artifactId>

dsf-fhir/dsf-fhir-server/src/main/java/dev/dsf/fhir/dao/BinaryDao.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,8 @@ public interface BinaryDao extends ResourceDao<Binary>
1515

1616
Optional<Binary> readVersion(UUID uuid, long version, RangeRequest rangeRequest)
1717
throws SQLException, ResourceDeletedException;
18+
19+
void executeLargeObjectUnlink();
20+
21+
void stopLargeObjectUnlinker();
1822
}

dsf-fhir/dsf-fhir-server/src/main/java/dev/dsf/fhir/dao/jdbc/BinaryDaoJdbc.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
import java.util.List;
1010
import java.util.Optional;
1111
import java.util.UUID;
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
14+
import java.util.concurrent.TimeUnit;
1215

1316
import javax.sql.DataSource;
1417

@@ -34,6 +37,8 @@ public class BinaryDaoJdbc extends AbstractResourceDaoJdbc<Binary> implements Bi
3437

3538
private final String selectUpdateUser;
3639

40+
private final ExecutorService loUnlinker;
41+
3742
public BinaryDaoJdbc(DataSource dataSource, DataSource permanentDeleteDataSource, FhirContext fhirContext,
3843
String selectUpdateUser)
3944
{
@@ -44,6 +49,16 @@ public BinaryDaoJdbc(DataSource dataSource, DataSource permanentDeleteDataSource
4449
List.of());
4550

4651
this.selectUpdateUser = selectUpdateUser;
52+
53+
loUnlinker = Executors.newFixedThreadPool(1, r -> new Thread(r, "binaries-large-object-unlinker"));
54+
}
55+
56+
@Override
57+
public void afterPropertiesSet() throws Exception
58+
{
59+
super.afterPropertiesSet();
60+
61+
executeLargeObjectUnlink();
4762
}
4863

4964
@Override
@@ -199,4 +214,49 @@ public Optional<Binary> readVersion(UUID uuid, long version, RangeRequest rangeR
199214

200215
return binary;
201216
}
217+
218+
@Override
219+
public void executeLargeObjectUnlink()
220+
{
221+
loUnlinker.submit(this::doExecuteLargeObjectUnlink);
222+
}
223+
224+
private void doExecuteLargeObjectUnlink()
225+
{
226+
logger.debug("Deleting entries from binaries_lo_unlink_queue");
227+
228+
try (Connection connection = getPermanentDeleteDataSource().getConnection();
229+
PreparedStatement statement = connection.prepareStatement("DELETE FROM binaries_lo_unlink_queue"))
230+
{
231+
statement.execute();
232+
}
233+
catch (SQLException e)
234+
{
235+
logger.debug("Unable to delete entries from binaries_lo_unlink_queue table", e);
236+
logger.error("Unable to delete entries from binaries_lo_unlink_queue table: {} - {}",
237+
e.getClass().getName(), e.getMessage());
238+
}
239+
}
240+
241+
@Override
242+
public void stopLargeObjectUnlinker()
243+
{
244+
executeLargeObjectUnlink();
245+
246+
logger.debug("Shutting down binaries-large-object-unlinker executor ...");
247+
248+
loUnlinker.shutdown();
249+
250+
try
251+
{
252+
if (!loUnlinker.awaitTermination(60, TimeUnit.SECONDS))
253+
{
254+
loUnlinker.shutdownNow();
255+
}
256+
}
257+
catch (InterruptedException ex)
258+
{
259+
loUnlinker.shutdownNow();
260+
}
261+
}
202262
}

dsf-fhir/dsf-fhir-server/src/main/java/dev/dsf/fhir/spring/config/DaoConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public ActivityDefinitionDao activityDefinitionDao()
121121
return new ActivityDefinitionDaoJdbc(dataSource(), permanentDeleteDataSource(), fhirConfig.fhirContext());
122122
}
123123

124-
@Bean
124+
@Bean(destroyMethod = "stopLargeObjectUnlinker")
125125
public BinaryDao binaryDao()
126126
{
127127
return new BinaryDaoJdbc(dataSource(), permanentDeleteDataSource(), fhirConfig.fhirContext(),

dsf-fhir/dsf-fhir-server/src/main/java/dev/dsf/fhir/spring/config/WebserviceConfig.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ public class WebserviceConfig
158158
@Autowired
159159
private HistoryConfig historyConfig;
160160

161+
@Autowired
162+
private AdapterConfig adapterConfig;
163+
161164
@Bean
162165
public BrowserPolicyHeaderResponseFilter browserPolicyHeaderResponseFilter()
163166
{
@@ -202,7 +205,8 @@ private ActivityDefinitionServiceImpl activityDefinitionServiceImpl()
202205
@Bean
203206
public BinaryService binaryService()
204207
{
205-
return new BinaryServiceJaxrs(binaryServiceSecure(), helperConfig.parameterConverter());
208+
return new BinaryServiceJaxrs(binaryServiceSecure(), helperConfig.parameterConverter(),
209+
adapterConfig.fhirAdapter());
206210
}
207211

208212
private BinaryServiceSecure binaryServiceSecure()

dsf-fhir/dsf-fhir-server/src/main/java/dev/dsf/fhir/webservice/impl/BinaryServiceImpl.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -167,18 +167,19 @@ protected MediaType getMediaTypeForRead(UriInfo uri, HttpHeaders headers)
167167
return parameterConverter.getMediaTypeIfSupported(uri, headers).orElseGet(() -> getMediaType(headers));
168168
}
169169

170-
// @Override
171-
// protected MediaType getMediaTypeForRead(UriInfo uri, HttpHeaders headers)
172-
// {
173-
// if (uri.getQueryParameters().containsKey(Constants.PARAM_FORMAT))
174-
// return super.getMediaTypeForRead(uri, headers);
175-
// else
176-
// return getMediaType(headers);
177-
// }
178-
//
179170
private MediaType getMediaType(HttpHeaders headers)
180171
{
181172
List<MediaType> types = headers.getAcceptableMediaTypes();
182173
return types == null ? null : types.get(0);
183174
}
175+
176+
@Override
177+
public Response deletePermanently(String deletePath, String id, UriInfo uri, HttpHeaders headers)
178+
{
179+
Response response = super.deletePermanently(deletePath, id, uri, headers);
180+
181+
dao.executeLargeObjectUnlink();
182+
183+
return response;
184+
}
184185
}

dsf-fhir/dsf-fhir-server/src/main/java/dev/dsf/fhir/webservice/jaxrs/BinaryServiceJaxrs.java

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,19 @@
77
import java.util.Arrays;
88
import java.util.List;
99
import java.util.Objects;
10+
import java.util.Optional;
1011

12+
import org.apache.commons.io.output.CountingOutputStream;
13+
import org.apache.commons.io.output.NullOutputStream;
14+
import org.hl7.fhir.r4.model.Base64BinaryType;
1115
import org.hl7.fhir.r4.model.Binary;
1216
import org.hl7.fhir.r4.model.Reference;
1317
import org.slf4j.Logger;
1418
import org.slf4j.LoggerFactory;
1519

1620
import ca.uhn.fhir.rest.api.Constants;
1721
import dev.dsf.fhir.adapter.DeferredBase64BinaryType;
22+
import dev.dsf.fhir.adapter.FhirAdapter;
1823
import dev.dsf.fhir.help.ParameterConverter;
1924
import dev.dsf.fhir.help.ResponseGenerator;
2025
import dev.dsf.fhir.model.StreamableBase64BinaryType;
@@ -72,12 +77,14 @@ public void write(OutputStream output) throws IOException, WebApplicationExcepti
7277
}
7378

7479
private final ParameterConverter parameterConverter;
80+
private final FhirAdapter fhirAdapter;
7581

76-
public BinaryServiceJaxrs(BinaryService delegate, ParameterConverter parameterConverter)
82+
public BinaryServiceJaxrs(BinaryService delegate, ParameterConverter parameterConverter, FhirAdapter fhirAdapter)
7783
{
7884
super(delegate);
7985

8086
this.parameterConverter = parameterConverter;
87+
this.fhirAdapter = fhirAdapter;
8188
}
8289

8390
@Override
@@ -86,6 +93,7 @@ public void afterPropertiesSet() throws Exception
8693
super.afterPropertiesSet();
8794

8895
Objects.requireNonNull(parameterConverter, "parameterConverter");
96+
Objects.requireNonNull(fhirAdapter, "fhirAdapter");
8997
}
9098

9199
@POST
@@ -231,7 +239,9 @@ public Response vreadHead(@PathParam("id") String id, @PathParam("version") long
231239

232240
private Response configureReadResponse(UriInfo uri, HttpHeaders headers, boolean head, Response read)
233241
{
234-
if (read.getEntity() instanceof Binary binary && !isValidFhirRequest(uri, headers))
242+
Optional<MediaType> fhirMediaType = getValidFhirMediaType(uri, headers);
243+
244+
if (read.getEntity() instanceof Binary binary && fhirMediaType.isEmpty())
235245
{
236246
if (mediaTypeMatches(headers, binary))
237247
{
@@ -276,25 +286,69 @@ private Response configureReadResponse(UriInfo uri, HttpHeaders headers, boolean
276286
else
277287
return Response.status(Status.NOT_ACCEPTABLE).build();
278288
}
289+
else if (read.getEntity() instanceof Binary binary && fhirMediaType.isPresent() && head)
290+
{
291+
ResponseBuilder b = Response.status(Status.OK);
292+
b.type(fhirMediaType.get());
293+
294+
if (binary.getMeta() != null && binary.getMeta().getLastUpdated() != null
295+
&& binary.getMeta().getVersionId() != null)
296+
{
297+
b.lastModified(binary.getMeta().getLastUpdated());
298+
b.tag(new EntityTag(binary.getMeta().getVersionId(), true));
299+
}
300+
301+
b.cacheControl(ResponseGenerator.PRIVATE_NO_CACHE_NO_TRANSFORM);
302+
303+
b.header(HttpHeaders.CONTENT_LENGTH, calculateFhirResponseSize(binary, fhirMediaType.get()));
304+
305+
return b.build();
306+
}
279307
else
280308
return read;
281309
}
282310

283-
private boolean isValidFhirRequest(UriInfo uri, HttpHeaders headers)
311+
private long calculateFhirResponseSize(Binary binary, MediaType mediaType)
312+
{
313+
long dataSize = (long) binary.getUserData(RangeRequest.USER_DATA_VALUE_DATA_SIZE);
314+
315+
// setting single byte to make sure data element is part of xml/json
316+
binary.setDataElement(new Base64BinaryType(new byte[1]));
317+
318+
try (CountingOutputStream out = new CountingOutputStream(NullOutputStream.INSTANCE))
319+
{
320+
fhirAdapter.writeTo(binary, Binary.class, null, null, mediaType, null, out);
321+
322+
// minus 4 to account for single byte in data element
323+
return out.getByteCount() - 4 + calculateBase64EncodedLength(dataSize);
324+
}
325+
catch (IOException e)
326+
{
327+
throw new RuntimeException(e);
328+
}
329+
}
330+
331+
private long calculateBase64EncodedLength(long dataSize)
332+
{
333+
return dataSize < 0 ? 0 : 4 * ((dataSize + 2) / 3);
334+
}
335+
336+
private Optional<MediaType> getValidFhirMediaType(UriInfo uri, HttpHeaders headers)
284337
{
285338
// _format parameter override present and valid
286339
if (uri.getQueryParameters().containsKey(Constants.PARAM_FORMAT))
287340
{
288-
parameterConverter.getMediaTypeThrowIfNotSupported(uri, headers);
289-
return true;
341+
MediaType mediaType = parameterConverter.getMediaTypeThrowIfNotSupported(uri, headers);
342+
return Optional.of(mediaType);
290343
}
291344
else
292345
{
293346
List<MediaType> types = headers.getAcceptableMediaTypes();
294347
MediaType accept = types == null ? null : types.get(0);
295348

296349
// accept header is FHIR mime-type
297-
return Arrays.stream(FHIR_MEDIA_TYPES).anyMatch(f -> f.equals(accept.toString()));
350+
return Arrays.stream(FHIR_MEDIA_TYPES).filter(f -> f.equals(accept.toString())).findFirst()
351+
.map(s -> accept);
298352
}
299353
}
300354

@@ -308,13 +362,13 @@ private boolean mediaTypeMatches(HttpHeaders headers, Binary binary)
308362
private ResponseBuilder toStreamResponse(Binary binary)
309363
{
310364
ResponseBuilder b = Response.status(Status.OK);
311-
b = b.type(binary.getContentType() != null ? binary.getContentType() : MediaType.APPLICATION_OCTET_STREAM);
365+
b.type(binary.getContentType() != null ? binary.getContentType() : MediaType.APPLICATION_OCTET_STREAM);
312366

313367
if (binary.getMeta() != null && binary.getMeta().getLastUpdated() != null
314368
&& binary.getMeta().getVersionId() != null)
315369
{
316-
b = b.lastModified(binary.getMeta().getLastUpdated());
317-
b = b.tag(new EntityTag(binary.getMeta().getVersionId(), true));
370+
b.lastModified(binary.getMeta().getLastUpdated());
371+
b.tag(new EntityTag(binary.getMeta().getVersionId(), true));
318372
}
319373

320374
if (binary.hasSecurityContext() && binary.getSecurityContext().hasReference())
@@ -323,9 +377,9 @@ private ResponseBuilder toStreamResponse(Binary binary)
323377
b.header(Constants.HEADER_X_SECURITY_CONTEXT, binary.getSecurityContext().getReference());
324378
}
325379

326-
b = b.cacheControl(ResponseGenerator.PRIVATE_NO_CACHE_NO_TRANSFORM);
327-
b = b.header(RangeRequest.ACCEPT_RANGES_HEADER, RangeRequest.ACCEPT_RANGES_HEADER_VALUE);
328-
b = b.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=" + toFileName(binary));
380+
b.cacheControl(ResponseGenerator.PRIVATE_NO_CACHE_NO_TRANSFORM);
381+
b.header(RangeRequest.ACCEPT_RANGES_HEADER, RangeRequest.ACCEPT_RANGES_HEADER_VALUE);
382+
b.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=" + toFileName(binary));
329383

330384
return b;
331385
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog
4+
http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.6.xsd">
5+
6+
<changeSet author="hhund" id="db.binaries_lo_unlink_queue.changelog-2.0.0">
7+
<createTable tableName="binaries_lo_unlink_queue">
8+
<column name="binary_oid" type="oid">
9+
<constraints nullable="false" />
10+
</column>
11+
<column name="queued_at" type="TIMESTAMP" defaultValue="now()" />
12+
</createTable>
13+
14+
<addPrimaryKey tableName="binaries_lo_unlink_queue" columnNames="binary_oid" />
15+
16+
<sql dbms="postgresql">
17+
ALTER TABLE binaries_lo_unlink_queue OWNER TO ${db.liquibase_user};
18+
GRANT ALL ON TABLE binaries_lo_unlink_queue TO ${db.liquibase_user};
19+
GRANT SELECT, INSERT, DELETE ON TABLE binaries_lo_unlink_queue TO ${db.server_permanent_delete_users_group};
20+
CREATE INDEX binary_oid_index ON binaries_lo_unlink_queue USING btree (binary_oid);
21+
</sql>
22+
</changeSet>
23+
24+
<changeSet author="hhund" id="db.binaries_lo_unlink_queue.changelog-2.0.0.on_binaries_lo_unlink_queue_delete_function" runOnChange="true">
25+
<sqlFile dbms="postgresql" relativeToChangelogFile="true" path="trigger_functions/on_binaries_lo_unlink_queue_delete.sql" splitStatements="false" />
26+
</changeSet>
27+
28+
<changeSet author="hhund" id="db.binaries_lo_unlink_queue.changelog-2.0.0.on_delete_trigger">
29+
<sql dbms="postgresql">
30+
CREATE TRIGGER binaries_lo_unlink_queue_delete BEFORE DELETE ON binaries_lo_unlink_queue FOR EACH ROW EXECUTE PROCEDURE on_binaries_lo_unlink_queue_delete();
31+
</sql>
32+
</changeSet>
33+
</databaseChangeLog>

dsf-fhir/dsf-fhir-server/src/main/resources/fhir/db/db.changelog.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,5 @@
4444
<include file="fhir/db/db.delete_duplicate_resources.changelog-1.6.1.xml" />
4545

4646
<include file="fhir/db/db.binaries.changelog-2.0.0.xml" />
47+
<include file="fhir/db/db.binaries_lo_unlink_queue.changelog-2.0.0.xml" />
4748
</databaseChangeLog>

0 commit comments

Comments
 (0)