Skip to content

Commit 712172d

Browse files
claudevdm and Claude authored
Add arrow conversion support for timestamp nanoseconds logical type. (#36960)
* Support arrow nanoseconds.
* Comments.
---------
Co-authored-by: Claude <[email protected]>
1 parent c9d2e60 commit 712172d

File tree

2 files changed

+54
-10
lines changed

2 files changed

+54
-10
lines changed

sdks/java/extensions/arrow/src/main/java/org/apache/beam/sdk/extensions/arrow/ArrowConversion.java

Lines changed: 22 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@
4545
import org.apache.beam.sdk.schemas.Schema.Field;
4646
import org.apache.beam.sdk.schemas.Schema.FieldType;
4747
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
48+
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
4849
import org.apache.beam.sdk.values.Row;
4950
import org.apache.beam.sdk.values.TypeDescriptor;
5051
import org.joda.time.DateTime;
@@ -207,6 +208,8 @@ public FieldType visit(ArrowType.Timestamp type) {
207208
if (type.getUnit() == TimeUnit.MILLISECOND
208209
|| type.getUnit() == TimeUnit.MICROSECOND) {
209210
return FieldType.DATETIME;
211+
} else if (type.getUnit() == TimeUnit.NANOSECOND) {
212+
return FieldType.logicalType(Timestamp.NANOS);
210213
} else {
211214
throw new IllegalArgumentException(
212215
"Unsupported timestamp unit: " + type.getUnit().name());
@@ -456,21 +459,32 @@ public Optional<Function<Object, Object>> visit(ArrowType.Time type) {
456459

457460
@Override
458461
public Optional<Function<Object, Object>> visit(ArrowType.Timestamp type) {
462+
// Arrow timestamp semantics:
463+
// - With timezone: epoch is always UTC, timezone is display metadata
464+
// - Without timezone: epoch is in an unknown timezone ("naive" wall-clock time)
459465
DateTimeZone tz;
460466
try {
461467
tz = DateTimeZone.forID(type.getTimezone());
462468
} catch (Exception e) {
463469
throw new IllegalArgumentException(
464470
"Encountered unrecognized Timezone: " + type.getTimezone());
465471
}
466-
switch (type.getUnit()) {
467-
case MICROSECOND:
468-
return Optional.of((epochMicros) -> new DateTime((long) epochMicros / 1000, tz));
469-
case MILLISECOND:
470-
return Optional.of((epochMills) -> new DateTime((long) epochMills, tz));
471-
default:
472-
throw new AssertionError("Encountered unrecognized TimeUnit: " + type.getUnit());
473-
}
472+
473+
return Optional.of(
474+
epoch -> {
475+
switch (type.getUnit()) {
476+
case MILLISECOND:
477+
return new DateTime((long) epoch, tz);
478+
case MICROSECOND:
479+
return new DateTime(Math.floorDiv((long) epoch, 1000L), tz);
480+
case NANOSECOND:
481+
long seconds = Math.floorDiv((long) epoch, 1_000_000_000L);
482+
long nanoAdjustment = Math.floorMod((long) epoch, 1_000_000_000L);
483+
return java.time.Instant.ofEpochSecond(seconds, nanoAdjustment);
484+
default:
485+
throw new AssertionError("Encountered unrecognized TimeUnit: " + type.getUnit());
486+
}
487+
});
474488
}
475489

476490
@Override

sdks/java/extensions/arrow/src/test/java/org/apache/beam/sdk/extensions/arrow/ArrowConversionTest.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.hamcrest.MatcherAssert.assertThat;
2222
import static org.hamcrest.Matchers.equalTo;
2323

24+
import java.time.Instant;
2425
import java.util.ArrayList;
2526
import org.apache.arrow.memory.BufferAllocator;
2627
import org.apache.arrow.memory.RootAllocator;
@@ -30,6 +31,7 @@
3031
import org.apache.arrow.vector.IntVector;
3132
import org.apache.arrow.vector.TimeStampMicroTZVector;
3233
import org.apache.arrow.vector.TimeStampMilliTZVector;
34+
import org.apache.arrow.vector.TimeStampNanoTZVector;
3335
import org.apache.arrow.vector.VarCharVector;
3436
import org.apache.arrow.vector.VectorSchemaRoot;
3537
import org.apache.arrow.vector.complex.ListVector;
@@ -40,6 +42,7 @@
4042
import org.apache.beam.sdk.schemas.Schema;
4143
import org.apache.beam.sdk.schemas.Schema.Field;
4244
import org.apache.beam.sdk.schemas.Schema.FieldType;
45+
import org.apache.beam.sdk.schemas.logicaltypes.Timestamp;
4346
import org.apache.beam.sdk.values.Row;
4447
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
4548
import org.hamcrest.collection.IsIterableContainingInOrder;
@@ -95,7 +98,8 @@ public void rowIterator() {
9598
new ArrowType.List(),
9699
field("int32s", new ArrowType.Int(32, true))),
97100
field("boolean", new ArrowType.Bool()),
98-
field("fixed_size_binary", new ArrowType.FixedSizeBinary(3))));
101+
field("fixed_size_binary", new ArrowType.FixedSizeBinary(3)),
102+
field("timestampNanoUTC", new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"))));
99103

100104
Schema beamSchema = ArrowConversion.ArrowSchemaTranslator.toBeamSchema(schema);
101105

@@ -109,6 +113,9 @@ public void rowIterator() {
109113
(TimeStampMicroTZVector) expectedSchemaRoot.getFieldVectors().get(3);
110114
TimeStampMilliTZVector timeStampMilliTZVector =
111115
(TimeStampMilliTZVector) expectedSchemaRoot.getFieldVectors().get(4);
116+
TimeStampNanoTZVector timestampNanoUtcVector =
117+
(TimeStampNanoTZVector) expectedSchemaRoot.getFieldVectors().get(8);
118+
112119
ListVector int32ListVector = (ListVector) expectedSchemaRoot.getFieldVectors().get(5);
113120
IntVector int32ListElementVector =
114121
int32ListVector
@@ -123,6 +130,10 @@ public void rowIterator() {
123130
ArrayList<Row> expectedRows = new ArrayList<>();
124131
for (int i = 0; i < 16; i++) {
125132
DateTime dt = new DateTime(2019, 1, i + 1, i, i, i, DateTimeZone.UTC);
133+
Instant instantNano =
134+
Instant.ofEpochSecond(
135+
dt.getMillis() / 1000,
136+
(dt.getMillis() % 1000) * 1_000_000L + (1_000_000000L - 1 - i));
126137
expectedRows.add(
127138
Row.withSchema(beamSchema)
128139
.addValues(
@@ -133,14 +144,16 @@ public void rowIterator() {
133144
dt,
134145
ImmutableList.of(i),
135146
(i % 2) != 0,
136-
new byte[] {(byte) i, (byte) (i + 1), (byte) (i + 2)})
147+
new byte[] {(byte) i, (byte) (i + 1), (byte) (i + 2)},
148+
instantNano)
137149
.build());
138150

139151
intVector.set(i, i);
140152
floatVector.set(i, i + .1 * i);
141153
strVector.set(i, new Text("" + i));
142154
timestampMicroUtcVector.set(i, dt.getMillis() * 1000);
143155
timeStampMilliTZVector.set(i, dt.getMillis());
156+
timestampNanoUtcVector.set(i, dt.getMillis() * 1_000_000L + (1_000_000000L - 1 - i));
144157
int32ListVector.startNewValue(i);
145158
int32ListElementVector.set(i, i);
146159
int32ListVector.endValue(i, 1);
@@ -158,6 +171,23 @@ public void rowIterator() {
158171
expectedSchemaRoot.close();
159172
}
160173

174+
@Test
175+
public void toBeamSchema_convertsTimestampTypes() {
176+
org.apache.arrow.vector.types.pojo.Schema arrowSchema =
177+
new org.apache.arrow.vector.types.pojo.Schema(
178+
ImmutableList.of(
179+
field("ts_milli", new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC")),
180+
field("ts_micro", new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC")),
181+
field("ts_nano", new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"))));
182+
183+
Schema beamSchema = ArrowConversion.ArrowSchemaTranslator.toBeamSchema(arrowSchema);
184+
185+
assertThat(beamSchema.getField("ts_milli").getType(), equalTo(FieldType.DATETIME));
186+
assertThat(beamSchema.getField("ts_micro").getType(), equalTo(FieldType.DATETIME));
187+
assertThat(
188+
beamSchema.getField("ts_nano").getType(), equalTo(FieldType.logicalType(Timestamp.NANOS)));
189+
}
190+
161191
private static org.apache.arrow.vector.types.pojo.Field field(
162192
String name,
163193
boolean nullable,

0 commit comments

Comments
 (0)