Skip to content

Commit 89d0c71

Browse files
committed
compression service to use with InputStreams
Adds compression service with methods for gzip, bzip2 and lzma2. The service is designed to be used with InputStreams. For the compression methods this means that the incoming InputStream needs to be read, converted to an OuputStream, used in the compression implementations and than converted back to an InputStream. Adds v2 plugin integration tests and a direct service test.
1 parent b41cd81 commit 89d0c71

File tree

13 files changed

+519
-55
lines changed

13 files changed

+519
-55
lines changed

dsf-bpe/dsf-bpe-process-api-v2-impl/pom.xml

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@
4848
<groupId>org.apache.commons</groupId>
4949
<artifactId>commons-lang3</artifactId>
5050
</dependency>
51+
<dependency>
52+
<groupId>org.apache.commons</groupId>
53+
<artifactId>commons-compress</artifactId>
54+
</dependency>
55+
<dependency>
56+
<groupId>org.tukaani</groupId>
57+
<artifactId>xz</artifactId>
58+
</dependency>
5159
<dependency>
5260
<groupId>org.apache.tika</groupId>
5361
<artifactId>tika-core</artifactId>
@@ -196,10 +204,14 @@
196204
<artifactId>jakarta-regexp</artifactId>
197205
<version>1.4</version>
198206
</artifactItem>
199-
<!--<artifactItem>
207+
<artifactItem>
200208
<groupId>org.apache.commons</groupId>
201209
<artifactId>commons-compress</artifactId>
202-
</artifactItem>-->
210+
</artifactItem>
211+
<artifactItem>
212+
<groupId>org.tukaani</groupId>
213+
<artifactId>xz</artifactId>
214+
</artifactItem>
203215
<artifactItem>
204216
<groupId>org.fhir</groupId>
205217
<artifactId>ucum</artifactId>

