Skip to content

Commit 5c74445

Browse files
committed
add store.getInputStream
1 parent 9014fef commit 5c74445

File tree

9 files changed

+192
-7
lines changed

9 files changed

+192
-7
lines changed

src/main/java/dev/zarr/zarrjava/store/BufferedZipStore.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import javax.annotation.Nullable;
55
import java.io.ByteArrayOutputStream;
66
import java.io.IOException;
7+
import java.io.InputStream;
78
import java.nio.ByteBuffer;
89
import java.nio.file.Path;
910
import java.nio.file.Paths;
@@ -290,6 +291,11 @@ public StoreHandle resolve(String... keys) {
290291
return new StoreHandle(this, keys);
291292
}
292293

294+
@Override
295+
public InputStream getInputStream(String[] keys, long start, long end) {
296+
return bufferStore.getInputStream(keys, start, end);
297+
}
298+
293299
@Override
294300
public String toString() {
295301
return "BufferedZipStore(" + underlyingStore.toString() + ")";

src/main/java/dev/zarr/zarrjava/store/FilesystemStore.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package dev.zarr.zarrjava.store;
22

33
import dev.zarr.zarrjava.utils.Utils;
4+
import org.apache.commons.io.input.BoundedInputStream;
5+
46
import java.io.IOException;
7+
import java.io.InputStream;
58
import java.nio.ByteBuffer;
69
import java.nio.channels.SeekableByteChannel;
710
import java.nio.file.Files;
@@ -146,4 +149,25 @@ public String toString() {
146149
return this.path.toUri().toString().replaceAll("\\/$", "");
147150
}
148151

152+
@Override
153+
public InputStream getInputStream(String[] keys, long start, long end) {
154+
Path keyPath = resolveKeys(keys);
155+
try {
156+
InputStream inputStream = Files.newInputStream(keyPath);
157+
if (start > 0) {
158+
long skipped = inputStream.skip(start);
159+
if (skipped < start) {
160+
throw new IOException("Unable to skip to the desired start position.");
161+
}
162+
}
163+
if (end != -1) {
164+
long bytesToRead = end - start;
165+
return new BoundedInputStream(inputStream, bytesToRead);
166+
} else {
167+
return inputStream;
168+
}
169+
} catch (IOException e) {
170+
throw new RuntimeException(e);
171+
}
172+
}
149173
}

src/main/java/dev/zarr/zarrjava/store/HttpStore.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
import com.squareup.okhttp.Request;
66
import com.squareup.okhttp.Response;
77
import com.squareup.okhttp.ResponseBody;
8+
9+
import java.io.FilterInputStream;
810
import java.io.IOException;
11+
import java.io.InputStream;
912
import java.nio.ByteBuffer;
1013
import javax.annotation.Nonnull;
1114
import javax.annotation.Nullable;
@@ -101,6 +104,34 @@ public StoreHandle resolve(String... keys) {
101104

102105
@Override
103106
public String toString() {
104-
return uri;
107+
return uri;
105108
}
109+
110+
@Override
111+
@Nullable
112+
public InputStream getInputStream(String[] keys, long start, long end) {
113+
if (start < 0) {
114+
throw new IllegalArgumentException("Argument 'start' needs to be non-negative.");
115+
}
116+
Request request = new Request.Builder().url(resolveKeys(keys)).header(
117+
"Range", String.format("Bytes=%d-%d", start, end - 1)).build();
118+
Call call = httpClient.newCall(request);
119+
try {
120+
Response response = call.execute();
121+
ResponseBody body = response.body();
122+
if (body == null) return null;
123+
InputStream stream = body.byteStream();
124+
125+
// Ensure closing the stream also closes the response
126+
return new FilterInputStream(stream) {
127+
@Override
128+
public void close() throws IOException {
129+
super.close();
130+
body.close();
131+
}
132+
};
133+
} catch (IOException e) {
134+
return null;
135+
}
136+
}
106137
}

src/main/java/dev/zarr/zarrjava/store/MemoryStore.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import javax.annotation.Nonnull;
44
import javax.annotation.Nullable;
5+
import java.io.InputStream;
56
import java.nio.ByteBuffer;
67
import java.util.*;
78
import java.util.concurrent.ConcurrentHashMap;
@@ -45,7 +46,7 @@ public ByteBuffer get(String[] keys, long start, long end) {
4546
if (bytes == null) return null;
4647
if (end < 0) end = bytes.length;
4748
if (end > Integer.MAX_VALUE) throw new IllegalArgumentException("End index too large");
48-
return ByteBuffer.wrap(bytes, (int) start, (int) end);
49+
return ByteBuffer.wrap(bytes, (int) start, (int) (end - start));
4950
}
5051

5152

