diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java index 45763c6ac14f..f2cadb8ee091 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java @@ -30,6 +30,7 @@ import com.google.cloud.bigquery.storage.v1.ReadStream; import java.io.IOException; import java.util.List; +import java.util.NoSuchElementException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient; @@ -41,6 +42,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,6 +74,7 @@ abstract class BigQueryStorageSourceBase extends BoundedSource { protected final Coder outputCoder; protected final BigQueryServices bqServices; private final @Nullable TimestampPrecision picosTimestampPrecision; + private boolean emptyOrPruned = false; BigQueryStorageSourceBase( @Nullable DataFormat format, @@ -179,8 +182,10 @@ public List> split( if (readSession.getStreamsList().isEmpty()) { LOG.info( "Returned stream list is empty. The underlying table is empty or all rows have been pruned."); + emptyOrPruned = true; return ImmutableList.of(); } else { + emptyOrPruned = false; LOG.info("Read session returned {} streams", readSession.getStreamsList().size()); } @@ -203,9 +208,51 @@ public List> split( @Override public BoundedReader createReader(PipelineOptions options) throws IOException { + if (emptyOrPruned) { + // When split() returns an empty list, UnboundedReadFromBoundedSource falls back to wrapping + // the original unsplit source directly (ImmutableList.of(bigQuerySotrageSourceBase)) so we + // need to return empty reader. + return new EmptyReader<>(this); + } throw new UnsupportedOperationException("BigQuery storage source must be split before reading"); } + private static class EmptyReader extends BoundedReader { + private final BigQueryStorageSourceBase source; + + EmptyReader(BigQueryStorageSourceBase source) { + this.source = source; + } + + @Override + public boolean start() throws IOException { + return false; + } + + @Override + public boolean advance() throws IOException { + return false; + } + + @Override + public T getCurrent() throws NoSuchElementException { + throw new NoSuchElementException(); + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + throw new NoSuchElementException(); + } + + @Override + public void close() throws IOException {} + + @Override + public BoundedSource getCurrentSource() { + return source; + } + } + private void setPicosTimestampPrecision( ReadSession.TableReadOptions.Builder tableReadOptionsBuilder, DataFormat dataFormat) { if (picosTimestampPrecision == null) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java index db5777627a39..9eb4ba5e8a74 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java @@ -99,6 +99,7 @@ import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; +import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableRowParser; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method; @@ -120,6 +121,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.construction.UnboundedReadFromBoundedSource; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -705,6 +707,49 @@ public void testTableSourceCreateReader() throws Exception { tableSource.createReader(options); } + @Test + public void testUnboundedReadFromBoundedSourceWithEmptyTable() throws Exception { + fakeDatasetService.createDataset("project-id", "dataset", "", "", null); + TableReference tableRef = BigQueryHelpers.parseTableSpec("project-id:dataset.table"); + + Table table = + new Table().setTableReference(tableRef).setNumBytes(0L).setSchema(new TableSchema()); + + fakeDatasetService.createTable(table); + + ReadSession emptyReadSession = ReadSession.newBuilder().build(); + StorageClient fakeStorageClient = mock(StorageClient.class); + when(fakeStorageClient.createReadSession(any())).thenReturn(emptyReadSession); + + BigQueryStorageTableSource tableSource = + BigQueryStorageTableSource.create( + ValueProvider.StaticValueProvider.of(tableRef), + null, + null, + new TableRowParser(), + TableRowJsonCoder.of(), + new FakeBigQueryServices() + .withDatasetService(fakeDatasetService) + .withStorageClient(fakeStorageClient)); + + // This simulates what happens in a streaming pipeline when BoundedSource is used + UnboundedSource unboundedSource = + new UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter<>(tableSource); + + // Initial split + List> splits = unboundedSource.split(1, options); + // Because tableSource.split returns empty list, BoundedToUnboundedSourceAdapter falls back to + // returning itself + assertEquals(1, splits.size()); + UnboundedSource splitSource = splits.get(0); + + // Create reader + UnboundedSource.UnboundedReader reader = splitSource.createReader(options, null); + + // This should NOT throw UnsupportedOperationException + assertFalse(reader.start()); + } + private static GenericRecord createRecord(String name, Schema schema) { GenericRecord genericRecord = new Record(schema); genericRecord.put("name", name);