Skip to content

Commit bce731e

Browse files
committed
read zip store
1 parent 12a86b7 commit bce731e

File tree

3 files changed

+202
-78
lines changed

3 files changed

+202
-78
lines changed
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package dev.zarr.zarrjava.store;
2+
3+
import javax.annotation.Nonnull;
4+
import javax.annotation.Nullable;
5+
import java.io.ByteArrayOutputStream;
6+
import java.io.IOException;
7+
import java.io.InputStream;
8+
import java.nio.ByteBuffer;
9+
import java.nio.file.Path;
10+
import java.nio.file.Paths;
11+
import java.util.stream.Stream;
12+
import java.util.zip.ZipEntry;
13+
import java.util.zip.ZipInputStream;
14+
15+
16+
/** A Store implementation that buffers reads and writes and flushes them to an underlying Store as a zip file.
17+
*/
18+
public class BufferedZipStore implements Store, Store.ListableStore {
19+
20+
private final StoreHandle underlyingStore;
21+
private final Store.ListableStore bufferStore;
22+
23+
private void writeBuffer() throws IOException{
24+
// create zip file bytes from buffer store and write to underlying store
25+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
26+
27+
// try (ZipOutputStream zos = new ZipOutputStream(baos)) {
28+
// // iterate over bufferStore.list()
29+
// for (String entry: bufferStore.list(new String[]{}).toArray(String[]::new)) {
30+
// List<String> pathComponents = entry.getKey();
31+
// byte[] data = entry.getValue();
32+
//
33+
// // Build the ZIP path (e.g. ["dir", "sub", "file.txt"] → "dir/sub/file.txt")
34+
// String path = String.join("/", pathComponents);
35+
//
36+
// ZipEntry zipEntry = new ZipEntry(path);
37+
// zos.putNextEntry(zipEntry);
38+
//
39+
// zos.write(data);
40+
// zos.closeEntry();
41+
// }
42+
// }
43+
44+
// byte[] zipBytes = baos.toByteArray();
45+
// return ByteBuffer.wrap(zipBytes);
46+
//
47+
// underlyingStore.set();
48+
}
49+
50+
private void loadBuffer() throws IOException{
51+
// read zip file bytes from underlying store and populate buffer store
52+
ByteBuffer buffer = underlyingStore.read();
53+
if (buffer == null) {
54+
return;
55+
}
56+
try (ZipInputStream zis = new ZipInputStream(new ByteBufferBackedInputStream(buffer))) {
57+
ZipEntry entry;
58+
while ((entry = zis.getNextEntry()) != null) {
59+
if (entry.isDirectory()) {
60+
zis.closeEntry();
61+
continue;
62+
}
63+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
64+
byte[] tmp = new byte[8192];
65+
int read;
66+
while ((read = zis.read(tmp)) != -1) {
67+
baos.write(tmp, 0, read);
68+
}
69+
70+
byte[] bytes = baos.toByteArray();
71+
System.out.println("Loading entry: " + entry.getName() + " (" + bytes.length + " bytes)");
72+
73+
bufferStore.set(new String[]{entry.getName()}, ByteBuffer.wrap(bytes));
74+
75+
zis.closeEntry();
76+
}
77+
}
78+
79+
}
80+
81+
public BufferedZipStore(@Nonnull StoreHandle underlyingStore, @Nonnull Store.ListableStore bufferStore) {
82+
this.underlyingStore = underlyingStore;
83+
this.bufferStore = bufferStore;
84+
try {
85+
loadBuffer();
86+
} catch (IOException e) {
87+
throw new RuntimeException("Failed to load buffer from underlying store", e);
88+
}
89+
}
90+
91+
public BufferedZipStore(@Nonnull StoreHandle underlyingStore) {
92+
this(underlyingStore, new MemoryStore());
93+
}
94+
95+
public BufferedZipStore(@Nonnull Path underlyingStore) {
96+
this(new FilesystemStore(underlyingStore.getParent()).resolve(underlyingStore.getFileName().toString()));
97+
System.out.println("Created BufferedZipStore with underlying path: " + this.underlyingStore.toString());
98+
99+
}
100+
101+
public BufferedZipStore(@Nonnull String underlyingStorePath) {
102+
this(Paths.get(underlyingStorePath));
103+
}
104+
105+
/**
106+
* Flushes the buffer to the underlying store as a zip file.
107+
*/
108+
public void flush() throws IOException {
109+
writeBuffer();
110+
}
111+
112+
@Override
113+
public Stream<String[]> list(String[] keys) {
114+
return bufferStore.list(keys);
115+
}
116+
117+
@Override
118+
public boolean exists(String[] keys) {
119+
return bufferStore.exists(keys);
120+
}
121+
122+
@Nullable
123+
@Override
124+
public ByteBuffer get(String[] keys) {
125+
return bufferStore.get(keys);
126+
}
127+
128+
@Nullable
129+
@Override
130+
public ByteBuffer get(String[] keys, long start) {
131+
return bufferStore.get(keys, start);
132+
}
133+
134+
@Nullable
135+
@Override
136+
public ByteBuffer get(String[] keys, long start, long end) {
137+
return bufferStore.get(keys, start, end);
138+
}
139+
140+
@Override
141+
public void set(String[] keys, ByteBuffer bytes) {
142+
bufferStore.set(keys, bytes);
143+
}
144+
145+
@Override
146+
public void delete(String[] keys) {
147+
bufferStore.delete(keys);
148+
}
149+
150+
@Nonnull
151+
@Override
152+
public StoreHandle resolve(String... keys) {
153+
return new StoreHandle(this, keys);
154+
}
155+
156+
@Override
157+
public String toString() {
158+
return "BufferedZipStore(" + underlyingStore.toString() + ")";
159+
}
160+
161+
static class ByteBufferBackedInputStream extends InputStream {
162+
private final ByteBuffer buf;
163+
164+
public ByteBufferBackedInputStream(ByteBuffer buf) {
165+
this.buf = buf;
166+
}
167+
168+
@Override
169+
public int read() {
170+
return buf.hasRemaining() ? (buf.get() & 0xFF) : -1;
171+
}
172+
173+
@Override
174+
public int read(byte[] bytes, int off, int len) {
175+
if (!buf.hasRemaining()) {
176+
return -1;
177+
}
178+
179+
int toRead = Math.min(len, buf.remaining());
180+
buf.get(bytes, off, toRead);
181+
return toRead;
182+
}
183+
}
184+
185+
}