@@ -83,5 +84,14 @@ public StoreHandle resolve(String... keys) {
8384
public String toString() {
8485
return String.format("<MemoryStore {%s}>", hashCode());
8586
}
87+
88+
@Override
89+
public InputStream getInputStream(String[] keys, long start, long end) {
90+
byte[] bytes = map.get(resolveKeys(keys));
91+
if (bytes == null) return null;
92+
if (end < 0) end = bytes.length;
93+
if (end > Integer.MAX_VALUE) throw new IllegalArgumentException("End index too large");
94+
return new java.io.ByteArrayInputStream(bytes, (int) start, (int)(end - start));
95+
}
8696
}
8797

src/main/java/dev/zarr/zarrjava/store/ReadOnlyZipStore.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
44
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
55
import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream;
6+
import org.apache.commons.io.input.BoundedInputStream;
67

78
import javax.annotation.Nonnull;
89
import javax.annotation.Nullable;
910
import java.io.ByteArrayOutputStream;
1011
import java.io.IOException;
12+
import java.io.InputStream;
1113
import java.nio.ByteBuffer;
1214
import java.nio.file.Path;
1315
import java.nio.file.Paths;
@@ -83,7 +85,8 @@ public ByteBuffer get(String[] keys, long start, long end) {
8385
continue;
8486
}
8587

86-
if (zis.skip(start) != start) {
88+
long skipResult = zis.skip(start);
89+
if (skipResult != start) {
8790
throw new IOException("Failed to skip to start position " + start + " in zip entry " + entryName);
8891
}
8992

@@ -168,4 +171,38 @@ public Stream<String[]> list(String[] keys) {
168171
}
169172
return builder.build();
170173
}
174+
175+
@Override
176+
public InputStream getInputStream(String[] keys, long start, long end) {
177+
InputStream baseStream = underlyingStore.getInputStream();
178+
179+
try {
180+
ZipArchiveInputStream zis = new ZipArchiveInputStream(baseStream);
181+
ZipArchiveEntry entry;
182+
while ((entry = zis.getNextEntry()) != null) {
183+
String entryName = entry.getName();
184+
185+
if (entryName.startsWith("/")) {
186+
entryName = entryName.substring(1);
187+
}
188+
if (entry.isDirectory() || !entryName.equals(resolveKeys(keys))) {
189+
continue;
190+
}
191+
192+
long skipResult = zis.skip(start);
193+
if (skipResult != start) {
194+
throw new IOException("Failed to skip to start position " + start + " in zip entry " + entryName);
195+
}
196+
197+
long bytesToRead;
198+
if (end != -1) bytesToRead = end - start;
199+
else bytesToRead = Long.MAX_VALUE;
200+
201+
return new BoundedInputStream(zis, bytesToRead);
202+
}
203+
return null;
204+
} catch (IOException e) {
205+
}
206+
return null;
207+
}
171208
}

src/main/java/dev/zarr/zarrjava/store/S3Store.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,17 @@ public StoreHandle resolve(String... keys) {
121121
return new StoreHandle(this, keys);
122122
}
123123

124+
@Override
125+
public InputStream getInputStream(String[] keys, long start, long end) {
126+
GetObjectRequest req = GetObjectRequest.builder()
127+
.bucket(bucketName)
128+
.key(resolveKeys(keys))
129+
.range(String.format("bytes=%d-%d", start, end-1)) // S3 range is inclusive
130+
.build();
131+
ResponseInputStream<GetObjectResponse> responseInputStream = s3client.getObject(req);
132+
return responseInputStream;
133+
}
134+
124135
@Override
125136
public String toString() {
126137
return "s3://" + bucketName + "/" + prefix;

src/main/java/dev/zarr/zarrjava/store/Store.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package dev.zarr.zarrjava.store;
22

3+
import java.io.InputStream;
34
import java.nio.ByteBuffer;
45
import java.util.stream.Stream;
56
import javax.annotation.Nonnull;
@@ -42,4 +43,10 @@ default Stream<String[]> list() {
4243
return list(new String[]{});
4344
}
4445
}
46+
47+
InputStream getInputStream(String[] keys, long start, long end);
48+
49+
default InputStream getInputStream(String[] keys) {
50+
return getInputStream(keys, 0, -1);
51+
}
4552
}

src/main/java/dev/zarr/zarrjava/store/StoreHandle.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package dev.zarr.zarrjava.store;
22

33
import dev.zarr.zarrjava.utils.Utils;
4+
5+
import java.io.InputStream;
46
import java.nio.ByteBuffer;
57
import java.nio.file.NoSuchFileException;
68
import java.nio.file.Path;
@@ -44,6 +46,14 @@ public ByteBuffer read(long start, long end) {
4446
return store.get(keys, start, end);
4547
}
4648

