
ReadData and DataBlockCodec #137

Merged
tpietzsch merged 67 commits into saalfeldlab:master from tpietzsch:readdata
May 16, 2025

Conversation

@tpietzsch
Collaborator

@tpietzsch tpietzsch commented Jan 29, 2025

Add DataBlock methods
void readData(ByteOrder, ReadData) and
void writeData(ByteOrder, OutputStream)
remove
ByteBuffer toByteBuffer() and
void readData(ByteBuffer).

Add Splittable.ReadData, etc.

Still not fully settled...

@cmhulbert
Contributor

I think there is some overlap with this branch. I'll be fully back at work on Monday, so hopefully we can distill the important parts from these two. My branch is more focused on getting this working for sharding/codecs, but hopefully there is a lot of shared logic. I created a draft PR at #138 for easier comparison.

Some specific files that are likely to overlap:
SplitableData.java
SplitKeyValueAccessData.java
SplitByteBufferedData.java

@tpietzsch
Collaborator Author

I'm still working on this, so please wait for a bit (just until later today, probably) before starting to reconcile.

In this PR, ReadData is turning out to be more of an abstraction over byte[], ByteBuffer, InputStream, OutputStream. I'm changing the API to just always hand around ReadData. I'll write more about it when it has settled.
The fact that there is also a SplittableReadData (extending ReadData) is not the important thing. It's more about cleaning up the DataBlock/Compression/BlockWriter/BlockReader API.
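As a rough illustration of that abstraction, a minimal self-contained sketch (the names here are hypothetical, not the actual n5 API): a single handle over byte[] or InputStream sources that can be materialized to a byte[] or streamed to an OutputStream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical sketch of the ReadData idea: one handle over byte[]
// or InputStream sources, usable either way at the consuming end.
interface MiniReadData {

	// Materialize the data as a byte[] array.
	byte[] allBytes() throws IOException;

	// Stream the data to the given OutputStream.
	void writeTo(OutputStream out) throws IOException;

	static MiniReadData from(final byte[] bytes) {
		return new MiniReadData() {
			@Override public byte[] allBytes() { return bytes; }
			@Override public void writeTo(final OutputStream out) throws IOException {
				out.write(bytes);
			}
		};
	}

	static MiniReadData from(final InputStream in) {
		return new MiniReadData() {
			@Override public byte[] allBytes() throws IOException {
				final ByteArrayOutputStream baos = new ByteArrayOutputStream();
				writeTo(baos);
				return baos.toByteArray();
			}
			@Override public void writeTo(final OutputStream out) throws IOException {
				final byte[] buf = new byte[8192];
				int n;
				while ((n = in.read(buf)) != -1)
					out.write(buf, 0, n);
			}
		};
	}
}
```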

Probably we should set up a zoom meeting to discuss, later this week

@cmhulbert
Contributor

Sounds good to me, thanks for taking a stab at this. I'll clean up some of what I have. Let me know when you have time to chat; I'd be happy to zoom.

@tpietzsch
Collaborator Author

Ok, I'm happy now.

The ReadData abstraction is now used throughout:

DataBlock de/serialization:

public interface DataBlock<T> {
    void readData(ReadData readData) throws IOException;
    ReadData writeData(ByteOrder byteOrder);
}

BytesCodec/Compression interface:

public interface BytesCodec {
    ReadData decode(ReadData readData, int decodedLength) throws IOException;
    ReadData encode(ReadData readData) throws IOException;
}

block reader:

    final ReadData data = ReadData.from(in)
                .decode(datasetAttributes.getCompression(), numBytes);
    dataBlock.readData(data);

and block writer:

    dataBlock.writeData(ByteOrder.BIG_ENDIAN)
                .encode(datasetAttributes.getCompression())
                .writeTo(out);

There is also extensive javadoc for ReadData, BytesCodec.

@axtimwalde @bogovicj @cmhulbert Let's set up a review session this week? I think this is ready to merge.

@tpietzsch tpietzsch marked this pull request as ready for review February 3, 2025 19:13
Contributor

@bogovicj bogovicj left a comment


A few small comments / observations

*/
public void readData(final ByteBuffer buffer);
// TODO: rename? "serialize"? "write"?
ReadData writeData(ByteOrder byteOrder);
Contributor


I'd prefer a different name for this. Alternatively, as we discussed maybe if we add a DataBlockCodec interface, this method may not even be needed.

  • serialize
  • encode
    • this reflects that DataBlocks are currently responsible for the encoding that will later be done by a codec

final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
final Index<CompressionType> annotationIndex = Index.load(CompressionType.class, classLoader);
for (final IndexItem<CompressionType> item : annotationIndex) {
System.out.println("item.className() = " + item.className());
Contributor


is this needed?

Collaborator Author


oops. no. that's left over from debugging....


public interface SplittableReadData extends ReadData {

ReadData split(final long offset, final long length) throws IOException;
Contributor


Is it possible / do we want to be able to split at some offset and have the length go "to the end" of the ReadData?

maybe length = -1? (but that's forbidden in some places) or length = Long.MAX_VALUE ?

Collaborator Author


I don't know, because so far I didn't need splitting at all, but it sounds like that could be useful!

If the ReadData knows its length, then you can just data.split(offset, data.length() - offset).

Split "to the end" would be helpful for ReadData that don't know their length (and to avoid having to retrieve it).
Maybe we just add ReadData split(long offset) without a length argument to split to the end.

At the moment, the javadoc of ReadData.splittable() says

The returned SplittableReadData has a known length and multiple inputStreams can be opened on it.

That contract should be changed then, probably.
Maybe we should add an explicit ReadData.materialize() method for that instead.
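Hypothetically, the proposed no-length overload could be a default method layered on split(offset, length) and length(); a minimal self-contained sketch (the MiniSplittable name and byte[] return type are illustrative, not the actual API):

```java
import java.util.Arrays;

// Sketch of the proposed "split to the end" overload as a default
// method layered on split(offset, length) and length().
// All names here are illustrative, not the actual n5 API.
interface MiniSplittable {

	long length();

	byte[] split(long offset, long length);

	// Split from offset to the end; only works when length() is known.
	default byte[] split(final long offset) {
		return split(offset, length() - offset);
	}

	static MiniSplittable of(final byte[] data) {
		return new MiniSplittable() {
			@Override public long length() { return data.length; }
			@Override public byte[] split(final long offset, final long length) {
				return Arrays.copyOfRange(data, (int) offset, (int) (offset + length));
			}
		};
	}
}
```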

Contributor


Something related I came across while experimenting. How would you feel about a method limit(long)?
It should be equivalent to split(0,long), when splittable, but could be useful in other cases, specifically:

final InputStream ones = new InputStream() {
     @Override public int read() throws IOException { return 1; }
};

// This fails because `splittable` tries to read the whole (infinite) input stream which is reasonable.
ReadData firstTenBySplit = ReadData.from(ones).splittable().split(0, 10);

// this could be a sensible alternative
ReadData firstTen = ReadData.from(ones).limit(10);

Collaborator Author


That is more difficult, I think.
Ideally ReadData should be considered immutable.
Conceptually, ReadData.split(...) shouldn't modify the ReadData that you call it on.

However

final InputStream sequence = new InputStream() {
	private int value = 0;
	@Override public int read() throws IOException { return value++; }
};

ReadData data = ReadData.from(sequence);
// data.inputStream().read() would be 0 here

ReadData firstTen = data.limit(10);
// data.inputStream().read() would be 10 here

Therefore, a limit() implementation that reads part of the InputStream makes data unusable.

Maybe then, limit(n) should be considered destructive and return a pair of ReadData, one for the first n bytes, and one for the rest.

That would be an actual "split" then.
And maybe we should re-consider naming...
The current split() is actually more a "view" than a "split". Maybe slice(...) would be relatively consistent with ByteBuffer terminology?

Collaborator Author


... but I totally agree that limit() seems useful!

Contributor


Ah, I see what you mean, good point.

Maybe then, limit(n) should be considered destructive and return a pair of ReadData, one for the first n bytes, and one for the rest.

The current split() is actually more a "view" than a "split". Maybe slice(...) would be relatively consistent with ByteBuffer terminology?

This makes sense to me - I'd be happy with something like:

final InputStream sequence = new InputStream() {
	private int value = 0;
	@Override public int read() throws IOException { return value++; }
};

ReadData data = ReadData.from(sequence);

SplitReadData split = data.split(10);  // basically a proposed "limit", a Pair<ReadData,ReadData>

ReadData firstTen = split.first();
ReadData lastInfinity = split.last();
ReadData view = firstTen.slice(5,5); // what is currently called "split"

@tpietzsch
Collaborator Author

I separated de/serialization out of DataBlock<T> completely, into DataBlockCodec<T>. (I had started to do the "incremental" version that we had discussed, but in the end just did the whole thing.)
The DataBlock implementations are super basic now, e.g.

public class LongArrayDataBlock extends AbstractDataBlock<long[]> {
	public LongArrayDataBlock(final int[] size, final long[] gridPosition, final long[] data) {
		super(size, gridPosition, data, a -> a.length);
	}
}

public class StringDataBlock extends AbstractDataBlock<String[]> {
	public StringDataBlock(final int[] size, final long[] gridPosition, final String[] data) {
		super(size, gridPosition, data, a -> a.length);
	}
}
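For illustration, a plausible minimal shape of AbstractDataBlock implied by the constructors above (the class and field names in this sketch are guesses, not the actual implementation; the ToIntFunction measures the number of elements in the data array):

```java
import java.util.function.ToIntFunction;

// Plausible minimal shape of AbstractDataBlock implied by the
// subclasses above. Names here are guesses, not the actual code.
abstract class AbstractDataBlockSketch<T> {

	private final int[] size;
	private final long[] gridPosition;
	private final T data;
	private final ToIntFunction<T> numElements;

	protected AbstractDataBlockSketch(final int[] size, final long[] gridPosition,
			final T data, final ToIntFunction<T> numElements) {
		this.size = size;
		this.gridPosition = gridPosition;
		this.data = data;
		this.numElements = numElements;
	}

	public int[] getSize() { return size; }
	public long[] getGridPosition() { return gridPosition; }
	public T getData() { return data; }
	public int getNumElements() { return numElements.applyAsInt(data); }
}
```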

The DataBlockCodec interface looks like this:

public interface DataBlockCodec<T> {
	ReadData encode(DataBlock<T> dataBlock) throws IOException;
	DataBlock<T> decode(ReadData readData, long[] gridPosition) throws IOException;
}

That looks like it could be quite compatible with @bogovicj's DataBlockInputStream ideas?
DataBlockCodec can be obtained from DatasetAttributes. (Which is very nice, because ZarrDatasetAttributes can override it, and consequently we can just use DefaultBlockReader/Writer for Zarr.)

As discussed in the last code review, I also did the following:

  • I removed ByteOrder from ReadData again.
  • I replaced the OutputStreamEncoder interface (bundling OutputStream and finish method) with a ProxyOutputStream wrapper that blocks the close() method. I couldn't use UnaryOperator<OutputStream> though, because I want to throw IOException.
  • I removed the ReadData.encode/decode methods that just called return Compression.encode/decode(this).
  • I inlined the BytesCodec interface into Compression (for now).
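A close-blocking proxy stream along those lines could look like this (a hedged sketch under the stated assumptions; the actual ProxyOutputStream implementation and name may differ). A compressor can wrap and close this freely without closing the underlying destination stream:

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Sketch of a close-blocking proxy stream (name hypothetical):
// close() only flushes, so the wrapped destination stays open.
class CloseShieldOutputStream extends FilterOutputStream {

	CloseShieldOutputStream(final OutputStream out) {
		super(out);
	}

	@Override
	public void close() throws IOException {
		// Block close(); only flush the underlying stream.
		out.flush();
	}
}
```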

@tpietzsch tpietzsch changed the title SplittableReadData ReadData and DataBlockCodec Feb 19, 2025
@tpietzsch tpietzsch mentioned this pull request Feb 19, 2025
@tpietzsch
Collaborator Author

tpietzsch commented Feb 19, 2025

I moved the SplittableReadData part to a separate PR #140, so that this one can be merged.
(The "splittable" part was unused and should only be relevant to Zarr v3 / sharding ...)

@bogovicj @cmhulbert @axtimwalde Can we merge this already?
We could also set up another code review for next week to look at the details.

@cmhulbert
Contributor

Thanks @tpietzsch! I'll plan to spend some time today and tomorrow trying to rebase my current branch on this, and see how it plays with the codecs, especially when chaining arbitrary codecs. That should hopefully expose anything else we need. Would also be happy to chat some time next week.

Contributor

@bogovicj bogovicj left a comment


A few small ideas about names.

One bigger question about how LazyReadData works

Edit:
I may be slowly starting to understand LazyReadData

The point is that we don't have an output stream where the encoders are defined - their job is to create a ReadData from something else, and that ReadData can write itself to whatever output stream it is given.

}
}

static class ChunkHeader {
Contributor


I'd call this BlockHeader since everything in N5 uses "block" not "chunk"

Collaborator Author


done

import java.io.InputStream;
import java.util.Arrays;

class ByteArraySplittableReadData implements ReadData {
Contributor


maybe rename to ByteArrayReadData since we're not doing the splittable part yet?
I'm also happy to keep the name as is since the splitting will come soon.

Collaborator Author


I'd leave it as is. This class is package-private anyway, so it is easy to rename later if necessary.

final byte[] serializedData = flattenedArray.getBytes(ENCODING);
new ChunkHeader(dataBlock.getSize(), serializedData.length).writeTo(out);
compression.encode(ReadData.from(serializedData)).writeTo(out);
out.flush();
Contributor


out.flush() is called in all the encode methods here - should it be the responsibility of the OutputStreamOperator to flush? could it instead be the job of LazyReadData?

I'll make a note there too.

Edit: okay maybe I get it now. OutputStreamWriter is a functional interface and can't be flushed itself.
This leads to a different question.

Collaborator Author


That's a good point, actually. I'll have another look at who should be responsible for flush().

Collaborator Author


I revised where flush() happens. 👍


class LazyReadData implements ReadData {

LazyReadData(final OutputStreamWriter writer) {
Contributor

@bogovicj bogovicj Feb 20, 2025


I'm confused here. What's the benefit of using this functional interface instead of an OutputStream directly?

Edit: I may get the point - comment above. (sorry for the spread out comments)

Collaborator Author


The OutputStreamWriter is a way to defer the writing/generation/serialization (not sure what to best call it) of the data until it is needed and it is clear what the destination is.

The Compression implementations (most of them) use Input/OutputStream wrappers to do the compression. The DataBlockCodec::encode methods use these Compressions, so they have to use OutputStream too. Because they don't take an OutputStream argument, we cannot serialize into an OutputStream directly.

Instead DataBlockCodec::encode returns a LazyReadData that is not committed to any OutputStream yet --

  • Maybe someone needs serialized data as a byte[] array. In that case, the LazyReadData will create a ByteArrayOutputStream and use the OutputStreamWriter to populate it.
  • Maybe someone wants to write the serialized data into a FileOutputStream. In that case, the LazyReadData can pass that FileOutputStream to the OutputStreamWriter directly (without intermediate serialization into a byte[] array).
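The two cases above can be sketched like this (all names hypothetical; this illustrates the deferral idea, not the actual LazyReadData implementation):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Sketch of the LazyReadData idea: serialization is deferred until
// the destination is known. Names are hypothetical.
interface Writer {
	void writeTo(OutputStream out) throws IOException;
}

class LazySketch {

	private final Writer writer;

	LazySketch(final Writer writer) {
		this.writer = writer;
	}

	// Destination known: hand the OutputStream straight to the writer,
	// without an intermediate byte[] copy.
	void writeTo(final OutputStream out) throws IOException {
		writer.writeTo(out);
	}

	// byte[] requested: materialize via a ByteArrayOutputStream.
	byte[] allBytes() throws IOException {
		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
		writer.writeTo(baos);
		return baos.toByteArray();
	}
}
```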

When data is requested from the LazyReadData, the LazyReadData will ask
its OutputStreamWriter to write the data to a ByteArrayOutputStream.

When the LazyReadData itself is written to an OutputStream, it will pass
that OutputStream to its OutputStreamWriter (without loading the data
into a byte[] array first).

tpietzsch and others added 27 commits May 15, 2025 21:39
avoids the Compression argument to encode/decode methods
This is in preparation for moving SplittableReadData into a separate PR
* refactor: don't pass `decodedLength` to ReadData

* refactor: move DataBlockFactory/DataBlockCodecFactory to N5Codecs

* feat: Add StringDataCodec, ObjectDataCodec,  StringDataBlockCodec, ObjectDataBlockCodec

DataCodecs now create the access object and return it during `deserialize`

* refactor: add AbstractDataBlock to extract shared logic between Default/String/Object blocks

* refactor: DatasetAttributes responsible for DataBlockCodec creation

N5BlockCodec uses dataType (and potentially other DatasetAttributes) to wrap the desired DataBlockCodec

* revert: add back createDataBlock logic in DataType

* doc: retain javadoc from before refactor

* revert: keep protected constructor with DataBlockCodec parameter

refactor: inline createDataBlockCodec from constructor params

* refactor: remove currently unused N5BlockCodec. Something like this may be needed when multiple codecs are supported

* refactor: dont expose N5Codecs internals

* refactor: rename encodeBlockHeader -> createBlockHeader

* feat: add ZarrStringDataCodec support
@tpietzsch tpietzsch merged commit 3949a69 into saalfeldlab:master May 16, 2025
2 checks passed