Skip to content

Commit c4c6858

Browse files
committed
Merge remote-tracking branch 'origin/issue/349_Data_Compression_Service'
into develop_2
2 parents 9510f6b + 5fdb1df commit c4c6858

File tree

15 files changed

+540
-57
lines changed

15 files changed

+540
-57
lines changed

dsf-bpe/dsf-bpe-process-api-v2-impl/pom.xml

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@
4848
<groupId>org.apache.commons</groupId>
4949
<artifactId>commons-lang3</artifactId>
5050
</dependency>
51+
<dependency>
52+
<groupId>org.apache.commons</groupId>
53+
<artifactId>commons-compress</artifactId>
54+
</dependency>
55+
<dependency>
56+
<groupId>org.tukaani</groupId>
57+
<artifactId>xz</artifactId>
58+
</dependency>
5159
<dependency>
5260
<groupId>org.apache.tika</groupId>
5361
<artifactId>tika-core</artifactId>
@@ -196,10 +204,14 @@
196204
<artifactId>jakarta-regexp</artifactId>
197205
<version>1.4</version>
198206
</artifactItem>
199-
<!--<artifactItem>
207+
<artifactItem>
200208
<groupId>org.apache.commons</groupId>
201209
<artifactId>commons-compress</artifactId>
202-
</artifactItem>-->
210+
</artifactItem>
211+
<artifactItem>
212+
<groupId>org.tukaani</groupId>
213+
<artifactId>xz</artifactId>
214+
</artifactItem>
203215
<artifactItem>
204216
<groupId>org.fhir</groupId>
205217
<artifactId>ucum</artifactId>

dsf-bpe/dsf-bpe-process-api-v2-impl/src/main/java/dev/dsf/bpe/v2/ProcessPluginApiFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import ca.uhn.fhir.context.FhirContext;
1111
import dev.dsf.bpe.v2.config.ProxyConfig;
12+
import dev.dsf.bpe.v2.service.CompressionService;
1213
import dev.dsf.bpe.v2.service.CryptoService;
1314
import dev.dsf.bpe.v2.service.DataLogger;
1415
import dev.dsf.bpe.v2.service.DsfClientProvider;
@@ -50,7 +51,7 @@ public ProcessPluginApi get()
5051
fromParent(OidcClientProvider.class), fromParent(MailService.class), fromParent(MimeTypeService.class),
5152
fromParent(ObjectMapper.class), fromParent(OrganizationProvider.class),
5253
fromParent(ProcessAuthorizationHelper.class), fromParent(QuestionnaireResponseHelper.class),
53-
fromParent(ReadAccessHelper.class), fromParent(TaskHelper.class), fromParent(CryptoService.class),
54-
fromParent(TargetProvider.class), fromParent(DataLogger.class));
54+
fromParent(ReadAccessHelper.class), fromParent(TaskHelper.class), fromParent(CompressionService.class),
55+
fromParent(CryptoService.class), fromParent(TargetProvider.class), fromParent(DataLogger.class));
5556
}
5657
}

