Skip to content

Commit 950cc74

Browse files
committed
parallelize
1 parent 8342e1c commit 950cc74

File tree

2 files changed

+146
-65
lines changed

2 files changed

+146
-65
lines changed

src/main/java/dev/zarr/zarrjava/v3/Array.java

Lines changed: 123 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.Map;
1515
import java.util.function.Function;
1616
import java.util.stream.Collectors;
17+
import java.util.stream.Stream;
1718
import javax.annotation.Nonnull;
1819
import javax.annotation.Nullable;
1920
import ucar.ma2.InvalidRangeException;
@@ -119,6 +120,7 @@ public static ArrayMetadataBuilder metadataBuilder(ArrayMetadata existingMetadat
119120

120121
/**
121122
* Reads the entire Zarr array into a ucar.ma2.Array.
123+
* Utilizes no parallelism.
122124
*
123125
* @throws ZarrException
124126
*/
@@ -129,13 +131,38 @@ public ucar.ma2.Array read() throws ZarrException {
129131

130132
/**
131133
* Reads a part of the Zarr array based on a requested offset and shape into a ucar.ma2.Array.
134+
* Utilizes no parallelism.
132135
*
133136
* @param offset
134137
* @param shape
135138
* @throws ZarrException
136139
*/
137140
@Nonnull
138141
public ucar.ma2.Array read(final long[] offset, final int[] shape) throws ZarrException {
142+
return read(offset, shape, false);
143+
}
144+
145+
/**
146+
* Reads the entire Zarr array into a ucar.ma2.Array.
147+
*
148+
* @param parallel if true, the chunks are read using a parallel stream; otherwise sequentially
149+
* @throws ZarrException
150+
*/
151+
@Nonnull
152+
public ucar.ma2.Array read(final boolean parallel) throws ZarrException {
153+
return read(new long[metadata.ndim()], Utils.toIntArray(metadata.shape), parallel);
154+
}
155+
156+
/**
157+
* Reads a part of the Zarr array based on a requested offset and shape into a ucar.ma2.Array.
158+
*
159+
* @param offset
160+
* @param shape
161+
* @param parallel if true, the chunks are read using a parallel stream; otherwise sequentially
162+
* @throws ZarrException
163+
*/
164+
@Nonnull
165+
public ucar.ma2.Array read(final long[] offset, final int[] shape, final boolean parallel) throws ZarrException {
139166
if (offset.length != metadata.ndim()) {
140167
throw new IllegalArgumentException("'offset' needs to have rank '" + metadata.ndim() + "'.");
141168
}
@@ -150,41 +177,44 @@ public ucar.ma2.Array read(final long[] offset, final int[] shape) throws ZarrEx
150177

151178
final ucar.ma2.Array outputArray = ucar.ma2.Array.factory(metadata.dataType.getMA2DataType(),
152179
shape);
153-
Arrays.stream(IndexingUtils.computeChunkCoords(metadata.shape, chunkShape, offset, shape))
154-
.forEach(
155-
chunkCoords -> {
156-
try {
157-
final IndexingUtils.ChunkProjection chunkProjection =
158-
IndexingUtils.computeProjection(chunkCoords, metadata.shape, chunkShape, offset,
159-
shape
160-
);
161-
162-
if (chunkIsInArray(chunkCoords)) {
163-
MultiArrayUtils.copyRegion(metadata.allocateFillValueChunk(),
164-
chunkProjection.chunkOffset, outputArray, chunkProjection.outOffset,
165-
chunkProjection.shape
166-
);
167-
}
168-
169-
final String[] chunkKeys = metadata.chunkKeyEncoding.encodeChunkKey(chunkCoords);
170-
final StoreHandle chunkHandle = storeHandle.resolve(chunkKeys);
171-
172-
if (codecPipeline.supportsPartialDecode()) {
173-
final ucar.ma2.Array chunkArray = codecPipeline.decodePartial(chunkHandle,
174-
Utils.toLongArray(chunkProjection.chunkOffset), chunkProjection.shape);
175-
MultiArrayUtils.copyRegion(chunkArray, new int[metadata.ndim()], outputArray,
176-
chunkProjection.outOffset, chunkProjection.shape
177-
);
178-
} else {
179-
MultiArrayUtils.copyRegion(readChunk(chunkCoords), chunkProjection.chunkOffset,
180-
outputArray, chunkProjection.outOffset, chunkProjection.shape
181-
);
182-
}
183-
184-
} catch (ZarrException e) {
185-
throw new RuntimeException(e);
186-
}
187-
});
180+
Stream<long[]> chunkStream = Arrays.stream(IndexingUtils.computeChunkCoords(metadata.shape, chunkShape, offset, shape));
181+
if (parallel) {
182+
chunkStream = chunkStream.parallel();
183+
}
184+
chunkStream.forEach(
185+
chunkCoords -> {
186+
try {
187+
final IndexingUtils.ChunkProjection chunkProjection =
188+
IndexingUtils.computeProjection(chunkCoords, metadata.shape, chunkShape, offset,
189+
shape
190+
);
191+
192+
if (chunkIsInArray(chunkCoords)) {
193+
MultiArrayUtils.copyRegion(metadata.allocateFillValueChunk(),
194+
chunkProjection.chunkOffset, outputArray, chunkProjection.outOffset,
195+
chunkProjection.shape
196+
);
197+
}
198+
199+
final String[] chunkKeys = metadata.chunkKeyEncoding.encodeChunkKey(chunkCoords);
200+
final StoreHandle chunkHandle = storeHandle.resolve(chunkKeys);
201+
202+
if (codecPipeline.supportsPartialDecode()) {
203+
final ucar.ma2.Array chunkArray = codecPipeline.decodePartial(chunkHandle,
204+
Utils.toLongArray(chunkProjection.chunkOffset), chunkProjection.shape);
205+
MultiArrayUtils.copyRegion(chunkArray, new int[metadata.ndim()], outputArray,
206+
chunkProjection.outOffset, chunkProjection.shape
207+
);
208+
} else {
209+
MultiArrayUtils.copyRegion(readChunk(chunkCoords), chunkProjection.chunkOffset,
210+
outputArray, chunkProjection.outOffset, chunkProjection.shape
211+
);
212+
}
213+
214+
} catch (ZarrException e) {
215+
throw new RuntimeException(e);
216+
}
217+
});
188218
return outputArray;
189219
}
190220

@@ -228,6 +258,7 @@ public ucar.ma2.Array readChunk(long[] chunkCoords)
228258
/**
229259
* Writes a ucar.ma2.Array into the Zarr array at the beginning of the Zarr array. The shape of
230260
* the Zarr array needs to be large enough for the write.
261+
* Utilizes no parallelism.
231262
*
232263
* @param array
233264
*/
@@ -238,11 +269,37 @@ public void write(ucar.ma2.Array array) {
238269
/**
239270
* Writes a ucar.ma2.Array into the Zarr array at a specified offset. The shape of the Zarr array
240271
* needs to be large enough for the write.
272+
* Utilizes no parallelism.
241273
*
242274
* @param offset
243275
* @param array
244276
*/
245277
public void write(long[] offset, ucar.ma2.Array array) {
278+
write(offset, array, false);
279+
}
280+
281+
/**
282+
* Writes a ucar.ma2.Array into the Zarr array at the beginning of the Zarr array. The shape of
283+
* the Zarr array needs to be large enough for the write.
284+
*
285+
* @param array
286+
* @param parallel if true, the chunks are written using a parallel stream; otherwise sequentially
287+
*/
288+
public void write(ucar.ma2.Array array, boolean parallel) {
289+
write(new long[metadata.ndim()], array, parallel);
290+
}
291+
292+
293+
294+
/**
295+
* Writes a ucar.ma2.Array into the Zarr array at a specified offset. The shape of the Zarr array
296+
* needs be large enough for the write.
297+
*
298+
* @param offset
299+
* @param array
300+
* @param parallel
301+
*/
302+
public void write(long[] offset, ucar.ma2.Array array, boolean parallel) {
246303
if (offset.length != metadata.ndim()) {
247304
throw new IllegalArgumentException("'offset' needs to have rank '" + metadata.ndim() + "'.");
248305
}
@@ -253,34 +310,37 @@ public void write(long[] offset, ucar.ma2.Array array) {
253310
int[] shape = array.getShape();
254311

255312
final int[] chunkShape = metadata.chunkShape();
256-
Arrays.stream(IndexingUtils.computeChunkCoords(metadata.shape, chunkShape, offset, shape))
257-
.forEach(
258-
chunkCoords -> {
259-
try {
260-
final IndexingUtils.ChunkProjection chunkProjection =
261-
IndexingUtils.computeProjection(chunkCoords, metadata.shape, chunkShape, offset,
262-
shape
263-
);
264-
265-
ucar.ma2.Array chunkArray;
266-
if (IndexingUtils.isFullChunk(chunkProjection.chunkOffset, chunkProjection.shape,
267-
chunkShape
268-
)) {
269-
chunkArray = array.sectionNoReduce(chunkProjection.outOffset,
270-
chunkProjection.shape,
271-
null
272-
);
273-
} else {
274-
chunkArray = readChunk(chunkCoords);
275-
MultiArrayUtils.copyRegion(array, chunkProjection.outOffset, chunkArray,
276-
chunkProjection.chunkOffset, chunkProjection.shape
277-
);
278-
}
279-
writeChunk(chunkCoords, chunkArray);
280-
} catch (ZarrException | InvalidRangeException e) {
281-
throw new RuntimeException(e);
282-
}
283-
});
313+
Stream<long[]> chunkStream = Arrays.stream(IndexingUtils.computeChunkCoords(metadata.shape, chunkShape, offset, shape));
314+
if(parallel) {
315+
chunkStream = chunkStream.parallel();
316+
}
317+
chunkStream.forEach(
318+
chunkCoords -> {
319+
try {
320+
final IndexingUtils.ChunkProjection chunkProjection =
321+
IndexingUtils.computeProjection(chunkCoords, metadata.shape, chunkShape, offset,
322+
shape
323+
);
324+
325+
ucar.ma2.Array chunkArray;
326+
if (IndexingUtils.isFullChunk(chunkProjection.chunkOffset, chunkProjection.shape,
327+
chunkShape
328+
)) {
329+
chunkArray = array.sectionNoReduce(chunkProjection.outOffset,
330+
chunkProjection.shape,
331+
null
332+
);
333+
} else {
334+
chunkArray = readChunk(chunkCoords);
335+
MultiArrayUtils.copyRegion(array, chunkProjection.outOffset, chunkArray,
336+
chunkProjection.chunkOffset, chunkProjection.shape
337+
);
338+
}
339+
writeChunk(chunkCoords, chunkArray);
340+
} catch (ZarrException | InvalidRangeException e) {
341+
throw new RuntimeException(e);
342+
}
343+
});
284344
}
285345

286346
/**
@@ -427,6 +487,5 @@ public void write(@Nonnull ucar.ma2.Array content) throws ZarrException {
427487
}
428488
array.write(offset, content);
429489
}
430-
431490
}
432491
}

src/test/java/dev/zarr/zarrjava/ZarrTest.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ public static void clearTestoutputFolder() throws IOException {
7171
"sharding,start", "sharding,end",
7272
"sharding_nested,_",
7373
"crc32c,_",
74-
}) public void testReadFromZarrita(String codec, String codecParam) throws IOException, ZarrException, InterruptedException {
74+
})
75+
public void testReadFromZarrita(String codec, String codecParam) throws IOException, ZarrException, InterruptedException {
7576
String command = pythonPath();
7677
ProcessBuilder pb = new ProcessBuilder(command, PYTHON_TEST_PATH.resolve("zarrita_write.py").toString(), codec, codecParam, TESTOUTPUT.toString());
7778
Process process = pb.start();
@@ -563,5 +564,26 @@ public void testV2() throws IOException {
563564
System.out.println(dev.zarr.zarrjava.v2.Array.open(httpStore.resolve("l4_sample", "color", "1")));
564565
}
565566

567+
@ParameterizedTest
568+
@ValueSource(booleans = {false,true})
569+
public void testParallel(boolean useParallel) throws IOException, ZarrException {
570+
int[] testData = new int[512 * 512 * 512];
571+
Arrays.setAll(testData, p -> p);
566572

573+
StoreHandle storeHandle = new FilesystemStore(TESTOUTPUT).resolve("testParallelRead");
574+
ArrayMetadata metadata = Array.metadataBuilder()
575+
.withShape(512, 512, 512)
576+
.withDataType(DataType.UINT32)
577+
.withChunkShape(100, 100, 100)
578+
.withFillValue(0)
579+
.build();
580+
Array writeArray = Array.create(storeHandle, metadata);
581+
writeArray.write(ucar.ma2.Array.factory(ucar.ma2.DataType.UINT, new int[]{512, 512, 512}, testData), useParallel);
582+
583+
Array readArray = Array.open(storeHandle);
584+
ucar.ma2.Array result = readArray.read(useParallel);
585+
586+
Assertions.assertArrayEquals(testData, (int[]) result.get1DJavaArray(ucar.ma2.DataType.INT));
587+
clearTestoutputFolder();
588+
}
567589
}

0 commit comments

Comments
 (0)