src/main/java/dev/zarr/zarrjava/store/ZipStore.java

Lines changed: 0 additions & 72 deletions
This file was deleted.

src/test/java/dev/zarr/zarrjava/ZarrStoreTest.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public void testMemoryStoreV2(boolean useParallel) throws ZarrException, IOExcep
140140
}
141141

142142
@Test
143-
public void testZipStore() throws ZarrException, IOException {
143+
public void testOpenZipStore() throws ZarrException, IOException {
144144
Path sourceDir = TESTOUTPUT.resolve("testZipStore");
145145
Path targetDir = TESTOUTPUT.resolve("testZipStore.zip");
146146
FilesystemStore fsStore = new FilesystemStore(sourceDir);
@@ -150,19 +150,30 @@ public void testZipStore() throws ZarrException, IOException {
150150
ZipOutputStream zipOut = new ZipOutputStream(fos);
151151

152152
File fileToZip = new File(sourceDir.toUri());
153-
zipFile(fileToZip, fileToZip.getName(), zipOut);
153+
zipFile(fileToZip, "", zipOut);
154154
zipOut.close();
155155
fos.close();
156156

157-
ZipStore zipStore = new ZipStore(targetDir);
157+
BufferedZipStore zipStore = new BufferedZipStore(targetDir);
158158
assertIsTestGroupV3(Group.open(zipStore.resolve()), true);
159159
}
160160

161+
@Test
162+
public void testWriteZipStore() throws ZarrException, IOException {
163+
Path targetDir = TESTOUTPUT.resolve("testWriteZipStore.zip");
164+
BufferedZipStore zipStore = new BufferedZipStore(targetDir);
165+
writeTestGroupV3(zipStore, true);
166+
zipStore.flush();
167+
168+
BufferedZipStore zipStoreRead = new BufferedZipStore(targetDir);
169+
assertIsTestGroupV3(Group.open(zipStoreRead.resolve()), true);
170+
}
171+
161172
static Stream<Store> localStores() {
162173
return Stream.of(
163-
// new ConcurrentMemoryStore(),
174+
new MemoryStore(),
164175
new FilesystemStore(TESTOUTPUT.resolve("testLocalStoresFS")),
165-
new ZipStore(TESTOUTPUT.resolve("testLocalStoresZIP.zip"))
176+
new BufferedZipStore(TESTOUTPUT.resolve("testLocalStoresZIP.zip"))
166177
);
167178
}
168179

@@ -187,7 +198,7 @@ Group writeTestGroupV3(Store store, boolean useParallel) throws ZarrException, I
187198
Array array = group.createArray("array", b -> b
188199
.withShape(1024, 1024)
189200
.withDataType(DataType.UINT32)
190-
.withChunkShape(5, 5)
201+
.withChunkShape(512, 512)
191202
);
192203
array.write(ucar.ma2.Array.factory(ucar.ma2.DataType.UINT, new int[]{1024, 1024}, testData()), useParallel);
193204
group.createGroup("subgroup");

0 commit comments

Comments
 (0)