dsf-bpe/dsf-bpe-process-api-v2-impl/src/main/java/dev/dsf/bpe/v2/ProcessPluginApiImpl.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import ca.uhn.fhir.context.FhirContext;
1010
import dev.dsf.bpe.v2.config.ProxyConfig;
11+
import dev.dsf.bpe.v2.service.CompressionService;
1112
import dev.dsf.bpe.v2.service.CryptoService;
1213
import dev.dsf.bpe.v2.service.DataLogger;
1314
import dev.dsf.bpe.v2.service.DsfClientProvider;
@@ -39,6 +40,7 @@ public class ProcessPluginApiImpl implements ProcessPluginApi, InitializingBean
3940
private final QuestionnaireResponseHelper questionnaireResponseHelper;
4041
private final ReadAccessHelper readAccessHelper;
4142
private final TaskHelper taskHelper;
43+
private final CompressionService compressionService;
4244
private final CryptoService cryptoService;
4345
private final TargetProvider targetProvider;
4446
private final DataLogger dataLogger;
@@ -49,7 +51,8 @@ public ProcessPluginApiImpl(ProxyConfig proxyConfig, EndpointProvider endpointPr
4951
ObjectMapper objectMapper, OrganizationProvider organizationProvider,
5052
ProcessAuthorizationHelper processAuthorizationHelper,
5153
QuestionnaireResponseHelper questionnaireResponseHelper, ReadAccessHelper readAccessHelper,
52-
TaskHelper taskHelper, CryptoService cryptoService, TargetProvider targetProvider, DataLogger dataLogger)
54+
TaskHelper taskHelper, CompressionService compressionService, CryptoService cryptoService,
55+
TargetProvider targetProvider, DataLogger dataLogger)
5356
{
5457
this.proxyConfig = proxyConfig;
5558
this.endpointProvider = endpointProvider;
@@ -65,6 +68,7 @@ public ProcessPluginApiImpl(ProxyConfig proxyConfig, EndpointProvider endpointPr
6568
this.questionnaireResponseHelper = questionnaireResponseHelper;
6669
this.readAccessHelper = readAccessHelper;
6770
this.taskHelper = taskHelper;
71+
this.compressionService = compressionService;
6872
this.cryptoService = cryptoService;
6973
this.targetProvider = targetProvider;
7074
this.dataLogger = dataLogger;
@@ -87,6 +91,7 @@ public void afterPropertiesSet() throws Exception
8791
Objects.requireNonNull(questionnaireResponseHelper, "questionnaireResponseHelper");
8892
Objects.requireNonNull(readAccessHelper, "readAccessHelper");
8993
Objects.requireNonNull(taskHelper, "taskHelper");
94+
Objects.requireNonNull(compressionService, "compressionService");
9095
Objects.requireNonNull(cryptoService, "cryptoService");
9196
Objects.requireNonNull(targetProvider, "targetProvider");
9297
Objects.requireNonNull(dataLogger, "dataLogger");
@@ -176,6 +181,12 @@ public TaskHelper getTaskHelper()
176181
return taskHelper;
177182
}
178183

184+
@Override
185+
public CompressionService getCompressionService()
186+
{
187+
return compressionService;
188+
}
189+
179190
@Override
180191
public CryptoService getCryptoService()
181192
{
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package dev.dsf.bpe.v2.service;
2+
3+
import java.io.IOException;
4+
import java.io.InputStream;
5+
import java.io.OutputStream;
6+
import java.io.PipedInputStream;
7+
import java.io.PipedOutputStream;
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
import java.util.Objects;
11+
import java.util.zip.GZIPInputStream;
12+
import java.util.zip.GZIPOutputStream;
13+
14+
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
15+
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
16+
import org.tukaani.xz.LZMA2Options;
17+
import org.tukaani.xz.XZInputStream;
18+
import org.tukaani.xz.XZOutputStream;
19+
20+
public class CompressionServiceImpl implements CompressionService
21+
{
22+
private static final int BUFFER_SIZE = 8192;
23+
24+
@FunctionalInterface
25+
private static interface Compressor
26+
{
27+
OutputStream withCompression(OutputStream o) throws IOException;
28+
}
29+
30+
@Override
31+
public InputStream toGzip(InputStream in) throws IOException
32+
{
33+
Objects.requireNonNull(in, "in");
34+
35+
return to(in, GZIPOutputStream::new);
36+
}
37+
38+
@Override
39+
public InputStream toBzip2(InputStream in) throws IOException
40+
{
41+
Objects.requireNonNull(in, "in");
42+
43+
return toBzip2(in, 9);
44+
}
45+
46+
@Override
47+
public InputStream toBzip2(InputStream in, int blockSize) throws IOException
48+
{
49+
Objects.requireNonNull(in, "in");
50+
if (blockSize < 1 || blockSize > 9)
51+
throw new IllegalArgumentException("blockSize < 1 or > 9");
52+
53+
return to(in, out -> new BZip2CompressorOutputStream(out, blockSize));
54+
}
55+
56+
@Override
57+
public InputStream toLzma2(InputStream in) throws IOException
58+
{
59+
Objects.requireNonNull(in, "in");
60+
61+
return toLzma2(in, 6);
62+
}
63+
64+
@Override
65+
public InputStream toLzma2(InputStream in, int preset) throws IOException
66+
{
67+
Objects.requireNonNull(in, "in");
68+
if (preset < 0 || preset > 9)
69+
throw new IllegalArgumentException("preset < 0 or > 9");
70+
71+
return to(in, out -> new XZOutputStream(out, new LZMA2Options(preset)));
72+
}
73+
74+
private InputStream to(InputStream in, Compressor compressor) throws IOException
75+
{
76+
PipedOutputStream pipedOut = new PipedOutputStream();
77+
PipedInputStream pipedIn = new PipedInputStream(pipedOut, BUFFER_SIZE);
78+
79+
List<IOException> capturedExceptions = new ArrayList<>();
80+
Thread worker = new Thread(() ->
81+
{
82+
try (in; OutputStream out = compressor.withCompression(pipedOut))
83+
{
84+
byte[] buffer = new byte[BUFFER_SIZE];
85+
int bytesRead;
86+
while ((bytesRead = in.read(buffer)) != -1)
87+
out.write(buffer, 0, bytesRead);
88+
}
89+
catch (IOException e)
90+
{
91+
capturedExceptions.add(e);
92+
}
93+
});
94+
worker.setDaemon(true);
95+
worker.start();
96+
97+
if (!capturedExceptions.isEmpty())
98+
{
99+
IOException e = capturedExceptions.removeFirst();
100+
capturedExceptions.stream().forEach(e::addSuppressed);
101+
throw e;
102+
}
103+
104+
return pipedIn;
105+
}
106+
107+
@Override
108+
public InputStream fromGzip(InputStream in) throws IOException
109+
{
110+
return new GZIPInputStream(in);
111+
}
112+
113+
@Override
114+
public InputStream fromBzip2(InputStream in) throws IOException
115+
{
116+
Objects.requireNonNull(in, "in");
117+
118+
return new BZip2CompressorInputStream(in);
119+
}
120+
121+
@Override
122+
public InputStream fromLzma2(InputStream in) throws IOException
123+
{
124+
return new XZInputStream(in);
125+
}
126+
}

dsf-bpe/dsf-bpe-process-api-v2-impl/src/main/java/dev/dsf/bpe/v2/spring/ApiServiceConfig.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import dev.dsf.bpe.v2.listener.ListenerVariables;
3838
import dev.dsf.bpe.v2.listener.StartListener;
3939
import dev.dsf.bpe.v2.plugin.ProcessPluginFactoryImpl;
40+
import dev.dsf.bpe.v2.service.CompressionService;
41+
import dev.dsf.bpe.v2.service.CompressionServiceImpl;
4042
import dev.dsf.bpe.v2.service.CryptoService;
4143
import dev.dsf.bpe.v2.service.CryptoServiceImpl;
4244
import dev.dsf.bpe.v2.service.DataLogger;
@@ -103,7 +105,7 @@ public ProcessPluginApi processPluginApiV2()
103105
return new ProcessPluginApiImpl(proxyConfigDelegate(), endpointProvider(), fhirContext(), dsfClientProvider(),
104106
fhirClientProvider(), oidcClientProvider(), mailService(), mimeTypeService(), objectMapper(),
105107
organizationProvider(), processAuthorizationHelper(), questionnaireResponseHelper(), readAccessHelper(),
106-
taskHelper(), cryptoService(), targetProvider(), dataLogger());
108+
taskHelper(), compressionService(), cryptoService(), targetProvider(), dataLogger());
107109
}
108110

