Skip to content

Commit bf19899

Browse files
authored
Add timestamp logicaltype (#36705)
* Add arbitrary precision timestamp logical type. * Add arbitrary precision timestamp logical type.
1 parent d1327b7 commit bf19899

File tree

2 files changed

+433
-0
lines changed

2 files changed

+433
-0
lines changed
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.schemas.logicaltypes;
19+
20+
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
21+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
22+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
23+
24+
import java.time.Instant;
25+
import org.apache.beam.sdk.schemas.Schema;
26+
import org.apache.beam.sdk.values.Row;
27+
import org.checkerframework.checker.nullness.qual.NonNull;
28+
29+
/**
30+
* A timestamp represented with configurable precision.
31+
*
32+
* <p>This logical type stores timestamps as a Row with two fields:
33+
*
34+
* <ul>
35+
* <li>seconds: INT64 - seconds since Unix epoch (can be negative)
36+
* <li>subseconds: INT16 or INT32 - always non-negative (0 to 10^precision - 1)
37+
* </ul>
38+
*
39+
* <p>The subseconds field is always non-negative, even for timestamps before the epoch. For
40+
* example, -1.5 seconds is represented as {seconds: -2, subseconds: 500000} for microsecond
41+
* precision. This matches Java's {@link java.time.Instant} internal representation.
42+
*
43+
* <p><b>Note for users converting from single-integer timestamp representations:</b> If you have
44+
* timestamps stored as a single long value (e.g., microseconds since epoch), you must handle
45+
* negative modulo correctly when converting:
46+
*
47+
* <pre>{@code
48+
* long timestampMicros = -1_500_000;
49+
* long seconds = timestampMicros / 1_000_000;
50+
* long micros = timestampMicros % 1_000_000;
51+
* if (micros < 0) {
52+
* micros += 1_000_000;
53+
* seconds -= 1;
54+
* }
55+
* Instant instant = Instant.ofEpochSecond(seconds, micros * 1000);
56+
* }</pre>
57+
*/
58+
public class Timestamp implements Schema.LogicalType<Instant, Row> {
59+
public static final String IDENTIFIER = "beam:logical_type:timestamp:v1";
60+
static final int MIN_PRECISION = 0;
61+
static final int MAX_PRECISION = 9;
62+
63+
private final int precision;
64+
private final int scalingFactor;
65+
private final Schema timestampSchema;
66+
67+
public static Timestamp of(int precision) {
68+
return new Timestamp(precision);
69+
}
70+
71+
public static final Timestamp MILLIS = Timestamp.of(3);
72+
public static final Timestamp MICROS = Timestamp.of(6);
73+
public static final Timestamp NANOS = Timestamp.of(9);
74+
75+
public Timestamp(int precision) {
76+
checkArgument(
77+
precision <= MAX_PRECISION && precision >= MIN_PRECISION,
78+
"Timestamp precision must be between %s and %s (inclusive), but was %s.",
79+
MIN_PRECISION,
80+
MAX_PRECISION,
81+
precision);
82+
this.precision = precision;
83+
this.scalingFactor = (int) Math.pow(10, MAX_PRECISION - precision);
84+
if (precision < 5) {
85+
this.timestampSchema =
86+
Schema.builder().addInt64Field("seconds").addInt16Field("subseconds").build();
87+
} else {
88+
this.timestampSchema =
89+
Schema.builder().addInt64Field("seconds").addInt32Field("subseconds").build();
90+
}
91+
}
92+
93+
@Override
94+
public String getIdentifier() {
95+
return IDENTIFIER;
96+
}
97+
98+
@Override
99+
public Schema.FieldType getArgumentType() {
100+
return Schema.FieldType.INT32;
101+
}
102+
103+
@Override
104+
public Integer getArgument() {
105+
return precision;
106+
}
107+
108+
@Override
109+
public Schema.FieldType getBaseType() {
110+
return Schema.FieldType.row(timestampSchema);
111+
}
112+
113+
@Override
114+
public Row toBaseType(Instant input) {
115+
// Avoid silent data loss
116+
checkState(
117+
input.getNano() % scalingFactor == 0,
118+
"Timestamp logical type was configured with precision %s, but encountered "
119+
+ "a Java Instant with %s nanoseconds (not evenly divisible by scaling factor %s).",
120+
precision,
121+
input.getNano(),
122+
scalingFactor);
123+
124+
int subseconds = input.getNano() / scalingFactor;
125+
126+
Row.Builder rowBuilder = Row.withSchema(timestampSchema).addValue(input.getEpochSecond());
127+
if (precision < 5) {
128+
rowBuilder.addValue((short) subseconds); // Explicitly add as short
129+
} else {
130+
rowBuilder.addValue(subseconds); // Add as int
131+
}
132+
return rowBuilder.build();
133+
}
134+
135+
@Override
136+
public Instant toInputType(@NonNull Row base) {
137+
long subseconds =
138+
(precision < 5)
139+
? checkArgumentNotNull(
140+
base.getInt16(1),
141+
"While trying to convert to Instant: Row missing subseconds field")
142+
: checkArgumentNotNull(
143+
base.getInt32(1),
144+
"While trying to convert to Instant: Row missing subseconds field");
145+
146+
checkArgument(
147+
subseconds >= 0,
148+
"While trying to convert to Instant: subseconds field must be non-negative, "
149+
+ "but was %s. This likely indicates data corruption.",
150+
subseconds);
151+
152+
int maxSubseconds = (int) (Math.pow(10, precision) - 1);
153+
checkArgument(
154+
subseconds <= maxSubseconds,
155+
"While trying to convert to Instant: subseconds field must be <= %s for precision %s, "
156+
+ "but was %s. This likely indicates data corruption or precision mismatch.",
157+
maxSubseconds,
158+
precision,
159+
subseconds);
160+
161+
return Instant.ofEpochSecond(
162+
checkArgumentNotNull(
163+
base.getInt64(0), "While trying to convert to Instant: Row missing seconds field"),
164+
subseconds * scalingFactor);
165+
}
166+
}

0 commit comments

Comments
 (0)