Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -207,6 +208,8 @@ public FieldType visit(ArrowType.Timestamp type) {
if (type.getUnit() == TimeUnit.MILLISECOND
|| type.getUnit() == TimeUnit.MICROSECOND) {
return FieldType.DATETIME;
} else if (type.getUnit() == TimeUnit.NANOSECOND) {
return FieldType.logicalType(Timestamp.NANOS);
} else {
throw new IllegalArgumentException(
"Unsupported timestamp unit: " + type.getUnit().name());
Expand Down Expand Up @@ -456,21 +459,32 @@ public Optional<Function<Object, Object>> visit(ArrowType.Time type) {

@Override
public Optional<Function<Object, Object>> visit(ArrowType.Timestamp type) {
// Arrow timestamp semantics:
// - With timezone: epoch is always UTC, timezone is display metadata
// - Without timezone: epoch is in an unknown timezone ("naive" wall-clock time)
DateTimeZone tz;
try {
tz = DateTimeZone.forID(type.getTimezone());
} catch (Exception e) {
throw new IllegalArgumentException(
"Encountered unrecognized Timezone: " + type.getTimezone());
}
switch (type.getUnit()) {
case MICROSECOND:
return Optional.of((epochMicros) -> new DateTime((long) epochMicros / 1000, tz));
case MILLISECOND:
return Optional.of((epochMills) -> new DateTime((long) epochMills, tz));
default:
throw new AssertionError("Encountered unrecognized TimeUnit: " + type.getUnit());
}

return Optional.of(
epoch -> {
switch (type.getUnit()) {
case MILLISECOND:
return new DateTime((long) epoch, tz);
case MICROSECOND:
return new DateTime(Math.floorDiv((long) epoch, 1000L), tz);
case NANOSECOND:
long seconds = Math.floorDiv((long) epoch, 1_000_000_000L);
long nanoAdjustment = Math.floorMod((long) epoch, 1_000_000_000L);
return java.time.Instant.ofEpochSecond(seconds, nanoAdjustment);
default:
throw new AssertionError("Encountered unrecognized TimeUnit: " + type.getUnit());
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import java.time.Instant;
import java.util.ArrayList;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
Expand All @@ -30,6 +31,7 @@
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMilliTZVector;
import org.apache.arrow.vector.TimeStampNanoTZVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
Expand All @@ -40,6 +42,7 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.collection.IsIterableContainingInOrder;
Expand Down Expand Up @@ -95,7 +98,8 @@ public void rowIterator() {
new ArrowType.List(),
field("int32s", new ArrowType.Int(32, true))),
field("boolean", new ArrowType.Bool()),
field("fixed_size_binary", new ArrowType.FixedSizeBinary(3))));
field("fixed_size_binary", new ArrowType.FixedSizeBinary(3)),
field("timestampNanoUTC", new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"))));

Schema beamSchema = ArrowConversion.ArrowSchemaTranslator.toBeamSchema(schema);

Expand All @@ -109,6 +113,9 @@ public void rowIterator() {
(TimeStampMicroTZVector) expectedSchemaRoot.getFieldVectors().get(3);
TimeStampMilliTZVector timeStampMilliTZVector =
(TimeStampMilliTZVector) expectedSchemaRoot.getFieldVectors().get(4);
TimeStampNanoTZVector timestampNanoUtcVector =
(TimeStampNanoTZVector) expectedSchemaRoot.getFieldVectors().get(8);

ListVector int32ListVector = (ListVector) expectedSchemaRoot.getFieldVectors().get(5);
IntVector int32ListElementVector =
int32ListVector
Expand All @@ -123,6 +130,10 @@ public void rowIterator() {
ArrayList<Row> expectedRows = new ArrayList<>();
for (int i = 0; i < 16; i++) {
DateTime dt = new DateTime(2019, 1, i + 1, i, i, i, DateTimeZone.UTC);
Instant instantNano =
Instant.ofEpochSecond(
dt.getMillis() / 1000,
(dt.getMillis() % 1000) * 1_000_000L + (1_000_000000L - 1 - i));
expectedRows.add(
Row.withSchema(beamSchema)
.addValues(
Expand All @@ -133,14 +144,16 @@ public void rowIterator() {
dt,
ImmutableList.of(i),
(i % 2) != 0,
new byte[] {(byte) i, (byte) (i + 1), (byte) (i + 2)})
new byte[] {(byte) i, (byte) (i + 1), (byte) (i + 2)},
instantNano)
.build());

intVector.set(i, i);
floatVector.set(i, i + .1 * i);
strVector.set(i, new Text("" + i));
timestampMicroUtcVector.set(i, dt.getMillis() * 1000);
timeStampMilliTZVector.set(i, dt.getMillis());
timestampNanoUtcVector.set(i, dt.getMillis() * 1_000_000L + (1_000_000000L - 1 - i));
int32ListVector.startNewValue(i);
int32ListElementVector.set(i, i);
int32ListVector.endValue(i, 1);
Expand All @@ -158,6 +171,23 @@ public void rowIterator() {
expectedSchemaRoot.close();
}

@Test
public void toBeamSchema_convertsTimestampTypes() {
org.apache.arrow.vector.types.pojo.Schema arrowSchema =
new org.apache.arrow.vector.types.pojo.Schema(
ImmutableList.of(
field("ts_milli", new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")),
field("ts_micro", new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")),
field("ts_nano", new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"))));

Schema beamSchema = ArrowConversion.ArrowSchemaTranslator.toBeamSchema(arrowSchema);

assertThat(beamSchema.getField("ts_milli").getType(), equalTo(FieldType.DATETIME));
assertThat(beamSchema.getField("ts_micro").getType(), equalTo(FieldType.DATETIME));
assertThat(
beamSchema.getField("ts_nano").getType(), equalTo(FieldType.logicalType(Timestamp.NANOS)));
}

private static org.apache.arrow.vector.types.pojo.Field field(
String name,
boolean nullable,
Expand Down
Loading