109111
@Bean
@@ -284,6 +286,12 @@ public ListenerFactory listenerFactory()
284286
continueListener());
285287
}
286288

289+
@Bean
290+
public CompressionService compressionService()
291+
{
292+
return new CompressionServiceImpl();
293+
}
294+
287295
@Bean
288296
public CryptoService cryptoService()
289297
{
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package dev.dsf.bpe.v2.service;
2+
3+
import static org.junit.Assert.assertArrayEquals;
4+
import static org.junit.Assert.assertEquals;
5+
import static org.junit.Assert.assertNotNull;
6+
7+
import java.io.ByteArrayInputStream;
8+
import java.io.IOException;
9+
import java.io.InputStream;
10+
import java.nio.charset.StandardCharsets;
11+
12+
import org.junit.Test;
13+
14+
public class CompressionServiceTest
15+
{
16+
@FunctionalInterface
17+
private static interface Converter
18+
{
19+
InputStream convert(InputStream i) throws IOException;
20+
}
21+
22+
private static final byte[] TEST_DATA_1 = "Hello compression World!".getBytes(StandardCharsets.UTF_8);
23+
private static final byte[] TEST_DATA_2 = new byte[1_000_000];
24+
private static final byte[] TEST_DATA_3 = """
25+
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nulla a mauris ligula. Maecenas finibus magna mi, at placerat neque sollicitudin et. Nunc iaculis eros sagittis, rutrum lectus quis, eleifend nunc. Interdum et malesuada fames ac ante ipsum primis in faucibus. Aenean quam risus, blandit in mauris eu, imperdiet tristique neque. Curabitur varius nisl risus, mattis dapibus est vulputate quis. Mauris cursus vel justo id consectetur. Vivamus mollis orci sit amet erat sagittis sodales.
26+
Maecenas interdum erat et ipsum fermentum lobortis. Orci varius natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Proin et ipsum velit. Donec efficitur commodo ipsum, in condimentum dui facilisis non. Donec urna ipsum, tempus non finibus quis, condimentum at mi. Aliquam at porta ante. Integer luctus nec tellus in ornare. Quisque vel elit posuere, eleifend turpis eget, molestie nulla. Sed volutpat porttitor aliquet. Quisque eu mauris eget nisl porta cursus non sed nisl.
27+
Mauris mi nisi, pellentesque malesuada magna interdum, pretium feugiat erat. Mauris et velit sed erat efficitur pulvinar a in dui. Mauris id sem purus. In eget efficitur elit. Mauris a eros sodales, interdum libero at, rutrum sapien. Ut vitae nibh at augue mollis hendrerit a ut odio. Phasellus quis diam id est interdum facilisis. Nullam vitae turpis nisl. Pellentesque eu venenatis diam, ac mattis nisl. Maecenas libero dui, volutpat id eros ut, pulvinar fringilla ligula. Nam maximus orci a lectus suscipit ornare. Praesent vehicula, eros ut scelerisque faucibus, sem sapien pellentesque enim, nec suscipit risus lorem eget odio. Vivamus condimentum urna eget urna feugiat ornare. Donec vitae tempor sem. Duis scelerisque ipsum ac nunc elementum maximus id sed turpis.
28+
Proin finibus sapien lacus, eu iaculis neque tempus sed. Nam aliquam odio quis vestibulum sodales. Nam vehicula dictum libero id venenatis. Duis vulputate ligula in risus ultricies tincidunt. Nulla dignissim augue enim, eget interdum sapien interdum at. Nulla enim ligula, facilisis finibus elit euismod, tempor aliquet ante. Vivamus malesuada quis nisi vitae varius. Maecenas id velit convallis, vehicula nisl et, volutpat dui.
29+
Pellentesque et dignissim arcu. Cras eleifend egestas tellus, sit amet pellentesque quam vestibulum in. Duis id hendrerit tortor, eget dapibus sapien. Nulla massa tellus, sollicitudin sit amet mauris vitae, iaculis commodo sem. Fusce gravida eros in lorem imperdiet vestibulum. Nullam quis dolor nec est dictum euismod. Aliquam id tempus quam. Phasellus sed fringilla dui, aliquet semper nisi.
30+
"""
31+
.getBytes(StandardCharsets.UTF_8);
32+
33+
private CompressionService compressionService = new CompressionServiceImpl();
34+
35+
@Test
36+
public void testGzip() throws Exception
37+
{
38+
test(compressionService::toGzip, 44, compressionService::fromGzip, TEST_DATA_1);
39+
test(compressionService::toGzip, 1003, compressionService::fromGzip, TEST_DATA_2);
40+
test(compressionService::toGzip, 1120, compressionService::fromGzip, TEST_DATA_3);
41+
}
42+
43+
@Test
44+
public void testBzip2() throws Exception
45+
{
46+
test(compressionService::toBzip2, 63, compressionService::fromBzip2, TEST_DATA_1);
47+
test(compressionService::toBzip2, 48, compressionService::fromBzip2, TEST_DATA_2);
48+
test(compressionService::toBzip2, 1078, compressionService::fromBzip2, TEST_DATA_3);
49+
}
50+
51+
@Test
52+
public void testLzma2() throws Exception
53+
{
54+
test(compressionService::toLzma2, 80, compressionService::fromLzma2, TEST_DATA_1);
55+
test(compressionService::toLzma2, 276, compressionService::fromLzma2, TEST_DATA_2);
56+
test(compressionService::toLzma2, 1228, compressionService::fromLzma2, TEST_DATA_3);
57+
}
58+
59+
private void test(Converter compressor, int compressedSize, Converter decompressor, byte[] testData)
60+
throws IOException
61+
{
62+
byte[] compressed;
63+
try (InputStream in = compressor.convert(new ByteArrayInputStream(testData)))
64+
{
65+
compressed = in.readAllBytes();
66+
}
67+
68+
assertNotNull(compressed);
69+
assertEquals(compressedSize, compressed.length);
70+
71+
byte[] uncompressed;
72+
try (InputStream in = decompressor.convert(new ByteArrayInputStream(compressed)))
73+
{
74+
uncompressed = in.readAllBytes();
75+
}
76+
77+
assertNotNull(uncompressed);
78+
assertEquals(testData.length, uncompressed.length);
79+
assertArrayEquals(testData, uncompressed);
80+
}
81+
}

dsf-bpe/dsf-bpe-process-api-v2/src/main/java/dev/dsf/bpe/v2/ProcessPluginApi.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import ca.uhn.fhir.context.FhirContext;
99
import dev.dsf.bpe.v2.config.ProxyConfig;
10+
import dev.dsf.bpe.v2.service.CompressionService;
1011
import dev.dsf.bpe.v2.service.CryptoService;
1112
import dev.dsf.bpe.v2.service.DataLogger;
1213
import dev.dsf.bpe.v2.service.DsfClientProvider;
@@ -59,6 +60,8 @@ public interface ProcessPluginApi
5960

6061
TaskHelper getTaskHelper();
6162

63+
CompressionService getCompressionService();
64+
6265
CryptoService getCryptoService();
6366

6467
TargetProvider getTargetProvider();

0 commit comments

Comments
 (0)