Skip to content

Commit cc8dc2f

Browse files
claudevdm and Claude authored
Add timestamp precision option to bigquery storage read for TIMESTAMP(12) columns. (#37079)
* initial * fix tests. * Fix test. * Make setting name more descriptive. * Comments. --------- Co-authored-by: Claude <[email protected]>
1 parent 423a3c3 commit cc8dc2f

File tree

7 files changed

+847
-14
lines changed

7 files changed

+847
-14
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1251,6 +1251,8 @@ abstract Builder<T> setBadRecordErrorHandler(
12511251
abstract Builder<T> setBadRecordRouter(BadRecordRouter badRecordRouter);
12521252

12531253
abstract Builder<T> setProjectionPushdownApplied(boolean projectionPushdownApplied);
1254+
1255+
abstract Builder<T> setDirectReadPicosTimestampPrecision(TimestampPrecision precision);
12541256
}
12551257

12561258
abstract @Nullable ValueProvider<String> getJsonTableRef();
@@ -1306,6 +1308,8 @@ abstract Builder<T> setBadRecordErrorHandler(
13061308

13071309
abstract boolean getProjectionPushdownApplied();
13081310

1311+
abstract @Nullable TimestampPrecision getDirectReadPicosTimestampPrecision();
1312+
13091313
/**
13101314
* An enumeration type for the priority of a query.
13111315
*
@@ -1381,7 +1385,8 @@ private BigQueryStorageQuerySource<T> createStorageQuerySource(
13811385
getFormat(),
13821386
getParseFn(),
13831387
outputCoder,
1384-
getBigQueryServices());
1388+
getBigQueryServices(),
1389+
getDirectReadPicosTimestampPrecision());
13851390
}
13861391

13871392
private static final String QUERY_VALIDATION_FAILURE_ERROR =
@@ -1525,7 +1530,12 @@ public PCollection<T> expand(PBegin input) {
15251530
if (selectedFields != null && selectedFields.isAccessible()) {
15261531
tableSchema = BigQueryUtils.trimSchema(tableSchema, selectedFields.get());
15271532
}
1528-
beamSchema = BigQueryUtils.fromTableSchema(tableSchema);
1533+
BigQueryUtils.SchemaConversionOptions.Builder builder =
1534+
BigQueryUtils.SchemaConversionOptions.builder();
1535+
if (getDirectReadPicosTimestampPrecision() != null) {
1536+
builder.setPicosecondTimestampMapping(getDirectReadPicosTimestampPrecision());
1537+
}
1538+
beamSchema = BigQueryUtils.fromTableSchema(tableSchema, builder.build());
15291539
}
15301540

15311541
final Coder<T> coder = inferCoder(p.getCoderRegistry());
@@ -1710,7 +1720,8 @@ private PCollection<T> expandForDirectRead(
17101720
getParseFn(),
17111721
outputCoder,
17121722
getBigQueryServices(),
1713-
getProjectionPushdownApplied())));
1723+
getProjectionPushdownApplied(),
1724+
getDirectReadPicosTimestampPrecision())));
17141725
if (beamSchema != null) {
17151726
rows.setSchema(
17161727
beamSchema,
@@ -1731,7 +1742,8 @@ private PCollection<T> expandForDirectRead(
17311742
getParseFn(),
17321743
outputCoder,
17331744
getBigQueryServices(),
1734-
getProjectionPushdownApplied());
1745+
getProjectionPushdownApplied(),
1746+
getDirectReadPicosTimestampPrecision());
17351747
List<? extends BoundedSource<T>> sources;
17361748
try {
17371749
// This splitting logic taken from the SDF implementation of Read
@@ -2293,6 +2305,18 @@ public TypedRead<T> withMethod(TypedRead.Method method) {
22932305
return toBuilder().setMethod(method).build();
22942306
}
22952307

2308+
/**
2309+
* Sets the timestamp precision to request for TIMESTAMP(12) BigQuery columns when reading via
2310+
* the Storage Read API.
2311+
*
2312+
* <p>This option only affects precision of TIMESTAMP(12) column reads using {@link
2313+
* Method#DIRECT_READ}. If not set, the BQ client will return microsecond precision by default.
2314+
*/
2315+
public TypedRead<T> withDirectReadPicosTimestampPrecision(
2316+
TimestampPrecision timestampPrecision) {
2317+
return toBuilder().setDirectReadPicosTimestampPrecision(timestampPrecision).build();
2318+
}
2319+
22962320
/** See {@link DataFormat}. */
22972321
public TypedRead<T> withFormat(DataFormat format) {
22982322
return toBuilder().setFormat(format).build();

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ static class BigQueryIOReadTranslator implements TransformPayloadTranslator<Type
109109
.addNullableBooleanField("projection_pushdown_applied")
110110
.addNullableByteArrayField("bad_record_router")
111111
.addNullableByteArrayField("bad_record_error_handler")
112+
.addNullableByteArrayField("direct_read_picos_timestamp_precision")
112113
.build();
113114

114115
public static final String BIGQUERY_READ_TRANSFORM_URN =
@@ -195,6 +196,11 @@ public Row toConfigRow(TypedRead<?> transform) {
195196
if (transform.getUseAvroLogicalTypes() != null) {
196197
fieldValues.put("use_avro_logical_types", transform.getUseAvroLogicalTypes());
197198
}
199+
if (transform.getDirectReadPicosTimestampPrecision() != null) {
200+
fieldValues.put(
201+
"direct_read_picos_timestamp_precision",
202+
toByteArray(transform.getDirectReadPicosTimestampPrecision()));
203+
}
198204
fieldValues.put("projection_pushdown_applied", transform.getProjectionPushdownApplied());
199205
fieldValues.put("bad_record_router", toByteArray(transform.getBadRecordRouter()));
200206
fieldValues.put(
@@ -293,6 +299,13 @@ public TypedRead<?> fromConfigRow(Row configRow, PipelineOptions options) {
293299
if (formatBytes != null) {
294300
builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
295301
}
302+
byte[] timestampPrecisionBytes =
303+
configRow.getBytes("direct_read_picos_timestamp_precision");
304+
if (timestampPrecisionBytes != null) {
305+
builder =
306+
builder.setDirectReadPicosTimestampPrecision(
307+
(TimestampPrecision) fromByteArray(timestampPrecisionBytes));
308+
}
296309
Collection<String> selectedFields = configRow.getArray("selected_fields");
297310
if (selectedFields != null && !selectedFields.isEmpty()) {
298311
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,38 @@
4040
/** A {@link org.apache.beam.sdk.io.Source} representing reading the results of a query. */
4141
class BigQueryStorageQuerySource<T> extends BigQueryStorageSourceBase<T> {
4242

43+
public static <T> BigQueryStorageQuerySource<T> create(
44+
String stepUuid,
45+
ValueProvider<String> queryProvider,
46+
Boolean flattenResults,
47+
Boolean useLegacySql,
48+
QueryPriority priority,
49+
@Nullable String location,
50+
@Nullable String queryTempDataset,
51+
@Nullable String queryTempProject,
52+
@Nullable String kmsKey,
53+
@Nullable DataFormat format,
54+
SerializableFunction<SchemaAndRecord, T> parseFn,
55+
Coder<T> outputCoder,
56+
BigQueryServices bqServices,
57+
@Nullable TimestampPrecision picosTimestampPrecision) {
58+
return new BigQueryStorageQuerySource<>(
59+
stepUuid,
60+
queryProvider,
61+
flattenResults,
62+
useLegacySql,
63+
priority,
64+
location,
65+
queryTempDataset,
66+
queryTempProject,
67+
kmsKey,
68+
format,
69+
parseFn,
70+
outputCoder,
71+
bqServices,
72+
picosTimestampPrecision);
73+
}
74+
4375
public static <T> BigQueryStorageQuerySource<T> create(
4476
String stepUuid,
4577
ValueProvider<String> queryProvider,
@@ -67,7 +99,8 @@ public static <T> BigQueryStorageQuerySource<T> create(
6799
format,
68100
parseFn,
69101
outputCoder,
70-
bqServices);
102+
bqServices,
103+
/*picosTimestampPrecision=*/ null);
71104
}
72105

73106
public static <T> BigQueryStorageQuerySource<T> create(
@@ -94,7 +127,8 @@ public static <T> BigQueryStorageQuerySource<T> create(
94127
null,
95128
parseFn,
96129
outputCoder,
97-
bqServices);
130+
bqServices,
131+
/*picosTimestampPrecision=*/ null);
98132
}
99133

100134
private final String stepUuid;
@@ -123,8 +157,9 @@ private BigQueryStorageQuerySource(
123157
@Nullable DataFormat format,
124158
SerializableFunction<SchemaAndRecord, T> parseFn,
125159
Coder<T> outputCoder,
126-
BigQueryServices bqServices) {
127-
super(format, null, null, parseFn, outputCoder, bqServices);
160+
BigQueryServices bqServices,
161+
@Nullable TimestampPrecision picosTimestampPrecision) {
162+
super(format, null, null, parseFn, outputCoder, bqServices, picosTimestampPrecision);
128163
this.stepUuid = checkNotNull(stepUuid, "stepUuid");
129164
this.queryProvider = checkNotNull(queryProvider, "queryProvider");
130165
this.flattenResults = checkNotNull(flattenResults, "flattenResults");

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.google.api.services.bigquery.model.Table;
2323
import com.google.api.services.bigquery.model.TableReference;
2424
import com.google.api.services.bigquery.model.TableSchema;
25+
import com.google.cloud.bigquery.storage.v1.ArrowSerializationOptions;
26+
import com.google.cloud.bigquery.storage.v1.AvroSerializationOptions;
2527
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
2628
import com.google.cloud.bigquery.storage.v1.DataFormat;
2729
import com.google.cloud.bigquery.storage.v1.ReadSession;
@@ -69,20 +71,23 @@ abstract class BigQueryStorageSourceBase<T> extends BoundedSource<T> {
6971
protected final SerializableFunction<SchemaAndRecord, T> parseFn;
7072
protected final Coder<T> outputCoder;
7173
protected final BigQueryServices bqServices;
74+
private final @Nullable TimestampPrecision picosTimestampPrecision;
7275

7376
BigQueryStorageSourceBase(
7477
@Nullable DataFormat format,
7578
@Nullable ValueProvider<List<String>> selectedFieldsProvider,
7679
@Nullable ValueProvider<String> rowRestrictionProvider,
7780
SerializableFunction<SchemaAndRecord, T> parseFn,
7881
Coder<T> outputCoder,
79-
BigQueryServices bqServices) {
82+
BigQueryServices bqServices,
83+
@Nullable TimestampPrecision picosTimestampPrecision) {
8084
this.format = format;
8185
this.selectedFieldsProvider = selectedFieldsProvider;
8286
this.rowRestrictionProvider = rowRestrictionProvider;
8387
this.parseFn = checkNotNull(parseFn, "parseFn");
8488
this.outputCoder = checkNotNull(outputCoder, "outputCoder");
8589
this.bqServices = checkNotNull(bqServices, "bqServices");
90+
this.picosTimestampPrecision = picosTimestampPrecision;
8691
}
8792

8893
/**
@@ -131,11 +136,12 @@ public List<BigQueryStorageStreamSource<T>> split(
131136
if (rowRestrictionProvider != null && rowRestrictionProvider.isAccessible()) {
132137
tableReadOptionsBuilder.setRowRestriction(rowRestrictionProvider.get());
133138
}
134-
readSessionBuilder.setReadOptions(tableReadOptionsBuilder);
135139

136140
if (format != null) {
137141
readSessionBuilder.setDataFormat(format);
142+
setPicosTimestampPrecision(tableReadOptionsBuilder, format);
138143
}
144+
readSessionBuilder.setReadOptions(tableReadOptionsBuilder);
139145

140146
// Setting the requested max stream count to 0, implies that the Read API backend will select
141147
// an appropriate number of streams for the Session to produce reasonable throughput.
@@ -199,4 +205,61 @@ public List<BigQueryStorageStreamSource<T>> split(
199205
public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
200206
throw new UnsupportedOperationException("BigQuery storage source must be split before reading");
201207
}
208+
209+
private void setPicosTimestampPrecision(
210+
ReadSession.TableReadOptions.Builder tableReadOptionsBuilder, DataFormat dataFormat) {
211+
if (picosTimestampPrecision == null) {
212+
return;
213+
}
214+
215+
if (dataFormat == DataFormat.ARROW) {
216+
setArrowTimestampPrecision(tableReadOptionsBuilder, picosTimestampPrecision);
217+
} else if (dataFormat == DataFormat.AVRO) {
218+
setAvroTimestampPrecision(tableReadOptionsBuilder, picosTimestampPrecision);
219+
}
220+
}
221+
222+
private static void setArrowTimestampPrecision(
223+
ReadSession.TableReadOptions.Builder tableReadOptionsBuilder,
224+
TimestampPrecision timestampPrecision) {
225+
ArrowSerializationOptions.PicosTimestampPrecision precision;
226+
switch (timestampPrecision) {
227+
case MICROS:
228+
precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
229+
break;
230+
case NANOS:
231+
precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
232+
break;
233+
case PICOS:
234+
precision = ArrowSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
235+
break;
236+
default:
237+
throw new IllegalArgumentException(
238+
"Unsupported timestamp precision for Storage Read API: " + timestampPrecision);
239+
}
240+
tableReadOptionsBuilder.setArrowSerializationOptions(
241+
ArrowSerializationOptions.newBuilder().setPicosTimestampPrecision(precision));
242+
}
243+
244+
private static void setAvroTimestampPrecision(
245+
ReadSession.TableReadOptions.Builder tableReadOptionsBuilder,
246+
TimestampPrecision timestampPrecision) {
247+
AvroSerializationOptions.PicosTimestampPrecision precision;
248+
switch (timestampPrecision) {
249+
case MICROS:
250+
precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_MICROS;
251+
break;
252+
case NANOS:
253+
precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_NANOS;
254+
break;
255+
case PICOS:
256+
precision = AvroSerializationOptions.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS;
257+
break;
258+
default:
259+
throw new IllegalArgumentException(
260+
"Unsupported timestamp precision for Storage Read API: " + timestampPrecision);
261+
}
262+
tableReadOptionsBuilder.setAvroSerializationOptions(
263+
AvroSerializationOptions.newBuilder().setPicosTimestampPrecision(precision));
264+
}
202265
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ public static <T> BigQueryStorageTableSource<T> create(
6565
parseFn,
6666
outputCoder,
6767
bqServices,
68-
projectionPushdownApplied);
68+
projectionPushdownApplied,
69+
/*picosTimestampPrecision=*/ null);
6970
}
7071

7172
public static <T> BigQueryStorageTableSource<T> create(
@@ -83,7 +84,30 @@ public static <T> BigQueryStorageTableSource<T> create(
8384
parseFn,
8485
outputCoder,
8586
bqServices,
86-
false);
87+
/*projectionPushdownApplied=*/ false,
88+
/*picosTimestampPrecision=*/ null);
89+
}
90+
91+
public static <T> BigQueryStorageTableSource<T> create(
92+
ValueProvider<TableReference> tableRefProvider,
93+
DataFormat format,
94+
@Nullable ValueProvider<List<String>> selectedFields,
95+
@Nullable ValueProvider<String> rowRestriction,
96+
SerializableFunction<SchemaAndRecord, T> parseFn,
97+
Coder<T> outputCoder,
98+
BigQueryServices bqServices,
99+
boolean projectionPushdownApplied,
100+
@Nullable TimestampPrecision picosTimestampPrecision) {
101+
return new BigQueryStorageTableSource<>(
102+
tableRefProvider,
103+
format,
104+
selectedFields,
105+
rowRestriction,
106+
parseFn,
107+
outputCoder,
108+
bqServices,
109+
projectionPushdownApplied,
110+
picosTimestampPrecision);
87111
}
88112

89113
private BigQueryStorageTableSource(
@@ -94,8 +118,16 @@ private BigQueryStorageTableSource(
94118
SerializableFunction<SchemaAndRecord, T> parseFn,
95119
Coder<T> outputCoder,
96120
BigQueryServices bqServices,
97-
boolean projectionPushdownApplied) {
98-
super(format, selectedFields, rowRestriction, parseFn, outputCoder, bqServices);
121+
boolean projectionPushdownApplied,
122+
@Nullable TimestampPrecision picosTimestampPrecision) {
123+
super(
124+
format,
125+
selectedFields,
126+
rowRestriction,
127+
parseFn,
128+
outputCoder,
129+
bqServices,
130+
picosTimestampPrecision);
99131
this.tableReferenceProvider = checkNotNull(tableRefProvider, "tableRefProvider");
100132
this.projectionPushdownApplied = projectionPushdownApplied;
101133
cachedTable = new AtomicReference<>();

0 commit comments

Comments
 (0)