diff --git a/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java b/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java index e0dcedc47faf..3e22bad6a415 100644 --- a/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java +++ b/sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java @@ -45,6 +45,7 @@ import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.DateTime; @@ -207,6 +208,8 @@ public FieldType visit(ArrowType.Timestamp type) { if (type.getUnit() == TimeUnit.MILLISECOND || type.getUnit() == TimeUnit.MICROSECOND) { return FieldType.DATETIME; + } else if (type.getUnit() == TimeUnit.NANOSECOND) { + return FieldType.logicalType(Timestamp.NANOS); } else { throw new IllegalArgumentException( "Unsupported timestamp unit: " + type.getUnit().name()); @@ -456,6 +459,9 @@ public Optional> visit(ArrowType.Time type) { @Override public Optional> visit(ArrowType.Timestamp type) { + // Arrow timestamp semantics: + // - With timezone: epoch is always UTC, timezone is display metadata + // - Without timezone: epoch is in an unknown timezone ("naive" wall-clock time) DateTimeZone tz; try { tz = DateTimeZone.forID(type.getTimezone()); @@ -463,14 +469,22 @@ public Optional> visit(ArrowType.Timestamp type) { throw new IllegalArgumentException( "Encountered unrecognized Timezone: " + type.getTimezone()); } - switch (type.getUnit()) { - case MICROSECOND: - return Optional.of((epochMicros) -> new DateTime((long) epochMicros / 1000, tz)); - case MILLISECOND: - return Optional.of((epochMills) -> new DateTime((long) epochMills, tz)); - default: - throw new AssertionError("Encountered unrecognized TimeUnit: " + type.getUnit()); - } + + return Optional.of( + epoch -> { + switch (type.getUnit()) { + case MILLISECOND: + return new DateTime((long) epoch, tz); + case MICROSECOND: + return new DateTime(Math.floorDiv((long) epoch, 1000L), tz); + case NANOSECOND: + long seconds = Math.floorDiv((long) epoch, 1_000_000_000L); + long nanoAdjustment = Math.floorMod((long) epoch, 1_000_000_000L); + return java.time.Instant.ofEpochSecond(seconds, nanoAdjustment); + default: + throw new AssertionError("Encountered unrecognized TimeUnit: " + type.getUnit()); + } + }); } @Override diff --git a/sdks/java/extensions/arrow/src/test/java/org/apache/beam/sdk/extensions/arrow/ArrowConversionTest.java b/sdks/java/extensions/arrow/src/test/java/org/apache/beam/sdk/extensions/arrow/ArrowConversionTest.java index 3af72ef5579c..8c2975766507 100644 --- a/sdks/java/extensions/arrow/src/test/java/org/apache/beam/sdk/extensions/arrow/ArrowConversionTest.java +++ b/sdks/java/extensions/arrow/src/test/java/org/apache/beam/sdk/extensions/arrow/ArrowConversionTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import java.time.Instant; import java.util.ArrayList; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; @@ -30,6 +31,7 @@ import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.TimeStampMicroTZVector; import org.apache.arrow.vector.TimeStampMilliTZVector; +import org.apache.arrow.vector.TimeStampNanoTZVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.ListVector; @@ -40,6 +42,7 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.hamcrest.collection.IsIterableContainingInOrder; @@ -95,7 +98,8 @@ public void rowIterator() { new ArrowType.List(), field("int32s", new ArrowType.Int(32, true))), field("boolean", new ArrowType.Bool()), - field("fixed_size_binary", new ArrowType.FixedSizeBinary(3)))); + field("fixed_size_binary", new ArrowType.FixedSizeBinary(3)), + field("timestampNanoUTC", new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC")))); Schema beamSchema = ArrowConversion.ArrowSchemaTranslator.toBeamSchema(schema); @@ -109,6 +113,9 @@ public void rowIterator() { (TimeStampMicroTZVector) expectedSchemaRoot.getFieldVectors().get(3); TimeStampMilliTZVector timeStampMilliTZVector = (TimeStampMilliTZVector) expectedSchemaRoot.getFieldVectors().get(4); + TimeStampNanoTZVector timestampNanoUtcVector = + (TimeStampNanoTZVector) expectedSchemaRoot.getFieldVectors().get(8); + ListVector int32ListVector = (ListVector) expectedSchemaRoot.getFieldVectors().get(5); IntVector int32ListElementVector = int32ListVector @@ -123,6 +130,10 @@ public void rowIterator() { ArrayList expectedRows = new ArrayList<>(); for (int i = 0; i < 16; i++) { DateTime dt = new DateTime(2019, 1, i + 1, i, i, i, DateTimeZone.UTC); + Instant instantNano = + Instant.ofEpochSecond( + dt.getMillis() / 1000, + (dt.getMillis() % 1000) * 1_000_000L + (1_000_000000L - 1 - i)); expectedRows.add( Row.withSchema(beamSchema) .addValues( @@ -133,7 +144,8 @@ public void rowIterator() { dt, ImmutableList.of(i), (i % 2) != 0, - new byte[] {(byte) i, (byte) (i + 1), (byte) (i + 2)}) + new byte[] {(byte) i, (byte) (i + 1), (byte) (i + 2)}, + instantNano) .build()); intVector.set(i, i); @@ -141,6 +153,7 @@ public void rowIterator() { strVector.set(i, new Text("" + i)); timestampMicroUtcVector.set(i, dt.getMillis() * 1000); timeStampMilliTZVector.set(i, dt.getMillis()); + timestampNanoUtcVector.set(i, dt.getMillis() * 1_000_000L + (1_000_000000L - 1 - i)); int32ListVector.startNewValue(i); int32ListElementVector.set(i, i); int32ListVector.endValue(i, 1); @@ -158,6 +171,23 @@ public void rowIterator() { expectedSchemaRoot.close(); } + @Test + public void toBeamSchema_convertsTimestampTypes() { + org.apache.arrow.vector.types.pojo.Schema arrowSchema = + new org.apache.arrow.vector.types.pojo.Schema( + ImmutableList.of( + field("ts_milli", new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")), + field("ts_micro", new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")), + field("ts_nano", new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC")))); + + Schema beamSchema = ArrowConversion.ArrowSchemaTranslator.toBeamSchema(arrowSchema); + + assertThat(beamSchema.getField("ts_milli").getType(), equalTo(FieldType.DATETIME)); + assertThat(beamSchema.getField("ts_micro").getType(), equalTo(FieldType.DATETIME)); + assertThat( + beamSchema.getField("ts_nano").getType(), equalTo(FieldType.logicalType(Timestamp.NANOS))); + } + private static org.apache.arrow.vector.types.pojo.Field field( String name, boolean nullable,