dsf-bpe/dsf-bpe-process-api-v2-impl/src/main/java/dev/dsf/bpe/v2/ProcessPluginApiImpl.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import ca.uhn.fhir.context.FhirContext;
1010
import dev.dsf.bpe.v2.config.ProxyConfig;
11+
import dev.dsf.bpe.v2.service.CompressionService;
1112
import dev.dsf.bpe.v2.service.CryptoService;
1213
import dev.dsf.bpe.v2.service.DataLogger;
1314
import dev.dsf.bpe.v2.service.DsfClientProvider;
@@ -42,6 +43,7 @@ public class ProcessPluginApiImpl implements ProcessPluginApi, InitializingBean
4243
private final QuestionnaireResponseHelper questionnaireResponseHelper;
4344
private final ReadAccessHelper readAccessHelper;
4445
private final TaskHelper taskHelper;
46+
private final CompressionService compressionService;
4547
private final CryptoService cryptoService;
4648
private final TargetProvider targetProvider;
4749
private final DataLogger dataLogger;
@@ -53,7 +55,8 @@ public ProcessPluginApiImpl(ProcessPluginDefinition processPluginDefinition, Pro
5355
ObjectMapper objectMapper, OrganizationProvider organizationProvider,
5456
ProcessAuthorizationHelper processAuthorizationHelper,
5557
QuestionnaireResponseHelper questionnaireResponseHelper, ReadAccessHelper readAccessHelper,
56-
TaskHelper taskHelper, CryptoService cryptoService, TargetProvider targetProvider, DataLogger dataLogger)
58+
TaskHelper taskHelper, CompressionService compressionService, CryptoService cryptoService,
59+
TargetProvider targetProvider, DataLogger dataLogger)
5760
{
5861
this.processPluginDefinition = processPluginDefinition;
5962
this.proxyConfig = proxyConfig;
@@ -71,6 +74,7 @@ public ProcessPluginApiImpl(ProcessPluginDefinition processPluginDefinition, Pro
7174
this.questionnaireResponseHelper = questionnaireResponseHelper;
7275
this.readAccessHelper = readAccessHelper;
7376
this.taskHelper = taskHelper;
77+
this.compressionService = compressionService;
7478
this.cryptoService = cryptoService;
7579
this.targetProvider = targetProvider;
7680
this.dataLogger = dataLogger;
@@ -95,6 +99,7 @@ public void afterPropertiesSet() throws Exception
9599
Objects.requireNonNull(questionnaireResponseHelper, "questionnaireResponseHelper");
96100
Objects.requireNonNull(readAccessHelper, "readAccessHelper");
97101
Objects.requireNonNull(taskHelper, "taskHelper");
102+
Objects.requireNonNull(compressionService, "compressionService");
98103
Objects.requireNonNull(cryptoService, "cryptoService");
99104
Objects.requireNonNull(targetProvider, "targetProvider");
100105
Objects.requireNonNull(dataLogger, "dataLogger");
@@ -196,6 +201,12 @@ public TaskHelper getTaskHelper()
196201
return taskHelper;
197202
}
198203

204+
@Override
205+
public CompressionService getCompressionService()
206+
{
207+
return compressionService;
208+
}
209+
199210
@Override
200211
public CryptoService getCryptoService()
201212
{
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package dev.dsf.bpe.v2.service;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.io.OutputStream;
6+
import java.io.PipedInputStream;
7+
import java.io.PipedOutputStream;
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import java.util.Objects;
11+
import java.util.zip.GZIPInputStream;
12+
import java.util.zip.GZIPOutputStream;
13+
14+
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
15+
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
16+
import org.tukaani.xz.LZMA2Options;
17+
import org.tukaani.xz.XZInputStream;
18+
import org.tukaani.xz.XZOutputStream;
19+
20+
public class CompressionServiceImpl implements CompressionService
21+
{
22+
private static final int BUFFER_SIZE = 8192;
23+
24+
@FunctionalInterface
25+
private static interface Compressor
26+
{
27+
OutputStream withCompression(OutputStream o) throws IOException;
28+
}
29+
30+
@Override
31+
public InputStream toGzip(InputStream in) throws IOException
32+
{
33+
Objects.requireNonNull(in, "in");
34+
35+
return to(in, GZIPOutputStream::new);
36+
}
37+
38+
@Override
39+
public InputStream toBzip2(InputStream in) throws IOException
40+
{
41+
Objects.requireNonNull(in, "in");
42+
43+
return toBzip2(in, 9);
44+
}
45+
46+
@Override
47+
public InputStream toBzip2(InputStream in, int blockSize) throws IOException
48+
{
49+
Objects.requireNonNull(in, "in");
50+
if (blockSize < 1 || blockSize > 9)
51+
throw new IllegalArgumentException("blockSize < 1 or > 9");
52+
53+
return to(in, out -> new BZip2CompressorOutputStream(out, blockSize));
54+
}
55+
56+
@Override
57+
public InputStream toLzma2(InputStream in) throws IOException
58+
{
59+
Objects.requireNonNull(in, "in");
60+
61+
return toLzma2(in, 6);
62+
}
63+
64+
@Override
65+
public InputStream toLzma2(InputStream in, int preset) throws IOException
66+
{
67+
Objects.requireNonNull(in, "in");
68+
if (preset < 0 || preset > 9)
69+
throw new IllegalArgumentException("preset < 0 or > 9");
70+
71+
return to(in, out -> new XZOutputStream(out, new LZMA2Options(preset)));
72+
}
73+
74+
private InputStream to(InputStream in, Compressor compressor) throws IOException
75+
{
76+
PipedOutputStream pipedOut = new PipedOutputStream();
77+
PipedInputStream pipedIn = new PipedInputStream(pipedOut, BUFFER_SIZE);
78+
79+
List<IOException> capturedExceptions = new ArrayList<>();
80+
Thread worker = new Thread(() ->
81+
{
82+
try (in; OutputStream out = compressor.withCompression(pipedOut))
83+
{
84+
byte[] buffer = new byte[BUFFER_SIZE];
85+
int bytesRead;
86+
while ((bytesRead = in.read(buffer)) != -1)
87+
out.write(buffer, 0, bytesRead);
88+
}
89+
catch (IOException e)
90+
{
91+
capturedExceptions.add(e);
92+
}
93+
});
94+
worker.setDaemon(true);
95+
worker.start();
96+
97+
if (!capturedExceptions.isEmpty())
98+
{
99+
IOException e = capturedExceptions.removeFirst();
100+
capturedExceptions.stream().forEach(e::addSuppressed);
101+
throw e;
102+
}
103+
104+
return pipedIn;
105+
}
106+
107+
@Override
108+
public InputStream fromGzip(InputStream in) throws IOException
109+
{
110+
return new GZIPInputStream(in);
111+
}
112+
113+
@Override
114+
public InputStream fromBzip2(InputStream in) throws IOException
115+
{
116+
Objects.requireNonNull(in, "in");
117+
118+
return new BZip2CompressorInputStream(in);
119+
}
120+
121+
@Override
122+
public InputStream fromLzma2(InputStream in) throws IOException
123+
{
124+
return new XZInputStream(in);
125+
}
126+
}

dsf-bpe/dsf-bpe-process-api-v2-impl/src/main/java/dev/dsf/bpe/v2/spring/ApiServiceConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import dev.dsf.bpe.v2.listener.ListenerVariables;
3636
import dev.dsf.bpe.v2.listener.StartListener;
3737
import dev.dsf.bpe.v2.plugin.ProcessPluginFactoryImpl;
38+
import dev.dsf.bpe.v2.service.CompressionService;
39+
import dev.dsf.bpe.v2.service.CompressionServiceImpl;
3840
import dev.dsf.bpe.v2.service.CryptoService;
3941
import dev.dsf.bpe.v2.service.CryptoServiceImpl;
4042
import dev.dsf.bpe.v2.service.DataLogger;
@@ -281,6 +283,12 @@ public ListenerFactory listenerFactory()
281283
continueListener());
282284
}
283285

