Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.cloud.bigquery.storage.v1.ReadStream;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
Expand All @@ -41,6 +42,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -72,6 +74,7 @@ abstract class BigQueryStorageSourceBase<T> extends BoundedSource<T> {
protected final Coder<T> outputCoder;
protected final BigQueryServices bqServices;
private final @Nullable TimestampPrecision picosTimestampPrecision;
private boolean emptyOrPruned = false;

BigQueryStorageSourceBase(
@Nullable DataFormat format,
Expand Down Expand Up @@ -179,8 +182,10 @@ public List<BigQueryStorageStreamSource<T>> split(
if (readSession.getStreamsList().isEmpty()) {
LOG.info(
"Returned stream list is empty. The underlying table is empty or all rows have been pruned.");
emptyOrPruned = true;
return ImmutableList.of();
} else {
emptyOrPruned = false;
LOG.info("Read session returned {} streams", readSession.getStreamsList().size());
}

Expand All @@ -203,9 +208,51 @@ public List<BigQueryStorageStreamSource<T>> split(

@Override
public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
  if (emptyOrPruned) {
    // When split() returns an empty list, UnboundedReadFromBoundedSource falls back to wrapping
    // the original unsplit source directly (ImmutableList.of(bigQueryStorageSourceBase)), so we
    // must return a reader that produces no elements instead of throwing.
    return new EmptyReader<>(this);
  }
  // In all other cases reading must go through the per-stream sources produced by split().
  throw new UnsupportedOperationException("BigQuery storage source must be split before reading");
}

/**
 * A {@link BoundedReader} over an empty (or fully pruned) table: it never yields an element.
 * Returned by {@code createReader} when the read session produced no streams.
 */
private static class EmptyReader<T> extends BoundedReader<T> {
  private final BigQueryStorageSourceBase<T> parent;

  EmptyReader(BigQueryStorageSourceBase<T> parent) {
    this.parent = parent;
  }

  // There is never a first element.
  @Override
  public boolean start() throws IOException {
    return false;
  }

  // There is never a next element either.
  @Override
  public boolean advance() throws IOException {
    return false;
  }

  @Override
  public T getCurrent() throws NoSuchElementException {
    // No element is ever available.
    throw new NoSuchElementException();
  }

  @Override
  public Instant getCurrentTimestamp() throws NoSuchElementException {
    // No element is ever available, hence no timestamp.
    throw new NoSuchElementException();
  }

  @Override
  public BoundedSource<T> getCurrentSource() {
    return parent;
  }

  // Nothing to release: this reader holds no external resources.
  @Override
  public void close() throws IOException {}
}

private void setPicosTimestampPrecision(
ReadSession.TableReadOptions.Builder tableReadOptionsBuilder, DataFormat dataFormat) {
if (picosTimestampPrecision == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableRowParser;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
Expand All @@ -120,6 +121,7 @@
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -705,6 +707,49 @@ public void testTableSourceCreateReader() throws Exception {
tableSource.createReader(options);
}

@Test
public void testUnboundedReadFromBoundedSourceWithEmptyTable() throws Exception {
  // Register an empty table (0 bytes, empty schema) with the fake dataset service.
  fakeDatasetService.createDataset("project-id", "dataset", "", "", null);
  TableReference emptyTableRef = BigQueryHelpers.parseTableSpec("project-id:dataset.table");
  Table emptyTable =
      new Table().setTableReference(emptyTableRef).setNumBytes(0L).setSchema(new TableSchema());
  fakeDatasetService.createTable(emptyTable);

  // The storage API hands back a session with no streams for the empty table.
  StorageClient storageClient = mock(StorageClient.class);
  when(storageClient.createReadSession(any())).thenReturn(ReadSession.newBuilder().build());

  BigQueryStorageTableSource<TableRow> boundedSource =
      BigQueryStorageTableSource.create(
          ValueProvider.StaticValueProvider.of(emptyTableRef),
          null,
          null,
          new TableRowParser(),
          TableRowJsonCoder.of(),
          new FakeBigQueryServices()
              .withDatasetService(fakeDatasetService)
              .withStorageClient(storageClient));

  // This simulates what happens in a streaming pipeline when BoundedSource is used.
  UnboundedSource<TableRow, ?> adapted =
      new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(boundedSource);

  // Initial split: the table source splits to an empty list, so the adapter falls back to
  // returning itself as the single split.
  List<? extends UnboundedSource<TableRow, ?>> initialSplits = adapted.split(1, options);
  assertEquals(1, initialSplits.size());

  // Creating and starting a reader should NOT throw UnsupportedOperationException; the
  // empty-table reader simply reports that there is nothing to read.
  UnboundedSource.UnboundedReader<TableRow> reader =
      initialSplits.get(0).createReader(options, null);
  assertFalse(reader.start());
}

private static GenericRecord createRecord(String name, Schema schema) {
GenericRecord genericRecord = new Record(schema);
genericRecord.put("name", name);
Expand Down
Loading