49+
public InputStream getInputStream(int start, int end) {
50+
return store.getInputStream(keys, start, end);
51+
}
52+
53+
public InputStream getInputStream() {
54+
return store.getInputStream(keys);
55+
}
56+
4757
public void set(ByteBuffer bytes) {
4858
store.set(keys, bytes);
4959
}

src/test/java/dev/zarr/zarrjava/ZarrStoreTest.java

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import software.amazon.awssdk.services.s3.S3Client;
1919

2020
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.nio.ByteBuffer;
2123
import java.nio.file.Files;
2224
import java.util.ArrayList;
2325
import java.util.Arrays;
@@ -103,6 +105,55 @@ public void testS3StoreGet() throws IOException, ZarrException {
103105

104106
}
105107

108+
static Stream<StoreHandle> inputStreamStores() throws IOException {
109+
String[] s3StoreKeys = new String[]{"zarr_v3", "l4_sample", "color", "1", "zarr.json"};
110+
StoreHandle s3StoreHandle = new S3Store(S3Client.builder()
111+
.region(Region.of("eu-west-1"))
112+
.credentialsProvider(AnonymousCredentialsProvider.create())
113+
.build(), "static.webknossos.org", "data")
114+
.resolve(s3StoreKeys);
115+
116+
byte[] testData = new byte[100];
117+
for (int i = 0; i < testData.length; i++) {
118+
testData[i] = (byte) i;
119+
}
120+
121+
StoreHandle memoryStoreHandle = new MemoryStore().resolve();
122+
memoryStoreHandle.set(ByteBuffer.wrap(testData));
123+
124+
StoreHandle fsStoreHandle = new FilesystemStore(TESTOUTPUT.resolve("testInputStreamFS")).resolve("testfile");
125+
fsStoreHandle.set(ByteBuffer.wrap(testData));
126+
127+
zipFile(TESTOUTPUT.resolve("testInputStreamFS"), TESTOUTPUT.resolve("testInputStreamZIP.zip"));
128+
StoreHandle bufferedZipStoreHandle = new BufferedZipStore(TESTOUTPUT.resolve("testInputStreamZIP.zip"), true)
129+
.resolve("testfile");
130+
131+
StoreHandle readOnlyZipStoreHandle = new ReadOnlyZipStore(TESTOUTPUT.resolve("testInputStreamZIP.zip"))
132+
.resolve("testfile");
133+
134+
StoreHandle httpStoreHandle = new HttpStore("https://static.webknossos.org/data/zarr_v3/l4_sample")
135+
.resolve("color", "1", "zarr.json");
136+
return Stream.of(
137+
memoryStoreHandle,
138+
s3StoreHandle,
139+
fsStoreHandle,
140+
bufferedZipStoreHandle,
141+
readOnlyZipStoreHandle,
142+
httpStoreHandle
143+
);
144+
}
145+
146+
@ParameterizedTest
147+
@MethodSource("inputStreamStores")
148+
public void testStoreInputStream(StoreHandle storeHandle) throws IOException, ZarrException {
149+
InputStream is = storeHandle.getInputStream(10, 20);
150+
byte[] buffer = new byte[10];
151+
int bytesRead = is.read(buffer);
152+
Assertions.assertEquals(10, bytesRead);
153+
byte[] expectedBuffer = new byte[10];
154+
storeHandle.read(10, 20).get(expectedBuffer);
155+
Assertions.assertArrayEquals(expectedBuffer, buffer);
156+
}
106157

107158
@Test
108159
public void testHttpStore() throws IOException, ZarrException {
@@ -115,8 +166,7 @@ public void testHttpStore() throws IOException, ZarrException {
115166
@ParameterizedTest
116167
@CsvSource({"false", "true",})
117168
public void testMemoryStoreV3(boolean useParallel) throws ZarrException, IOException {
118-
int[] testData = new int[1024 * 1024];
119-
Arrays.setAll(testData, p -> p);
169+
int[] testData = testData();
120170

121171
dev.zarr.zarrjava.v3.Group group = dev.zarr.zarrjava.v3.Group.create(new MemoryStore().resolve());
122172
Array array = group.createArray("array", b -> b
@@ -140,8 +190,7 @@ public void testMemoryStoreV3(boolean useParallel) throws ZarrException, IOExcep
140190
@ParameterizedTest
141191
@CsvSource({"false", "true",})
142192
public void testMemoryStoreV2(boolean useParallel) throws ZarrException, IOException {
143-
int[] testData = new int[1024 * 1024];
144-
Arrays.setAll(testData, p -> p);
193+
int[] testData = testData();
145194

146195
dev.zarr.zarrjava.v2.Group group = dev.zarr.zarrjava.v2.Group.create(new MemoryStore().resolve());
147196
dev.zarr.zarrjava.v2.Array array = group.createArray("array", b -> b

0 commit comments

Comments
 (0)