286+
@Bean
287+
public CompressionService compressionService()
288+
{
289+
return new CompressionServiceImpl();
290+
}
291+
284292
@Bean
285293
public CryptoService cryptoService()
286294
{
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package dev.dsf.bpe.v2.service;
2+
3+
import static org.junit.Assert.assertArrayEquals;
4+
import static org.junit.Assert.assertEquals;
5+
import static org.junit.Assert.assertNotNull;
6+
7+
import java.io.ByteArrayInputStream;
8+
import java.io.IOException;
9+
import java.io.InputStream;
10+
import java.nio.charset.StandardCharsets;
11+
12+
import org.junit.Test;
13+
14+
public class CompressionServiceTest
15+
{
16+
@FunctionalInterface
17+
private static interface Converter
18+
{
19+
InputStream convert(InputStream i) throws IOException;
20+
}
21+
22+
private static final byte[] TEST_DATA_1 = "Hello compression World!".getBytes(StandardCharsets.UTF_8);
23+
private static final byte[] TEST_DATA_2 = new byte[1_000_000];
24+
private static final byte[] TEST_DATA_3 = """
25+
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nulla a mauris ligula. Maecenas finibus magna mi, at placerat neque sollicitudin et. Nunc iaculis eros sagittis, rutrum lectus quis, eleifend nunc. Interdum et malesuada fames ac ante ipsum primis in faucibus. Aenean quam risus, blandit in mauris eu, imperdiet tristique neque. Curabitur varius nisl risus, mattis dapibus est vulputate quis. Mauris cursus vel justo id consectetur. Vivamus mollis orci sit amet erat sagittis sodales.
26+
Maecenas interdum erat et ipsum fermentum lobortis. Orci varius natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Proin et ipsum velit. Donec efficitur commodo ipsum, in condimentum dui facilisis non. Donec urna ipsum, tempus non finibus quis, condimentum at mi. Aliquam at porta ante. Integer luctus nec tellus in ornare. Quisque vel elit posuere, eleifend turpis eget, molestie nulla. Sed volutpat porttitor aliquet. Quisque eu mauris eget nisl porta cursus non sed nisl.
27+
Mauris mi nisi, pellentesque malesuada magna interdum, pretium feugiat erat. Mauris et velit sed erat efficitur pulvinar a in dui. Mauris id sem purus. In eget efficitur elit. Mauris a eros sodales, interdum libero at, rutrum sapien. Ut vitae nibh at augue mollis hendrerit a ut odio. Phasellus quis diam id est interdum facilisis. Nullam vitae turpis nisl. Pellentesque eu venenatis diam, ac mattis nisl. Maecenas libero dui, volutpat id eros ut, pulvinar fringilla ligula. Nam maximus orci a lectus suscipit ornare. Praesent vehicula, eros ut scelerisque faucibus, sem sapien pellentesque enim, nec suscipit risus lorem eget odio. Vivamus condimentum urna eget urna feugiat ornare. Donec vitae tempor sem. Duis scelerisque ipsum ac nunc elementum maximus id sed turpis.
28+
Proin finibus sapien lacus, eu iaculis neque tempus sed. Nam aliquam odio quis vestibulum sodales. Nam vehicula dictum libero id venenatis. Duis vulputate ligula in risus ultricies tincidunt. Nulla dignissim augue enim, eget interdum sapien interdum at. Nulla enim ligula, facilisis finibus elit euismod, tempor aliquet ante. Vivamus malesuada quis nisi vitae varius. Maecenas id velit convallis, vehicula nisl et, volutpat dui.
29+
Pellentesque et dignissim arcu. Cras eleifend egestas tellus, sit amet pellentesque quam vestibulum in. Duis id hendrerit tortor, eget dapibus sapien. Nulla massa tellus, sollicitudin sit amet mauris vitae, iaculis commodo sem. Fusce gravida eros in lorem imperdiet vestibulum. Nullam quis dolor nec est dictum euismod. Aliquam id tempus quam. Phasellus sed fringilla dui, aliquet semper nisi.
30+
"""
31+
.getBytes(StandardCharsets.UTF_8);
32+
33+
private CompressionService compressionService = new CompressionServiceImpl();
34+
35+
@Test
36+
public void testGzip() throws Exception
37+
{
38+
test(compressionService::toGzip, 44, compressionService::fromGzip, TEST_DATA_1);
39+
test(compressionService::toGzip, 1003, compressionService::fromGzip, TEST_DATA_2);
40+
test(compressionService::toGzip, 1120, compressionService::fromGzip, TEST_DATA_3);
41+
}
42+
43+
@Test
44+
public void testBzip2() throws Exception
45+
{
46+
test(compressionService::toBzip2, 63, compressionService::fromBzip2, TEST_DATA_1);
47+
test(compressionService::toBzip2, 48, compressionService::fromBzip2, TEST_DATA_2);
48+
test(compressionService::toBzip2, 1078, compressionService::fromBzip2, TEST_DATA_3);
49+
}
50+
51+
@Test
52+
public void testLzma2() throws Exception
53+
{
54+
test(compressionService::toLzma2, 80, compressionService::fromLzma2, TEST_DATA_1);
55+
test(compressionService::toLzma2, 276, compressionService::fromLzma2, TEST_DATA_2);
56+
test(compressionService::toLzma2, 1228, compressionService::fromLzma2, TEST_DATA_3);
57+
}
58+
59+
private void test(Converter compressor, int compressedSize, Converter decompressor, byte[] testData)
60+
throws IOException
61+
{
62+
byte[] compressed;
63+
try (InputStream in = compressor.convert(new ByteArrayInputStream(testData)))
64+
{
65+
compressed = in.readAllBytes();
66+
}
67+
68+
assertNotNull(compressed);
69+
assertEquals(compressedSize, compressed.length);
70+
71+
byte[] uncompressed;
72+
try (InputStream in = decompressor.convert(new ByteArrayInputStream(compressed)))
73+
{
74+
uncompressed = in.readAllBytes();
75+
}
76+
77+
assertNotNull(uncompressed);
78+
assertEquals(testData.length, uncompressed.length);
79+
assertArrayEquals(testData, uncompressed);
80+
}
81+
}

dsf-bpe/dsf-bpe-process-api-v2/src/main/java/dev/dsf/bpe/v2/ProcessPluginApi.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import ca.uhn.fhir.context.FhirContext;
99
import dev.dsf.bpe.v2.config.ProxyConfig;
10+
import dev.dsf.bpe.v2.service.CompressionService;
1011
import dev.dsf.bpe.v2.service.CryptoService;
1112
import dev.dsf.bpe.v2.service.DataLogger;
1213
import dev.dsf.bpe.v2.service.DsfClientProvider;
@@ -64,6 +65,8 @@ public interface ProcessPluginApi
6465

6566
TaskHelper getTaskHelper();
6667

68+
CompressionService getCompressionService();
69+
6770
CryptoService getCryptoService();
6871

6972
TargetProvider getTargetProvider();

0 commit comments

Comments
 (0)