Skip to content

Commit 6fe69fe

Browse files
authored
Fix BeamSQL CalcRel DATETIME expression (#35743)
* Fix BeamSQL CalcRel DATETIME expression * Add nullable array test case * fix checkstyle
1 parent f7e5ee5 commit 6fe69fe

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
import org.codehaus.janino.ScriptEvaluator;
111111
import org.joda.time.DateTime;
112112
import org.joda.time.Instant;
113+
import org.joda.time.base.AbstractInstant;
113114
import org.slf4j.Logger;
114115
import org.slf4j.LoggerFactory;
115116

@@ -632,8 +633,10 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType)
632633
case BOOLEAN:
633634
return Expressions.convert_(value, Boolean.class);
634635
case DATETIME:
636+
// AbstractInstant handles both joda Instant and DateTime
635637
return nullOr(
636-
value, Expressions.call(Expressions.convert_(value, DateTime.class), "getMillis"));
638+
value,
639+
Expressions.call(Expressions.convert_(value, AbstractInstant.class), "getMillis"));
637640
case BYTES:
638641
return nullOr(
639642
value, Expressions.new_(ByteString.class, Expressions.convert_(value, byte[].class)));

sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.time.LocalTime;
2424
import java.util.Arrays;
2525
import java.util.HashMap;
26+
import java.util.List;
2627
import java.util.Map;
2728
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
2829
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -43,6 +44,7 @@
4344
import org.apache.beam.sdk.values.Row;
4445
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
4546
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
47+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
4648
import org.joda.time.Duration;
4749
import org.joda.time.Instant;
4850
import org.junit.Ignore;
@@ -349,6 +351,44 @@ public void testNestedArrayOfBytes() {
349351
pipeline.run();
350352
}
351353

354+
@Test
355+
public void testNestedDatetime() {
356+
List<Instant> dateTimes =
357+
ImmutableList.of(Instant.EPOCH, Instant.ofEpochSecond(10000), Instant.now());
358+
List<Instant> nullDateTimes = Lists.newArrayList(Instant.EPOCH, null, Instant.now());
359+
360+
Schema nestedInputSchema =
361+
Schema.of(
362+
Schema.Field.of("c_dts", Schema.FieldType.array(Schema.FieldType.DATETIME)),
363+
Schema.Field.of(
364+
"c_null_dts",
365+
Schema.FieldType.array(Schema.FieldType.DATETIME.withNullable(true))));
366+
Schema inputSchema =
367+
Schema.of(Schema.Field.of("nested", Schema.FieldType.row(nestedInputSchema)));
368+
369+
Schema outputSchema =
370+
Schema.of(
371+
Schema.Field.of("f0", Schema.FieldType.DATETIME),
372+
Schema.Field.of("f1", Schema.FieldType.DATETIME.withNullable(true)));
373+
374+
Row nestedRow =
375+
Row.withSchema(nestedInputSchema).addValue(dateTimes).addValue(nullDateTimes).build();
376+
Row row = Row.withSchema(inputSchema).addValue(nestedRow).build();
377+
Row expected =
378+
Row.withSchema(outputSchema).addValues(dateTimes.get(1), nullDateTimes.get(1)).build();
379+
380+
PCollection<Row> result =
381+
pipeline
382+
.apply(Create.of(row).withRowSchema(inputSchema))
383+
.apply(
384+
SqlTransform.query(
385+
"SELECT t.nested.c_dts[2], t.nested.c_null_dts[2] AS f0 FROM PCOLLECTION t"));
386+
387+
PAssert.that(result).containsInAnyOrder(expected);
388+
389+
pipeline.run();
390+
}
391+
352392
@Test
353393
public void testRowConstructor() {
354394
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);

0 commit comments

Comments
 (0)