Skip to content

Commit 6fbdaa2

Browse files
(fix #34038) Use CivilTimeEncoder to encode Time values in AvroGenericRecordToStorageApiProto (#34059)
* (fix #34038) Use CivilTimeEncoder to encode Time values in AvroGenericRecordToStorageApiProto * Update CHANGES * Don't throw away precision
1 parent 9d92dd7 commit 6fbdaa2

File tree

3 files changed

+23
-11
lines changed

3 files changed

+23
-11
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
## Bugfixes
8585

8686
* (Python) Fixed occasional pipeline stuckness that was affecting Python 3.11 users ([#33966](https://github.com/apache/beam/issues/33966)).
87+
* (Java) Fixed TIME field encodings for BigQuery Storage API writes on GenericRecords ([#34059](https://github.com/apache/beam/pull/34059)).
8788

8889
## Security Fixes
8990
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Map;
2929
import java.util.Optional;
3030
import java.util.UUID;
31+
import java.util.concurrent.TimeUnit;
3132
import java.util.function.BiFunction;
3233
import java.util.function.Function;
3334
import java.util.stream.Collectors;
@@ -174,19 +175,21 @@ static Integer convertDate(Object value) {
174175

175176
static Long convertTime(Object value, boolean micros) {
176177
if (value instanceof org.joda.time.LocalTime) {
177-
return 1_000L * (long) ((org.joda.time.LocalTime) value).getMillisOfDay();
178+
return CivilTimeEncoder.encodePacked64TimeMicros((org.joda.time.LocalTime) value);
178179
} else if (value instanceof java.time.LocalTime) {
179-
return java.util.concurrent.TimeUnit.NANOSECONDS.toMicros(
180-
((java.time.LocalTime) value).toNanoOfDay());
180+
return CivilTimeEncoder.encodePacked64TimeMicros((java.time.LocalTime) value);
181181
} else {
182182
if (micros) {
183183
Preconditions.checkArgument(
184184
value instanceof Long, "Expecting a value as Long type (time).");
185-
return (Long) value;
185+
return CivilTimeEncoder.encodePacked64TimeMicros(
186+
java.time.LocalTime.ofNanoOfDay((TimeUnit.MICROSECONDS.toNanos((long) value))));
186187
} else {
187188
Preconditions.checkArgument(
188189
value instanceof Integer, "Expecting a value as Integer type (time).");
189-
return 1_000L * (Integer) value;
190+
return CivilTimeEncoder.encodePacked64TimeMicros(
191+
java.time.LocalTime.ofNanoOfDay(
192+
(TimeUnit.MILLISECONDS).toNanos(((Integer) value).longValue())));
190193
}
191194
}
192195
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.List;
4141
import java.util.Map;
4242
import java.util.UUID;
43+
import java.util.concurrent.TimeUnit;
4344
import java.util.stream.Collectors;
4445
import org.apache.avro.Conversions;
4546
import org.apache.avro.LogicalTypes;
@@ -400,13 +401,14 @@ enum TestEnum {
400401
.set("uuidValue", uuid.toString())
401402
.build();
402403

404+
org.joda.time.LocalTime localTime = org.joda.time.LocalTime.fromMillisOfDay(42_000L);
403405
jodaTimeLogicalTypesRecord =
404406
new GenericRecordBuilder(LOGICAL_TYPES_SCHEMA)
405407
.set("numericValue", numeric)
406408
.set("bigNumericValue", bigNumeric)
407409
.set("dateValue", new org.joda.time.LocalDate(1970, 1, 1).plusDays(42))
408-
.set("timeMicrosValue", org.joda.time.LocalTime.fromMillisOfDay(42_000L))
409-
.set("timeMillisValue", org.joda.time.LocalTime.fromMillisOfDay(42_000L))
410+
.set("timeMicrosValue", localTime)
411+
.set("timeMillisValue", localTime)
410412
.set("timestampMicrosValue", org.joda.time.Instant.ofEpochSecond(42L))
411413
.set("timestampMillisValue", org.joda.time.Instant.ofEpochSecond(42L))
412414
.set(
@@ -423,8 +425,14 @@ enum TestEnum {
423425
.set("numericValue", numeric)
424426
.set("bigNumericValue", bigNumeric)
425427
.set("dateValue", java.time.LocalDate.ofEpochDay(42L))
426-
.set("timeMicrosValue", java.time.LocalTime.ofSecondOfDay(42L))
427-
.set("timeMillisValue", java.time.LocalTime.ofSecondOfDay(42L))
428+
.set(
429+
"timeMicrosValue",
430+
java.time.LocalTime.ofNanoOfDay(
431+
TimeUnit.MILLISECONDS.toNanos(localTime.getMillisOfDay())))
432+
.set(
433+
"timeMillisValue",
434+
java.time.LocalTime.ofNanoOfDay(
435+
TimeUnit.MILLISECONDS.toNanos(localTime.getMillisOfDay())))
428436
.set("timestampMicrosValue", java.time.Instant.ofEpochSecond(42L))
429437
.set("timestampMillisValue", java.time.Instant.ofEpochSecond(42L))
430438
.set(
@@ -456,8 +464,8 @@ enum TestEnum {
456464
.put("numericvalue", numericBytes)
457465
.put("bignumericvalue", bigNumericBytes)
458466
.put("datevalue", 42)
459-
.put("timemicrosvalue", 42_000_000L)
460-
.put("timemillisvalue", 42_000_000L)
467+
.put("timemicrosvalue", CivilTimeEncoder.encodePacked64TimeMicros(localTime))
468+
.put("timemillisvalue", CivilTimeEncoder.encodePacked64TimeMicros(localTime))
461469
.put("timestampmicrosvalue", 42_000_000L)
462470
.put("timestampmillisvalue", 42_000_000L)
463471
.put("localtimestampmicrosvalue", 42_000_000L)

0 commit comments

Comments
 (0)