Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.sdk.coders.IterableLikeCoder;
Expand Down Expand Up @@ -90,8 +89,7 @@ public StateBackedIterable(
StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
this.prefix = prefix;
this.suffix =
StateFetchingIterators.readAllAndDecodeStartingFrom(
Caches.subCache(cache, stateKey), beamFnStateClient, request, elemCoder);
StateFetchingIterators.readAllAndDecodeStartingFrom(beamFnStateClient, request, elemCoder);
this.elemCoder = elemCoder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
Expand All @@ -46,6 +48,7 @@
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.AbstractIterator;

/**
* Adapters which convert a logical series of chunks using continuation tokens over the Beam Fn
Expand Down Expand Up @@ -91,6 +94,82 @@ public static <T> CachingStateIterable<T> readAllAndDecodeStartingFrom(
valueCoder);
}

/**
 * Builds an {@link UncachedStateIterable} over the values reachable from the given first-chunk
 * request. This overload takes no cache argument.
 *
 * @param beamFnStateClient client handed to the returned iterable
 * @param stateRequestForFirstChunk request handed to the returned iterable as its starting point
 * @param valueCoder coder handed to the returned iterable for decoding elements
 * @return a new iterable wrapping the three arguments
 */
public static <T> UncachedStateIterable<T> readAllAndDecodeStartingFrom(
    BeamFnStateClient beamFnStateClient,
    StateRequest stateRequestForFirstChunk,
    Coder<T> valueCoder) {
  UncachedStateIterable<T> uncachedValues =
      new UncachedStateIterable<>(beamFnStateClient, stateRequestForFirstChunk, valueCoder);
  return uncachedValues;
}

static class UncachedStateIterable<T> extends PrefetchableIterables.Default<T> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this isn't used for state anywhere (I don't think it is), we probably don't even need to use the Prefetchable* interfaces at all, simplifying things further.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant to refer to the StateBackedIterable class itself. StateFetchingIterators are definitely used elsewhere.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I'm still not sure I understand. Are you suggesting just inlining this class in StateBackedIterable to avoid having it use the interfaces?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant, e.g., we could probably change the declaration at https://github.com/apache/beam/blob/release-2.60.0/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateBackedIterable.java#L77 to be just an Iterable, changing the constructor for StateBackedIterable[Coder] to not take a Cache at all, etc. But it's also not too much work to keep it as a Prefetchable iterable, given that the underlying iterable is one.

// Client passed to every iterator created by createIterator().
private final BeamFnStateClient beamFnStateClient;
// Request used as the starting point of every iterator created by createIterator().
private final StateRequest stateRequestForFirstChunk;
// Coder used to decode individual elements out of the fetched chunks.
private final Coder<T> valueCoder;

/**
 * Stores the collaborators needed to create iterators later; no other work is done here.
 *
 * @param beamFnStateClient client given to each new chunk-fetching iterator
 * @param stateRequestForFirstChunk request given to each new chunk-fetching iterator
 * @param valueCoder coder used to decode elements
 */
public UncachedStateIterable(
    BeamFnStateClient beamFnStateClient,
    StateRequest stateRequestForFirstChunk,
    Coder<T> valueCoder) {
  this.valueCoder = valueCoder;
  this.stateRequestForFirstChunk = stateRequestForFirstChunk;
  this.beamFnStateClient = beamFnStateClient;
}

/**
 * Creates a fresh iterator: a new {@code LazyBlockingStateFetchingIterator} supplies the raw
 * chunks and a {@code DecodingIterator} decodes elements from them with {@code valueCoder}.
 */
@Override
public PrefetchableIterator<T> createIterator() {
  PrefetchableIterator<ByteString> rawChunks =
      new LazyBlockingStateFetchingIterator(beamFnStateClient, stateRequestForFirstChunk);
  return new DecodingIterator<>(rawChunks, valueCoder);
}

/**
 * Iterator that decodes elements out of a sequence of {@link ByteString} chunks.
 *
 * <p>A new chunk is pulled from {@code chunkIterator} only once the current chunk reports no
 * available bytes; each element is decoded from the current chunk with {@code valueCoder}.
 *
 * <p>NOTE(review): elements are decoded from a single chunk at a time, so this assumes every
 * chunk holds only whole encoded elements; confirm against the chunk producer.
 */
private static class DecodingIterator<T> extends AbstractIterator<T>
    implements PrefetchableIterator<T> {
  private final PrefetchableIterator<ByteString> chunkIterator;
  private final Coder<T> valueCoder;
  // Unread remainder of the chunk currently being decoded; swapped out as chunks are exhausted.
  private InputStream currentChunk;

  /**
   * @param chunkIterator source of raw chunks
   * @param valueCoder coder used to decode each element
   */
  public DecodingIterator(PrefetchableIterator<ByteString> chunkIterator, Coder<T> valueCoder) {
    this.chunkIterator = chunkIterator;
    this.valueCoder = valueCoder;
    // Start on an empty stream so the first computeNext() call pulls the first chunk.
    this.currentChunk = ByteString.EMPTY.newInput();
  }

  /**
   * Decodes and returns the next element, or signals end of data once the current chunk is
   * drained and {@code chunkIterator} has no further chunks.
   *
   * @throws IllegalStateException wrapping any {@link IOException} raised while reading or
   *     decoding
   */
  @Override
  protected T computeNext() {
    try {
      // Loop rather than branch once: a fetched chunk may itself be empty.
      while (currentChunk.available() == 0) {
        if (!chunkIterator.hasNext()) {
          return endOfData();
        }
        currentChunk = chunkIterator.next().newInput();
      }
      return valueCoder.decode(currentChunk);
    } catch (IOException exn) {
      // ByteString.newInput() returns InputStreams that don't do actual IO operations, so
      // stream reads should never fail here. The decode call is inside this try as well, so
      // an IOException it raises ends up here too; the cause is preserved.
      throw new IllegalStateException(exn);
    }
  }

  /**
   * Returns true when the current chunk still has bytes available or the chunk iterator
   * reports that it is ready.
   */
  @Override
  public boolean isReady() {
    try {
      if (currentChunk.available() > 0) {
        return true;
      }
      return chunkIterator.isReady();
    } catch (IOException exn) {
      throw new IllegalStateException(exn);
    }
  }

  /** Asks the chunk iterator to prefetch, unless this iterator is already ready. */
  @Override
  public void prefetch() {
    if (isReady()) {
      return;
    }
    chunkIterator.prefetch();
  }
}
}

@VisibleForTesting
static class IterableCacheKey implements Weighted {

Expand Down
Loading