Timestamp.java
@@ -157,7 +157,6 @@ public Instant toInputType(@NonNull Row base) {
maxSubseconds,
precision,
subseconds);

return Instant.ofEpochSecond(
checkArgumentNotNull(
base.getInt64(0), "While trying to convert to Instant: Row missing seconds field"),
AvroUtils.java
@@ -27,6 +27,7 @@
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -81,6 +82,7 @@
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
@@ -137,6 +139,7 @@
* LogicalTypes.TimestampMillis <-----> DATETIME
* LogicalTypes.TimestampMicros ------> Long
* LogicalTypes.TimestampMicros <------ LogicalType(urn="beam:logical_type:micros_instant:v1")
* LogicalTypes.TimestampNanos <------> LogicalType(TIMESTAMP(9))
* LogicalTypes.Decimal <-----> DECIMAL
* </pre>
*
@@ -164,6 +167,8 @@ public class AvroUtils {

private static final GenericData GENERIC_DATA_WITH_DEFAULT_CONVERSIONS;

private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos";

static {
GENERIC_DATA_WITH_DEFAULT_CONVERSIONS = new GenericData();
addLogicalTypeConversions(GENERIC_DATA_WITH_DEFAULT_CONVERSIONS);
@@ -1027,6 +1032,11 @@ private static FieldType toFieldType(TypeWithNullability type) {
fieldType = FieldType.DATETIME;
}
}
// TODO: Remove once Avro is upgraded to 1.12+, which has a built-in timestamp-nanos logical type.
if (fieldType == null
&& TIMESTAMP_NANOS_LOGICAL_TYPE.equals(avroSchema.getProp("logicalType"))) {
fieldType = FieldType.logicalType(Timestamp.NANOS);
}

if (fieldType == null) {
switch (type.type.getType()) {
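
Why the raw string prop rather than Avro's LogicalTypes lookup: on Avro 1.11.x, "timestamp-nanos" is not a registered logical type, so schema parsing typically yields no LogicalType object for it, while the raw prop remains readable. A minimal sketch (illustrative only, not part of this diff):

org.apache.avro.Schema s =
    new org.apache.avro.Schema.Parser()
        .parse("{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}");
s.getProp("logicalType"); // "timestamp-nanos"
org.apache.avro.LogicalTypes.fromSchema(s); // typically null on Avro < 1.12
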
@@ -1186,6 +1196,14 @@ private static org.apache.avro.Schema getFieldSchema(
} else if (SqlTypes.TIMESTAMP.getIdentifier().equals(identifier)) {
baseType =
LogicalTypes.timestampMicros().addToSchema(org.apache.avro.Schema.create(Type.LONG));
} else if (Timestamp.IDENTIFIER.equals(identifier)) {
int precision = checkNotNull(logicalType.getArgument());
if (precision != 9) {
throw new RuntimeException(
"Timestamp logical type precision not supported:" + precision);
}
baseType = org.apache.avro.Schema.create(Type.LONG);
baseType.addProp("logicalType", TIMESTAMP_NANOS_LOGICAL_TYPE);
} else {
throw new RuntimeException(
"Unhandled logical type " + checkNotNull(fieldType.getLogicalType()).getIdentifier());
@@ -1340,6 +1358,16 @@ private static org.apache.avro.Schema getFieldSchema(
java.time.Instant instant = (java.time.Instant) value;
return TimeUnit.SECONDS.toMicros(instant.getEpochSecond())
+ TimeUnit.NANOSECONDS.toMicros(instant.getNano());
} else if (Timestamp.IDENTIFIER.equals(identifier)) {
java.time.Instant instant = (java.time.Instant) value;
// Use BigInteger to work around long overflow so that epochNanos = Long.MIN_VALUE can be
// supported. Instant always stores nanos as a positive adjustment, so the math would
// silently overflow with regular int64 arithmetic.
BigInteger epochSeconds = BigInteger.valueOf(instant.getEpochSecond());
BigInteger nanosOfSecond = BigInteger.valueOf(instant.getNano());
BigInteger epochNanos =
epochSeconds.multiply(BigInteger.valueOf(1_000_000_000L)).add(nanosOfSecond);
return epochNanos.longValueExact();
} else {
throw new RuntimeException("Unhandled logical type " + identifier);
}
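
The BigInteger detour above matters at the extreme of the range: for the minimum representable epoch-nanos value, the seconds-times-10^9 product alone overflows a long even though the final sum fits. An illustrative check in plain Java (not part of this diff):

java.time.Instant min = java.time.Instant.ofEpochSecond(0L, Long.MIN_VALUE);
// min.getEpochSecond() == -9223372037L, min.getNano() == 145224192
// Math.multiplyExact(min.getEpochSecond(), 1_000_000_000L) throws ArithmeticException,
// even though the final sum equals Long.MIN_VALUE and does fit in a long.
BigInteger epochNanos =
    BigInteger.valueOf(min.getEpochSecond())
        .multiply(BigInteger.valueOf(1_000_000_000L))
        .add(BigInteger.valueOf(min.getNano()));
long asLong = epochNanos.longValueExact(); // -9223372036854775808 == Long.MIN_VALUE
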
@@ -1387,6 +1415,24 @@ private static Object convertLogicalType(
@Nonnull FieldType fieldType,
@Nonnull GenericData genericData) {
TypeWithNullability type = new TypeWithNullability(avroSchema);

// TODO: Remove this workaround once Avro is upgraded to 1.12+, where timestamp-nanos is a
// built-in logical type.
if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.type.getProp("logicalType"))) {
if (type.type.getType() == Type.LONG) {
Long nanos = (Long) value;
// Check if Beam expects Timestamp logical type
if (fieldType.getTypeName() == TypeName.LOGICAL_TYPE
&& org.apache.beam.sdk.schemas.logicaltypes.Timestamp.IDENTIFIER.equals(
fieldType.getLogicalType().getIdentifier())) {
long seconds = Math.floorDiv(nanos, 1_000_000_000L);
long nanoAdjustment = Math.floorMod(nanos, 1_000_000_000L);
return java.time.Instant.ofEpochSecond(seconds, nanoAdjustment);
} else {
return nanos;
}
}
}

LogicalType logicalType = LogicalTypes.fromSchema(type.type);
if (logicalType == null) {
return null;
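
On the read path above, floorDiv/floorMod keep the nano-of-second adjustment in [0, 999999999] for pre-epoch values; a small worked example (illustrative only, not part of this diff):

long epochNanos = -1L; // one nanosecond before the epoch
long seconds = Math.floorDiv(epochNanos, 1_000_000_000L); // -1
long nanoAdjustment = Math.floorMod(epochNanos, 1_000_000_000L); // 999999999
java.time.Instant t = java.time.Instant.ofEpochSecond(seconds, nanoAdjustment);
// t == 1969-12-31T23:59:59.999999999Z
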
AvroUtilsTest.java
@@ -54,6 +54,7 @@
import org.apache.beam.sdk.schemas.logicaltypes.EnumerationType;
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -549,6 +550,88 @@ public void testFromBeamSchema() {
assertEquals(getAvroSchema(), avroSchema);
}

@Test
public void testBeamTimestampNanosLogicalTypeToAvroSchema() {
Schema beamSchema =
Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();

// Expected Avro schema with timestamp-nanos
String expectedJson =
"{\"type\": \"record\", \"name\": \"topLevelRecord\", "
+ "\"fields\": [{\"name\": \"timestampNanos\", "
+ "\"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}}]}";

org.apache.avro.Schema expectedAvroSchema =
new org.apache.avro.Schema.Parser().parse(expectedJson);

assertEquals(expectedAvroSchema, AvroUtils.toAvroSchema(beamSchema));
}

@Test
public void testBeamTimestampNanosToGenericRecord() {
Schema beamSchema =
Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();

java.time.Instant instant = java.time.Instant.parse("2000-01-01T01:02:03.123456789Z");
Row beamRow = Row.withSchema(beamSchema).addValue(instant).build();

// Expected nanos since epoch
long expectedNanos = TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano();

org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
GenericRecord avroRecord = AvroUtils.toGenericRecord(beamRow, avroSchema);

assertEquals(expectedNanos, avroRecord.get("timestampNanos"));
}

@Test
public void testTimestampNanosRoundTrip() {
Schema beamSchema =
Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();

// Test various nanosecond precisions
java.time.Instant[] testInstants = {
java.time.Instant.parse("2000-01-01T00:00:00.000000001Z"), // 1 nano
java.time.Instant.parse("2000-01-01T00:00:00.123456789Z"), // full nanos
java.time.Instant.parse("2000-01-01T00:00:00.999999999Z"), // max nanos
java.time.Instant.ofEpochSecond(0L, Long.MAX_VALUE), // max supported by an int64
java.time.Instant.parse("1677-09-21T00:12:43.145224192Z"), // min supported by an int64
};

org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);

for (java.time.Instant instant : testInstants) {
Row originalRow = Row.withSchema(beamSchema).addValue(instant).build();
GenericRecord avroRecord = AvroUtils.toGenericRecord(originalRow, avroSchema);
Row roundTripRow = AvroUtils.toBeamRowStrict(avroRecord, beamSchema);

assertEquals(originalRow, roundTripRow);
java.time.Instant roundTripInstant =
(java.time.Instant) roundTripRow.getValue("timestampNanos");
assertEquals(instant, roundTripInstant);
}
}

@Test
public void testTimestampNanosAvroSchemaToBeamSchema() {
List<org.apache.avro.Schema.Field> fields = Lists.newArrayList();
fields.add(
new org.apache.avro.Schema.Field(
"timestampNanos",
new org.apache.avro.Schema.Parser()
.parse("{\"type\": \"long\", \"logicalType\": \"timestamp-nanos\"}"),
"",
(Object) null));
org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("test", null, null, false, fields);

Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);

Schema expected =
Schema.builder().addLogicalTypeField("timestampNanos", Timestamp.NANOS).build();
assertEquals(expected, beamSchema);
}

@Test
public void testAvroSchemaFromBeamSchemaCanBeParsed() {
org.apache.avro.Schema convertedSchema = AvroUtils.toAvroSchema(getBeamSchema());
BigQueryAvroUtils.java
@@ -50,8 +50,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding;
import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;

/** A set of utilities for working with Avro files. */
class BigQueryAvroUtils {
@@ -60,7 +58,7 @@ class BigQueryAvroUtils {
Optional.ofNullable(Schema.class.getPackage())
.map(Package::getImplementationVersion)
.orElse("");

private static final String TIMESTAMP_NANOS_LOGICAL_TYPE = "timestamp-nanos";
// org.apache.avro.LogicalType
static class DateTimeLogicalType extends LogicalType {
public DateTimeLogicalType() {
@@ -161,36 +159,73 @@ static Schema getPrimitiveType(TableFieldSchema schema, Boolean useAvroLogicalTypes) {
* Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
* immutable.
*/
-private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
-    DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
-
-@VisibleForTesting
-static String formatTimestamp(Long timestampMicro) {
-  String dateTime = formatDatetime(timestampMicro);
-  return dateTime + " UTC";
-}
+private static final java.time.format.DateTimeFormatter DATE_TIME_FORMATTER =
+    java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
+        .withZone(java.time.ZoneOffset.UTC);

/** Enum to define the precision of a timestamp since the epoch. */
enum TimestampPrecision {
MILLISECONDS,
MICROSECONDS,
NANOSECONDS;

/** Converts an epoch value of this precision to an Instant. */
java.time.Instant toInstant(long epochValue) {
switch (this) {
case MILLISECONDS:
return java.time.Instant.ofEpochMilli(epochValue);
case MICROSECONDS:
{
long seconds = Math.floorDiv(epochValue, 1_000_000L);
long microsOfSecond = Math.floorMod(epochValue, 1_000_000L);
return java.time.Instant.ofEpochSecond(seconds, microsOfSecond * 1_000L);
}
case NANOSECONDS:
{
long seconds = Math.floorDiv(epochValue, 1_000_000_000L);
long nanosOfSecond = Math.floorMod(epochValue, 1_000_000_000L);
return java.time.Instant.ofEpochSecond(seconds, nanosOfSecond);
}
default:
throw new IllegalStateException("Unknown precision: " + this);
}
}
}
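
// For intuition (illustrative comment, not part of this diff): the same raw long decodes
// to different instants depending on the declared precision.
//   TimestampPrecision.MILLISECONDS.toInstant(1_500L) -> 1970-01-01T00:00:01.500Z
//   TimestampPrecision.MICROSECONDS.toInstant(1_500L) -> 1970-01-01T00:00:00.001500Z
//   TimestampPrecision.NANOSECONDS.toInstant(1_500L)  -> 1970-01-01T00:00:00.000001500Z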

/**
* Formats an Instant with minimal fractional second precision. Shows 0, 3, 6, or 9 decimal places
* based on actual precision of the value.
*/
@VisibleForTesting
-static String formatDatetime(Long timestampMicro) {
-  // timestampMicro is in "microseconds since epoch" format,
-  // e.g., 1452062291123456L means "2016-01-06 06:38:11.123456 UTC".
-  // Separate into seconds and microseconds.
-  long timestampSec = timestampMicro / 1_000_000;
-  long micros = timestampMicro % 1_000_000;
-  if (micros < 0) {
-    micros += 1_000_000;
-    timestampSec -= 1;
-  }
-  String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(timestampSec * 1000);
-  if (micros == 0) {
-    return dayAndTime;
-  } else if (micros % 1000 == 0) {
-    return String.format("%s.%03d", dayAndTime, micros / 1000);
+@SuppressWarnings("JavaInstantGetSecondsGetNano")
+static String formatDatetime(java.time.Instant instant) {
+  String dateTime = DATE_TIME_FORMATTER.format(instant);
+  int nanos = instant.getNano();
+
+  if (nanos == 0) {
+    return dateTime;
+  } else if (nanos % 1_000_000 == 0) {
+    return dateTime + String.format(".%03d", nanos / 1_000_000);
+  } else if (nanos % 1_000 == 0) {
+    return dateTime + String.format(".%06d", nanos / 1_000);
} else {
-  return String.format("%s.%06d", dayAndTime, micros);
+  return dateTime + String.format(".%09d", nanos);
}
}

@VisibleForTesting
static String formatDatetime(long epochValue, TimestampPrecision precision) {
return formatDatetime(precision.toInstant(epochValue));
}

static String formatTimestamp(java.time.Instant instant) {
return formatDatetime(instant) + " UTC";
}

static String formatTimestamp(long epochValue, TimestampPrecision precision) {
return formatTimestamp(precision.toInstant(epochValue));
}
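
// Example outputs (illustrative comment, not part of this diff; the value comes from the
// removed implementation's comment: 1452062291123456L microseconds is
// 2016-01-06 06:38:11.123456 UTC):
//   formatDatetime(1452062291123456L, TimestampPrecision.MICROSECONDS)
//       -> "2016-01-06 06:38:11.123456"
//   formatTimestamp(1452062291123456L, TimestampPrecision.MICROSECONDS)
//       -> "2016-01-06 06:38:11.123456 UTC"
//   formatDatetime(1452062291000000L, TimestampPrecision.MICROSECONDS)
//       -> "2016-01-06 06:38:11" (whole seconds print no fractional digits)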

/**
* This method formats a BigQuery DATE value into a String matching the format used by JSON
* export. Date records are stored in "days since epoch" format, and BigQuery uses the proleptic
@@ -335,7 +370,6 @@ private static Object convertRequiredField(String name, Schema schema, Object v)
// REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
// INTEGER type maps to an Avro LONG type.
checkNotNull(v, "REQUIRED field %s should not be null", name);

Type type = schema.getType();
LogicalType logicalType = schema.getLogicalType();
switch (type) {
@@ -364,21 +398,26 @@ private static Object convertRequiredField(String name, Schema schema, Object v)
} else if (logicalType instanceof LogicalTypes.TimestampMillis) {
// Write only: SQL type TIMESTAMP
// ideally Instant but TableRowJsonCoder encodes as String
-return formatTimestamp((Long) v * 1000L);
+return formatTimestamp((Long) v, TimestampPrecision.MILLISECONDS);
} else if (logicalType instanceof LogicalTypes.TimestampMicros) {
// SQL type TIMESTAMP
// ideally Instant but TableRowJsonCoder encodes as String
-return formatTimestamp((Long) v);
+return formatTimestamp((Long) v, TimestampPrecision.MICROSECONDS);
// TODO: Use LogicalTypes.TimestampNanos once the Avro version is updated.
} else if (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(schema.getProp("logicalType"))) {
// SQL type TIMESTAMP
// ideally Instant but TableRowJsonCoder encodes as String
return formatTimestamp((Long) v, TimestampPrecision.NANOSECONDS);
} else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
&& logicalType instanceof LogicalTypes.LocalTimestampMillis) {
// Write only: SQL type DATETIME
// ideally LocalDateTime but TableRowJsonCoder encodes as String
-return formatDatetime(((Long) v) * 1000);
+return formatDatetime(((Long) v), TimestampPrecision.MILLISECONDS);
} else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
&& logicalType instanceof LogicalTypes.LocalTimestampMicros) {
// Write only: SQL type DATETIME
// ideally LocalDateTime but TableRowJsonCoder encodes as String
-return formatDatetime((Long) v);
+return formatDatetime((Long) v, TimestampPrecision.MICROSECONDS);
} else {
// SQL type INT64 (INT, SMALLINT, INTEGER, BIGINT, TINYINT, BYTEINT)
// ideally Long if in [2^53+1, 2^53-1] but keep consistency with BQ JSON export that uses
@@ -602,6 +641,11 @@ private static TableFieldSchema typedTableFieldSchema(Schema type, Boolean useAvroLogicalTypes) {
return fieldSchema.setType("INTEGER");
}
case LONG:
// TODO: Use LogicalTypes.TimestampNanos once the Avro version is updated.
if (useAvroLogicalTypes
&& (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType")))) {
return fieldSchema.setType("TIMESTAMP");
}
if (logicalType instanceof LogicalTypes.TimeMicros) {
return fieldSchema.setType("TIME");
} else if (!(VERSION_AVRO.startsWith("1.8") || VERSION_AVRO.startsWith("1.9"))
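
Sketch of the new branch's effect (hypothetical field, not part of this diff): an Avro long carrying the raw timestamp-nanos prop now maps to a BigQuery TIMESTAMP column when useAvroLogicalTypes is set, instead of being treated as a plain integer:

org.apache.avro.Schema ts = org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG);
ts.addProp("logicalType", "timestamp-nanos");
// typedTableFieldSchema(ts, true) would return a TableFieldSchema with type "TIMESTAMP".
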