Skip to content

Commit 6b8ef13

Browse files
committed
Address reviewer comments.
1 parent deeb4c4 commit 6b8ef13

File tree

1 file changed

+18
-5
lines changed

1 file changed

+18
-5
lines changed

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,25 @@ public static <T> CachingStateIterable<T> readAllAndDecodeStartingFrom(
9494
valueCoder);
9595
}
9696

97+
/**
98+
* This adapter handles using the continuation token to provide iteration over all the elements
99+
* returned by the Beam Fn State API using the supplied state client, state request for the first
100+
* chunk of the state stream, and a value decoder, without caching support.
101+
*
102+
* @param beamFnStateClient A client for handling state requests.
103+
* @param stateRequestForFirstChunk A fully populated state request for the first (and possibly
104+
* only) chunk of a state stream. This state request will be populated with a continuation
105+
* token to request further chunks of the stream if required.
106+
* @param valueCoder A coder for decoding the state stream.
107+
*/
97108
public static <T> UncachedStateIterable<T> readAllAndDecodeStartingFrom(
98109
BeamFnStateClient beamFnStateClient,
99110
StateRequest stateRequestForFirstChunk,
100111
Coder<T> valueCoder) {
101112
return new UncachedStateIterable<>(beamFnStateClient, stateRequestForFirstChunk, valueCoder);
102113
}
103114

104-
static class UncachedStateIterable<T> extends PrefetchableIterables.Default<T> {
115+
private static class UncachedStateIterable<T> extends PrefetchableIterables.Default<T> {
105116
private final BeamFnStateClient beamFnStateClient;
106117
private final StateRequest stateRequestForFirstChunk;
107118
private final Coder<T> valueCoder;
@@ -124,14 +135,14 @@ public PrefetchableIterator<T> createIterator() {
124135

125136
private static class DecodingIterator<T> extends AbstractIterator<T>
126137
implements PrefetchableIterator<T> {
127-
PrefetchableIterator<ByteString> chunkIterator;
128-
InputStream currentChunk;
129-
Coder<T> valueCoder;
138+
private final PrefetchableIterator<ByteString> chunkIterator;
139+
private final Coder<T> valueCoder;
140+
private InputStream currentChunk;
130141

131142
public DecodingIterator(PrefetchableIterator<ByteString> chunkIterator, Coder<T> valueCoder) {
132143
this.chunkIterator = chunkIterator;
133-
this.currentChunk = ByteString.EMPTY.newInput();
134144
this.valueCoder = valueCoder;
145+
this.currentChunk = ByteString.EMPTY.newInput();
135146
}
136147

137148
@Override
@@ -157,6 +168,8 @@ public boolean isReady() {
157168
try {
158169
return currentChunk.available() > 0 || chunkIterator.isReady();
159170
} catch (IOException exn) {
171+
// Should never get here as ByteString.newInput() returns InputStreams
172+
// that don't do actual IO operations.
160173
throw new IllegalStateException(exn);
161174
}
162175
}

0 commit comments

Comments
 (0)