
Commit ce76f81

claudevdm and Claude authored

Add timestamp-nanos avro logical type support in bigquery avro utils. (#36892)

* Add timestamp-nanos avro logical type support in bigquery utils.
* Lint.
* Avoid overflows for millis/micros.
* Remove println.
* Comments.

Co-authored-by: Claude <[email protected]>
1 parent 712172d commit ce76f81
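
As exercised by the new tests in AvroUtilsTest below, the change lets a Beam row carrying a nanosecond-precision Timestamp logical type round-trip through Avro without losing precision. A minimal sketch using only the APIs shown in this commit (the field name "ts" is illustrative):

    Schema beamSchema = Schema.builder().addLogicalTypeField("ts", Timestamp.NANOS).build();
    Row row =
        Row.withSchema(beamSchema)
            .addValue(java.time.Instant.parse("2000-01-01T01:02:03.123456789Z"))
            .build();
    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema); // long + "timestamp-nanos"
    GenericRecord record = AvroUtils.toGenericRecord(row, avroSchema);      // epoch nanos as a long
    Row back = AvroUtils.toBeamRowStrict(record, beamSchema);               // original Instant, full precision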

5 files changed: +357 −49 lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Timestamp.java

Lines changed: 0 additions & 1 deletion

@@ -157,7 +157,6 @@ public Instant toInputType(@NonNull Row base) {
         maxSubseconds,
         precision,
         subseconds);
-
     return Instant.ofEpochSecond(
         checkArgumentNotNull(
             base.getInt64(0), "While trying to convert to Instant: Row missing seconds field"),

sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtils.java

Lines changed: 46 additions & 0 deletions

@@ -27,6 +27,7 @@
 import java.io.ObjectOutputStream;
 import java.lang.reflect.Method;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -81,6 +82,7 @@
 import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
 import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
 import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
 import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
@@ -137,6 +139,7 @@
  * LogicalTypes.TimestampMillis <-----> DATETIME
  * LogicalTypes.TimestampMicros ------> Long
  * LogicalTypes.TimestampMicros <------ LogicalType(urn="beam:logical_type:micros_instant:v1")
+ * LogicalTypes.TimestampNanos <------> LogicalType(TIMESTAMP(9))
  * LogicalTypes.Decimal <-----> DECIMAL
  * </pre>
  *
@@ -164,6 +167,8 @@ public class AvroUtils {

   private static final GenericData GENERIC_DATA_WITH_DEFAULT_CONVERSIONS;

+  private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos";
+
   static {
     GENERIC_DATA_WITH_DEFAULT_CONVERSIONS = new GenericData();
     addLogicalTypeConversions(GENERIC_DATA_WITH_DEFAULT_CONVERSIONS);
@@ -1027,6 +1032,11 @@ private static FieldType toFieldType(TypeWithNullability type) {
         fieldType = FieldType.DATETIME;
       }
     }
+    // TODO: Remove once Avro 1.12+ has timestamp-nanos
+    if (fieldType == null
+        && TIMESTAMP_NANOS_LOGICAL_TYPE.equals(avroSchema.getProp("logicalType"))) {
+      fieldType = FieldType.logicalType(Timestamp.NANOS);
+    }

     if (fieldType == null) {
       switch (type.type.getType()) {
@@ -1186,6 +1196,14 @@ private static org.apache.avro.Schema getFieldSchema(
     } else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
       baseType =
           LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
+    } else if (Timestamp.IDENTIFIER.equals(identifier)) {
+      int precision = checkNotNull(logicalType.getArgument());
+      if (precision != 9) {
+        throw new RuntimeException(
+            "Timestamp logical type precision not supported:" + precision);
+      }
+      baseType = org.apache.avro.Schema.create(Type.LONG);
+      baseType.addProp("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE);
     } else {
       throw new RuntimeException(
           "Unhandled logical type " + checkNotNull(fieldType.getLogicalType()).getIdentifier());
@@ -1340,6 +1358,16 @@ private static org.apache.avro.Schema getFieldSchema(
       java.time.Instant instant = (java.time.Instant) value;
       return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
           + TimeUnit.NANOSECONDS.toMicros(instant.getNano());
+    } else if (Timestamp.IDENTIFIER.equals(identifier)) {
+      java.time.Instant instant = (java.time.Instant) value;
+      // Use BigInteger to work around long overflows so that epochNanos = Long.MIN_VALUE can be
+      // supported. Instant always stores nanos as positive adjustment so the math will silently
+      // overflow with regular int64.
+      BigInteger epochSeconds = BigInteger.valueOf(instant.getEpochSecond());
+      BigInteger nanosOfSecond = BigInteger.valueOf(instant.getNano());
+      BigInteger epochNanos =
+          epochSeconds.multiply(BigInteger.valueOf(1_000_000_000L)).add(nanosOfSecond);
+      return epochNanos.longValueExact();
     } else {
       throw new RuntimeException("Unhandled logical type " + identifier);
     }
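
The BigInteger detour in this hunk is worth unpacking: java.time.Instant normalizes to a non-negative nano-of-second, so at epochNanos = Long.MIN_VALUE the seconds component sits one below what truncating division would give, and the intermediate product seconds * 1_000_000_000 falls outside the int64 range even though the final sum fits. A standalone sketch of the boundary (my own demo, not commit code):

    java.time.Instant min = java.time.Instant.ofEpochSecond(0L, Long.MIN_VALUE);
    // Exact long arithmetic rejects the intermediate product:
    //   Math.multiplyExact(min.getEpochSecond(), 1_000_000_000L) -> ArithmeticException: long overflow
    // BigInteger carries it through, and longValueExact() still throws for instants
    // genuinely outside the int64-nanos range instead of wrapping silently.
    java.math.BigInteger nanos =
        java.math.BigInteger.valueOf(min.getEpochSecond())           // -9223372037
            .multiply(java.math.BigInteger.valueOf(1_000_000_000L))
            .add(java.math.BigInteger.valueOf(min.getNano()));       // + 145224192
    System.out.println(nanos.longValueExact()); // -9223372036854775808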
@@ -1387,6 +1415,24 @@ private static Object convertLogicalType(
       @Nonnull FieldType fieldType,
       @Nonnull GenericData genericData) {
     TypeWithNullability type = new TypeWithNullability(avroSchema);
+
+    // TODO: Remove this workaround once Avro is upgraded to 1.12+ where timestamp-nanos
+    if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.type.getProp("logicalType"))) {
+      if (type.type.getType() == Type.LONG) {
+        Long nanos = (Long) value;
+        // Check if Beam expects Timestamp logical type
+        if (fieldType.getTypeName() == TypeName.LOGICAL_TYPE
+            && org.apache.beam.sdk.schemas.logicaltypes.Timestamp.IDENTIFIER.equals(
+                fieldType.getLogicalType().getIdentifier())) {
+          long seconds = Math.floorDiv(nanos, 1_000_000_000L);
+          long nanoAdjustment = Math.floorMod(nanos, 1_000_000_000L);
+          return java.time.Instant.ofEpochSecond(seconds, nanoAdjustment);
+        } else {
+          return nanos;
+        }
+      }
+    }
+
     LogicalType logicalType = LogicalTypes.fromSchema(type.type);
     if (logicalType == null) {
       return null;
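
The read path mirrors the encoding with floor semantics rather than truncating division, which matters for pre-epoch values. A quick illustration (not commit code):

    long nanos = -1L; // 1 ns before the epoch
    long seconds = Math.floorDiv(nanos, 1_000_000_000L);        // -1, whereas -1 / 1_000_000_000L == 0
    long nanoAdjustment = Math.floorMod(nanos, 1_000_000_000L); // 999_999_999, whereas -1 % 1_000_000_000L == -1
    System.out.println(java.time.Instant.ofEpochSecond(seconds, nanoAdjustment));
    // 1969-12-31T23:59:59.999999999Z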

sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroUtilsTest.java

Lines changed: 83 additions & 0 deletions

@@ -54,6 +54,7 @@
 import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
 import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -549,6 +550,88 @@ public void testFromBeamSchema() {
     assertEquals(getAvroSchema(), avroSchema);
   }

+  @Test
+  public void testBeamTimestampNanosLogicalTypeToAvroSchema() {
+    Schema beamSchema =
+        Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+
+    // Expected Avro schema with timestamp-nanos
+    String expectedJson =
+        "{\"type\": \"record\", \"name\": \"topLevelRecord\", "
+            + "\"fields\": [{\"name\": \"timestampNanos\", "
+            + "\"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}}]}";
+
+    org.apache.avro.Schema expectedAvroSchema =
+        new org.apache.avro.Schema.Parser().parse(expectedJson);
+
+    assertEquals(expectedAvroSchema, AvroUtils.toAvroSchema(beamSchema));
+  }
+
+  @Test
+  public void testBeamTimestampNanosToGenericRecord() {
+    Schema beamSchema =
+        Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+
+    java.time.Instant instant = java.time.Instant.parse("2000-01-01T01:02:03.123456789Z");
+    Row beamRow = Row.withSchema(beamSchema).addValue(instant).build();
+
+    // Expected nanos since epoch
+    long expectedNanos = TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano();
+
+    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+    GenericRecord avroRecord = AvroUtils.toGenericRecord(beamRow, avroSchema);
+
+    assertEquals(expectedNanos, avroRecord.get("timestampNanos"));
+  }
+
+  @Test
+  public void testTimestampNanosRoundTrip() {
+    Schema beamSchema =
+        Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+
+    // Test various nanosecond precisions
+    java.time.Instant[] testInstants = {
+      java.time.Instant.parse("2000-01-01T00:00:00.000000001Z"), // 1 nano
+      java.time.Instant.parse("2000-01-01T00:00:00.123456789Z"), // full nanos
+      java.time.Instant.parse("2000-01-01T00:00:00.999999999Z"), // max nanos
+      java.time.Instant.ofEpochSecond(0L, Long.MAX_VALUE), // max supported
+      java.time.Instant.parse("1677-09-21T00:12:43.145224192Z"), // min supported by an int64
+    };
+
+    org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+
+    for (java.time.Instant instant : testInstants) {
+      Row originalRow = Row.withSchema(beamSchema).addValue(instant).build();
+      GenericRecord avroRecord = AvroUtils.toGenericRecord(originalRow, avroSchema);
+      Row roundTripRow = AvroUtils.toBeamRowStrict(avroRecord, beamSchema);
+
+      assertEquals(originalRow, roundTripRow);
+      java.time.Instant roundTripInstant =
+          (java.time.Instant) roundTripRow.getValue("timestampNanos");
+      assertEquals(instant, roundTripInstant);
+    }
+  }
+
+  @Test
+  public void testTimestampNanosAvroSchemaToBeamSchema() {
+    List<org.apache.avro.Schema.Field> fields = Lists.newArrayList();
+    fields.add(
+        new org.apache.avro.Schema.Field(
+            "timestampNanos",
+            new org.apache.avro.Schema.Parser()
+                .parse("{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}"),
+            "",
+            (Object) null));
+    org.apache.avro.Schema avroSchema =
+        org.apache.avro.Schema.createRecord("test", null, null, false, fields);
+
+    Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);
+
+    Schema expected =
+        Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
+    assertEquals(expected, beamSchema);
+  }
+
   @Test
   public void testAvroSchemaFromBeamSchemaCanBeParsed() {
     org.apache.avro.Schema convertedSchema = AvroUtils.toAvroSchema(getBeamSchema());
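
The "max supported" and "min supported by an int64" cases in testTimestampNanosRoundTrip pin down the representable window of an epoch-nanos long. A quick check of those endpoints (illustrative, not commit code):

    System.out.println(java.time.Instant.ofEpochSecond(0L, Long.MIN_VALUE));
    // 1677-09-21T00:12:43.145224192Z
    System.out.println(java.time.Instant.ofEpochSecond(0L, Long.MAX_VALUE));
    // 2262-04-11T23:47:16.854775807Z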

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java

Lines changed: 75 additions & 31 deletions

@@ -50,8 +50,6 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
 import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;

 /** A set of utilities for working with Avro files. */
 class BigQueryAvroUtils {
@@ -60,7 +58,7 @@ class BigQueryAvroUtils {
       Optional.ofNullable(Schema.class.getPackage())
           .map(Package::getImplementationVersion)
           .orElse("");
-
+  private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos";
   // org.apache.avro.LogicalType
   static class DateTimeLogicalType extends LogicalType {
     public DateTimeLogicalType() {
@@ -161,36 +159,73 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTy
   * Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
   * immutable.
   */
-  private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
-      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
-
-  @VisibleForTesting
-  static String formatTimestamp(Long timestampMicro) {
-    String dateTime = formatDatetime(timestampMicro);
-    return dateTime + " UTC";
+  private static final java.time.format.DateTimeFormatter DATE_TIME_FORMATTER =
+      java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
+          .withZone(java.time.ZoneOffset.UTC);
+
+  /** Enum to define the precision of a timestamp since the epoch. */
+  enum TimestampPrecision {
+    MILLISECONDS,
+    MICROSECONDS,
+    NANOSECONDS;
+
+    /** Converts an epoch value of this precision to an Instant. */
+    java.time.Instant toInstant(long epochValue) {
+      switch (this) {
+        case MILLISECONDS:
+          return java.time.Instant.ofEpochMilli(epochValue);
+        case MICROSECONDS:
+          {
+            long seconds = Math.floorDiv(epochValue, 1_000_000L);
+            long microsOfSecond = Math.floorMod(epochValue, 1_000_000L);
+            return java.time.Instant.ofEpochSecond(seconds, microsOfSecond * 1_000L);
+          }
+        case NANOSECONDS:
+          {
+            long seconds = Math.floorDiv(epochValue, 1_000_000_000L);
+            long nanosOfSecond = Math.floorMod(epochValue, 1_000_000_000L);
+            return java.time.Instant.ofEpochSecond(seconds, nanosOfSecond);
+          }
+        default:
+          throw new IllegalStateException("Unknown precision: " + this);
+      }
+    }
   }

+  /**
+   * Formats an Instant with minimal fractional second precision. Shows 0, 3, 6, or 9 decimal places
+   * based on actual precision of the value.
+   */
   @VisibleForTesting
-  static String formatDatetime(Long timestampMicro) {
-    // timestampMicro is in "microseconds since epoch" format,
-    // e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC".
-    // Separate into seconds and microseconds.
-    long timestampSec = timestampMicro / 1_000_000;
-    long micros = timestampMicro % 1_000_000;
-    if (micros < 0) {
-      micros += 1_000_000;
-      timestampSec -= 1;
-    }
-    String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000);
-    if (micros == 0) {
-      return dayAndTime;
-    } else if (micros % 1000 == 0) {
-      return String.format("%s.%03d", dayAndTime, micros / 1000);
+  @SuppressWarnings("JavaInstantGetSecondsGetNano")
+  static String formatDatetime(java.time.Instant instant) {
+    String dateTime = DATE_TIME_FORMATTER.format(instant);
+    int nanos = instant.getNano();
+
+    if (nanos == 0) {
+      return dateTime;
+    } else if (nanos % 1_000_000 == 0) {
+      return dateTime + String.format(".%03d", nanos / 1_000_000);
+    } else if (nanos % 1_000 == 0) {
+      return dateTime + String.format(".%06d", nanos / 1_000);
     } else {
-      return String.format("%s.%06d", dayAndTime, micros);
+      return dateTime + String.format(".%09d", nanos);
     }
   }

+  @VisibleForTesting
+  static String formatDatetime(long epochValue, TimestampPrecision precision) {
+    return formatDatetime(precision.toInstant(epochValue));
+  }
+
+  static String formatTimestamp(java.time.Instant instant) {
+    return formatDatetime(instant) + " UTC";
+  }
+
+  static String formatTimestamp(long epochValue, TimestampPrecision precision) {
+    return formatTimestamp(precision.toInstant(epochValue));
+  }
+
   /**
   * This method formats a BigQuery DATE value into a String matching the format used by JSON
   * export. Date records are stored in "days since epoch" format, and BigQuery uses the proleptic
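
Concretely, using the example timestamp from the deleted comment (1452062291123456L micros is "2016-01-06 06:38:11.123456 UTC"), the new overloads render the shortest faithful fraction. A hedged sketch against the package-private helpers above:

    formatTimestamp(1452062291000L, TimestampPrecision.MILLISECONDS);
    // -> "2016-01-06 06:38:11 UTC" (trailing zeros collapse to whole seconds)
    formatTimestamp(1452062291123L, TimestampPrecision.MILLISECONDS);
    // -> "2016-01-06 06:38:11.123 UTC"
    formatTimestamp(1452062291123456L, TimestampPrecision.MICROSECONDS);
    // -> "2016-01-06 06:38:11.123456 UTC"
    formatTimestamp(1452062291123456789L, TimestampPrecision.NANOSECONDS);
    // -> "2016-01-06 06:38:11.123456789 UTC"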
@@ -335,7 +370,6 @@ private static Object convertRequiredField(String name, Schema schema, Object v)
     // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
     // INTEGER type maps to an Avro LONG type.
     checkNotNull(v, "REQUIRED field %s should not be null", name);
-
     Type type = schema.getType();
     LogicalType logicalType = schema.getLogicalType();
     switch (type) {
@@ -364,21 +398,26 @@ private static Object convertRequiredField(String name, Schema schema, Object v)
       } else if (logicalType instanceof LogicalTypes.TimestampMillis) {
         // Write only: SQL type TIMESTAMP
         // ideally Instant but TableRowJsonCoder encodes as String
-        return formatTimestamp((Long) v * 1000L);
+        return formatTimestamp((Long) v, TimestampPrecision.MILLISECONDS);
       } else if (logicalType instanceof LogicalTypes.TimestampMicros) {
         // SQL type TIMESTAMP
         // ideally Instant but TableRowJsonCoder encodes as String
-        return formatTimestamp((Long) v);
+        return formatTimestamp((Long) v, TimestampPrecision.MICROSECONDS);
+        // TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
+      } else if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(schema.getProp("logicalType"))) {
+        // SQL type TIMESTAMP
+        // ideally Instant but TableRowJsonCoder encodes as String
+        return formatTimestamp((Long) v, TimestampPrecision.NANOSECONDS);
       } else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
           && logicalType instanceof LogicalTypes.LocalTimestampMillis) {
         // Write only: SQL type DATETIME
         // ideally LocalDateTime but TableRowJsonCoder encodes as String
-        return formatDatetime(((Long) v) * 1000);
+        return formatDatetime(((Long) v), TimestampPrecision.MILLISECONDS);
       } else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
           && logicalType instanceof LogicalTypes.LocalTimestampMicros) {
         // Write only: SQL type DATETIME
         // ideally LocalDateTime but TableRowJsonCoder encodes as String
-        return formatDatetime((Long) v);
+        return formatDatetime((Long) v, TimestampPrecision.MICROSECONDS);
       } else {
         // SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
         // ideally Long if in [2^53+1, 2^53-1] but keep consistency with BQ JSON export that uses
@@ -602,6 +641,11 @@ private static TableFieldSchema typedTableFieldSchema(Schema type, Boolean useAv
           return fieldSchema.setType("INTEGER");
         }
       case LONG:
+        // TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
+        if (useAvroLogicalTypes
+            && (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType")))) {
+          return fieldSchema.setType("TIMESTAMP");
+        }
         if (logicalType instanceof LogicalTypes.TimeMicros) {
           return fieldSchema.setType("TIME");
         } else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
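
For schema inference, the effect of this hunk is that a raw long field carrying the timestamp-nanos property maps to BigQuery's TIMESTAMP type when useAvroLogicalTypes is set; otherwise the check is skipped and the LONG falls through to the existing logical-type handling. Conceptually (typedTableFieldSchema is package-private, so this is a sketch, not callable client code):

    org.apache.avro.Schema nanosField =
        new org.apache.avro.Schema.Parser()
            .parse("{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}");
    // typedTableFieldSchema(nanosField, true)  -> TableFieldSchema with type "TIMESTAMP"
    // typedTableFieldSchema(nanosField, false) -> handled like any other LONG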
