Commit f967bde

Bq timestamp schema conversion (#36986)

Authored by claudevdm and Claude

* stash
* done.
* Add test.
* Comments.
* comments.
* Comments.

Co-authored-by: Claude <[email protected]>

1 parent 48869d2 commit f967bde

File tree: 3 files changed (+262, -9 lines)

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java

Lines changed: 66 additions & 9 deletions
@@ -67,6 +67,7 @@
 import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
 import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
 import org.apache.beam.sdk.util.Preconditions;
@@ -157,15 +158,34 @@ public abstract static class SchemaConversionOptions implements Serializable {
      */
     public abstract boolean getInferMaps();
 
+    /**
+     * Controls how BigQuery {@code TIMESTAMP(12)} (picosecond precision) columns are mapped to
+     * Beam schema types.
+     *
+     * <p>Standard TIMESTAMP(6) columns are mapped to FieldType.DATETIME, which only supports up
+     * to millisecond precision. This option allows mapping TIMESTAMP(12) columns to the logical
+     * types Timestamp.MILLIS, Timestamp.MICROS, or Timestamp.NANOS, or preserving full picosecond
+     * precision as a STRING type.
+     *
+     * <p>This option has no effect on {@code TIMESTAMP(6)} (microsecond) columns.
+     *
+     * <p>Defaults to {@link TimestampPrecision#NANOS}.
+     */
+    public abstract TimestampPrecision getPicosecondTimestampMapping();
+
     public static Builder builder() {
-      return new AutoValue_BigQueryUtils_SchemaConversionOptions.Builder().setInferMaps(false);
+      return new AutoValue_BigQueryUtils_SchemaConversionOptions.Builder()
+          .setInferMaps(false)
+          .setPicosecondTimestampMapping(TimestampPrecision.NANOS);
     }
 
     /** Builder for {@link SchemaConversionOptions}. */
     @AutoValue.Builder
     public abstract static class Builder {
       public abstract Builder setInferMaps(boolean inferMaps);
 
+      public abstract Builder setPicosecondTimestampMapping(TimestampPrecision conversion);
+
       public abstract SchemaConversionOptions build();
     }
   }
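
For callers, the new option plugs into the existing SchemaConversionOptions builder. A minimal sketch of opting into, for example, a microsecond mapping for TIMESTAMP(12) columns, assuming bqSchema is a TableSchema obtained elsewhere (this is the same pattern the new tests below use):

    BigQueryUtils.SchemaConversionOptions options =
        BigQueryUtils.SchemaConversionOptions.builder()
            .setPicosecondTimestampMapping(TimestampPrecision.MICROS)
            .build();
    Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options);

Leaving the option unset keeps the TimestampPrecision.NANOS default set in builder() above.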
@@ -256,6 +276,21 @@ public abstract static class Builder {
           .toFormatter();
   }
 
+  private static final java.time.format.DateTimeFormatter VAR_PRECISION_FORMATTER;
+
+  static {
+    VAR_PRECISION_FORMATTER =
+        new java.time.format.DateTimeFormatterBuilder()
+            .appendPattern("yyyy-MM-dd HH:mm:ss")
+
+            // Variable Nano-of-second (0 to 9 digits)
+            // The 'true' argument means: "Expect a decimal point only if fractions exist"
+            .appendFraction(java.time.temporal.ChronoField.NANO_OF_SECOND, 0, 9, true)
+            .appendLiteral(" UTC")
+            .toFormatter()
+            .withZone(java.time.ZoneId.of("UTC"));
+  }
+
   private static final Map<TypeName, StandardSQLTypeName> BEAM_TO_BIGQUERY_TYPE_MAPPING =
       ImmutableMap.<TypeName, StandardSQLTypeName>builder()
           .put(TypeName.BYTE, StandardSQLTypeName.INT64)
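
VAR_PRECISION_FORMATTER accepts BigQuery's textual timestamp output with anywhere from zero to nine fractional digits followed by a literal " UTC". A sketch of the parse it performs, using a value taken from the tests below:

    java.time.Instant instant =
        VAR_PRECISION_FORMATTER.parse(
            "2024-08-10 16:52:07.123456789 UTC", java.time.Instant::from);
    // instant.getNano() == 123456789; inputs with fewer fractional digits,
    // e.g. "2024-08-10 16:52:07.123 UTC", parse the same way.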
@@ -350,14 +385,15 @@ static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
    *
    * <p>Supports both standard and legacy SQL types.
    *
-   * @param typeName Name of the type returned by {@link TableFieldSchema#getType()}
+   * @param schema Schema of the type returned
    * @param nestedFields Nested fields for the given type (eg. RECORD type)
    * @return Corresponding Beam {@link FieldType}
    */
   private static FieldType fromTableFieldSchemaType(
-      String typeName, List<TableFieldSchema> nestedFields, SchemaConversionOptions options) {
+      TableFieldSchema schema, SchemaConversionOptions options) {
     // see
     // https://googleapis.dev/java/google-api-services-bigquery/latest/com/google/api/services/bigquery/model/TableFieldSchema.html#getType--
+    String typeName = schema.getType();
     switch (typeName) {
       case "STRING":
         return FieldType.STRING;
@@ -373,7 +409,26 @@ private static FieldType fromTableFieldSchemaType(
       case "BOOL":
         return FieldType.BOOLEAN;
       case "TIMESTAMP":
-        return FieldType.DATETIME;
+        // Timestamp columns can only have 6 (micros) or 12 (picos) precision.
+        // BigQuerySchema currently returns null for all microsecond timestamp
+        // columns but this cannot be guaranteed forever.
+        if ((schema.getTimestampPrecision() == null)
+            || Long.valueOf(6L).equals(schema.getTimestampPrecision())) {
+          return FieldType.DATETIME;
+        }
+        switch (options.getPicosecondTimestampMapping()) {
+          case MILLIS:
+            return FieldType.logicalType(Timestamp.MILLIS);
+          case MICROS:
+            return FieldType.logicalType(Timestamp.MICROS);
+          case NANOS:
+            return FieldType.logicalType(Timestamp.NANOS);
+          case PICOS:
+            return FieldType.STRING;
+          default:
+            throw new UnsupportedOperationException(
+                "Converting BigQuery type " + typeName + " to Beam type is unsupported");
+        }
       case "DATE":
         return FieldType.logicalType(SqlTypes.DATE);
       case "TIME":
@@ -389,14 +444,14 @@
         return FieldType.STRING;
       case "RECORD":
       case "STRUCT":
+        List<TableFieldSchema> nestedFields = schema.getFields();
         if (options.getInferMaps() && nestedFields.size() == 2) {
           TableFieldSchema key = nestedFields.get(0);
           TableFieldSchema value = nestedFields.get(1);
           if (BIGQUERY_MAP_KEY_FIELD_NAME.equals(key.getName())
               && BIGQUERY_MAP_VALUE_FIELD_NAME.equals(value.getName())) {
             return FieldType.map(
-                fromTableFieldSchemaType(key.getType(), key.getFields(), options),
-                fromTableFieldSchemaType(value.getType(), value.getFields(), options));
+                fromTableFieldSchemaType(key, options), fromTableFieldSchemaType(value, options));
           }
         }
         Schema rowSchema = fromTableFieldSchema(nestedFields, options);
@@ -412,9 +467,7 @@ private static Schema fromTableFieldSchema(
       List<TableFieldSchema> tableFieldSchemas, SchemaConversionOptions options) {
     Schema.Builder schemaBuilder = Schema.builder();
     for (TableFieldSchema tableFieldSchema : tableFieldSchemas) {
-      FieldType fieldType =
-          fromTableFieldSchemaType(
-              tableFieldSchema.getType(), tableFieldSchema.getFields(), options);
+      FieldType fieldType = fromTableFieldSchemaType(tableFieldSchema, options);
 
       Optional<Mode> fieldMode = Optional.ofNullable(tableFieldSchema.getMode()).map(Mode::valueOf);
       if (fieldMode.filter(m -> m == Mode.REPEATED).isPresent()
@@ -703,6 +756,8 @@ public static TableRow toTableRow(Row row) {
       java.time.format.DateTimeFormatter localDateTimeFormatter =
           (0 == localDateTime.getNano()) ? ISO_LOCAL_DATE_TIME : BIGQUERY_DATETIME_FORMATTER;
       return localDateTimeFormatter.format(localDateTime);
+    } else if (Timestamp.IDENTIFIER.equals(fieldType.getLogicalType().getIdentifier())) {
+      return BigQueryAvroUtils.formatTimestamp((java.time.Instant) fieldValue);
     } else if ("Enum".equals(identifier)) {
       return fieldType
           .getLogicalType(EnumerationType.class)
@@ -803,6 +858,8 @@ public static Row toBeamRow(Schema rowSchema, TableSchema bqSchema, TableRow jso
       } catch (NumberFormatException e) {
         return java.time.Instant.parse(jsonBQString);
       }
+    } else if (fieldType.isLogicalType(Timestamp.IDENTIFIER)) {
+      return VAR_PRECISION_FORMATTER.parse(jsonBQString, java.time.Instant::from);
     }
   }
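
With the branch above, toBeamRow reads the " UTC"-suffixed strings BigQuery returns for high-precision timestamps into java.time.Instant values. A sketch of the read path for a Timestamp.NANOS field, matching the tests added below:

    Schema schema = Schema.builder().addLogicalTypeField("ts", Timestamp.NANOS).build();
    Row row =
        BigQueryUtils.toBeamRow(
            schema, new TableRow().set("ts", "2024-08-10 16:52:07.123456789 UTC"));
    java.time.Instant ts = (java.time.Instant) row.getValue("ts"); // ts.getNano() == 123456789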

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TimestampPrecision.java

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+/** Specifies Timestamp precision. */
+public enum TimestampPrecision {
+  MILLIS,
+  MICROS,
+  NANOS,
+  PICOS
+}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java

Lines changed: 170 additions & 0 deletions
@@ -55,6 +55,7 @@
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.DateTime;
@@ -1294,4 +1295,173 @@ public void testTrimSchema() {
           BigQueryUtils.trimSchema(BQ_ROW_TYPE, Arrays.asList("row.id", "row.value", "row.name")));
     }
   }
+
+  @Test
+  public void testFromTableSchema_timestampPrecision12_defaultToNanos() {
+    TableFieldSchema picosTimestamp =
+        new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(12L);
+    TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(picosTimestamp));
+
+    Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema);
+
+    assertEquals(
+        Schema.builder().addNullableField("ts", FieldType.logicalType(Timestamp.NANOS)).build(),
+        beamSchema);
+  }
+
+  @Test
+  public void testFromTableSchema_timestampPrecision12_millis() {
+    TableFieldSchema picosTimestamp =
+        new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(12L);
+    TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(picosTimestamp));
+
+    BigQueryUtils.SchemaConversionOptions options =
+        BigQueryUtils.SchemaConversionOptions.builder()
+            .setPicosecondTimestampMapping(TimestampPrecision.MILLIS)
+            .build();
+    Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options);
+
+    assertEquals(
+        Schema.builder().addNullableField("ts", FieldType.logicalType(Timestamp.MILLIS)).build(),
+        beamSchema);
+  }
+
+  @Test
+  public void testFromTableSchema_timestampPrecision12_micros() {
+    TableFieldSchema picosTimestamp =
+        new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(12L);
+    TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(picosTimestamp));
+
+    BigQueryUtils.SchemaConversionOptions options =
+        BigQueryUtils.SchemaConversionOptions.builder()
+            .setPicosecondTimestampMapping(TimestampPrecision.MICROS)
+            .build();
+    Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options);
+
+    assertEquals(
+        Schema.builder().addNullableField("ts", FieldType.logicalType(Timestamp.MICROS)).build(),
+        beamSchema);
+  }
+
+  @Test
+  public void testFromTableSchema_timestampPrecision12_nanos() {
+    TableFieldSchema picosTimestamp =
+        new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(12L);
+    TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(picosTimestamp));
+
+    BigQueryUtils.SchemaConversionOptions options =
+        BigQueryUtils.SchemaConversionOptions.builder()
+            .setPicosecondTimestampMapping(TimestampPrecision.NANOS)
+            .build();
+    Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options);
+
+    assertEquals(
+        Schema.builder().addNullableField("ts", FieldType.logicalType(Timestamp.NANOS)).build(),
+        beamSchema);
+  }
+
+  @Test
+  public void testFromTableSchema_timestampPrecision12_picos() {
+    TableFieldSchema picosTimestamp =
+        new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(12L);
+    TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(picosTimestamp));
+
+    BigQueryUtils.SchemaConversionOptions options =
+        BigQueryUtils.SchemaConversionOptions.builder()
+            .setPicosecondTimestampMapping(TimestampPrecision.PICOS)
+            .build();
+    Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options);
+
+    assertEquals(Schema.builder().addNullableField("ts", FieldType.STRING).build(), beamSchema);
+  }
+
+  @Test
+  public void testFromTableSchema_timestampPrecision6_ignoredOption() {
+    // Standard microsecond precision should ignore the picosecond conversion option
+    TableFieldSchema microsTimestamp =
+        new TableFieldSchema().setName("ts").setType("TIMESTAMP").setTimestampPrecision(6L);
+    TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(microsTimestamp));
+
+    BigQueryUtils.SchemaConversionOptions options =
+        BigQueryUtils.SchemaConversionOptions.builder()
+            .setPicosecondTimestampMapping(TimestampPrecision.PICOS)
+            .build();
+    Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema, options);
+
+    assertEquals(Schema.builder().addNullableField("ts", FieldType.DATETIME).build(), beamSchema);
+  }
+
+  @Test
+  public void testFromTableSchema_timestampNullPrecision_defaultsToDatetime() {
+    // Null precision should default to DATETIME (backwards compatibility)
+    TableFieldSchema timestamp = new TableFieldSchema().setName("ts").setType("TIMESTAMP");
+    TableSchema bqSchema = new TableSchema().setFields(Arrays.asList(timestamp));
+
+    Schema beamSchema = BigQueryUtils.fromTableSchema(bqSchema);
+
+    assertEquals(Schema.builder().addNullableField("ts", FieldType.DATETIME).build(), beamSchema);
+  }
+
+  @Test
+  @SuppressWarnings("JavaInstantGetSecondsGetNano")
+  public void testToBeamRow_timestampNanos_utcSuffix() {
+    Schema schema = Schema.builder().addLogicalTypeField("ts", Timestamp.NANOS).build();
+
+    // BigQuery format with " UTC" suffix
+    String timestamp = "2024-08-10 16:52:07.123456789 UTC";
+
+    Row beamRow = BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", timestamp));
+
+    java.time.Instant actual = (java.time.Instant) beamRow.getValue("ts");
+    assertEquals(2024, actual.atZone(java.time.ZoneOffset.UTC).getYear());
+    assertEquals(8, actual.atZone(java.time.ZoneOffset.UTC).getMonthValue());
+    assertEquals(10, actual.atZone(java.time.ZoneOffset.UTC).getDayOfMonth());
+    assertEquals(16, actual.atZone(java.time.ZoneOffset.UTC).getHour());
+    assertEquals(52, actual.atZone(java.time.ZoneOffset.UTC).getMinute());
+    assertEquals(7, actual.atZone(java.time.ZoneOffset.UTC).getSecond());
+    assertEquals(123456789, actual.getNano());
+  }
+
+  @Test
+  @SuppressWarnings("JavaInstantGetSecondsGetNano")
+  public void testToBeamRow_timestampMicros_utcSuffix() {
+    Schema schema = Schema.builder().addLogicalTypeField("ts", Timestamp.MICROS).build();
+
+    // BigQuery format with " UTC" suffix
+    String timestamp = "2024-08-10 16:52:07.123456 UTC";
+
+    Row beamRow = BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", timestamp));
+
+    java.time.Instant actual = (java.time.Instant) beamRow.getValue("ts");
+    assertEquals(2024, actual.atZone(java.time.ZoneOffset.UTC).getYear());
+    assertEquals(8, actual.atZone(java.time.ZoneOffset.UTC).getMonthValue());
+    assertEquals(10, actual.atZone(java.time.ZoneOffset.UTC).getDayOfMonth());
+    assertEquals(16, actual.atZone(java.time.ZoneOffset.UTC).getHour());
+    assertEquals(52, actual.atZone(java.time.ZoneOffset.UTC).getMinute());
+    assertEquals(7, actual.atZone(java.time.ZoneOffset.UTC).getSecond());
+    assertEquals(123456000, actual.getNano());
+  }
+
+  @Test
+  @SuppressWarnings("JavaInstantGetSecondsGetNano")
+  public void testToBeamRow_timestampNanos_variablePrecision() {
+    // Test that different decimal place counts are handled
+    Schema schema = Schema.builder().addLogicalTypeField("ts", Timestamp.NANOS).build();
+
+    // 3 decimal places
+    Row row3 =
+        BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", "2024-08-10 16:52:07.123 UTC"));
+    assertEquals(123000000, ((java.time.Instant) row3.getValue("ts")).getNano());
+
+    // 6 decimal places
+    Row row6 =
+        BigQueryUtils.toBeamRow(schema, new TableRow().set("ts", "2024-08-10 16:52:07.123456 UTC"));
+    assertEquals(123456000, ((java.time.Instant) row6.getValue("ts")).getNano());
+
+    // 9 decimal places
+    Row row9 =
+        BigQueryUtils.toBeamRow(
+            schema, new TableRow().set("ts", "2024-08-10 16:52:07.123456789 UTC"));
+    assertEquals(123456789, ((java.time.Instant) row9.getValue("ts")).getNano());
+  }
 }
