|
19 | 19 | package org.apache.paimon.predicate; |
20 | 20 |
|
21 | 21 | import org.apache.paimon.annotation.Public; |
| 22 | +import org.apache.paimon.data.BinaryString; |
| 23 | +import org.apache.paimon.data.Decimal; |
| 24 | +import org.apache.paimon.data.Timestamp; |
22 | 25 | import org.apache.paimon.types.DataField; |
| 26 | +import org.apache.paimon.types.DataType; |
| 27 | +import org.apache.paimon.types.DecimalType; |
23 | 28 | import org.apache.paimon.types.RowType; |
24 | 29 | import org.apache.paimon.utils.Pair; |
25 | 30 | import org.apache.paimon.utils.Preconditions; |
26 | 31 |
|
27 | 32 | import javax.annotation.Nullable; |
28 | 33 |
|
| 34 | +import java.math.BigDecimal; |
| 35 | +import java.sql.Date; |
| 36 | +import java.time.Instant; |
| 37 | +import java.time.LocalDate; |
| 38 | +import java.time.LocalDateTime; |
| 39 | +import java.time.LocalTime; |
| 40 | +import java.time.ZoneId; |
| 41 | +import java.time.ZoneOffset; |
| 42 | +import java.time.temporal.ChronoUnit; |
29 | 43 | import java.util.ArrayList; |
30 | 44 | import java.util.Arrays; |
31 | 45 | import java.util.Collections; |
32 | 46 | import java.util.List; |
| 47 | +import java.util.Map; |
33 | 48 | import java.util.Objects; |
34 | 49 | import java.util.Optional; |
35 | 50 | import java.util.Set; |
36 | 51 | import java.util.stream.Collectors; |
37 | 52 |
|
38 | 53 | import static java.util.Collections.singletonList; |
| 54 | +import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal; |
39 | 55 |
|
40 | 56 | /** |
41 | 57 | * A utility class to create {@link Predicate} object for common filter conditions. |
@@ -302,6 +318,98 @@ private static void splitCompound( |
302 | 318 | } |
303 | 319 | } |
304 | 320 |
|
| 321 | + public static Object convertJavaObject(DataType literalType, Object o) { |
| 322 | + if (o == null) { |
| 323 | + return null; |
| 324 | + } |
| 325 | + switch (literalType.getTypeRoot()) { |
| 326 | + case BOOLEAN: |
| 327 | + return o; |
| 328 | + case BIGINT: |
| 329 | + return ((Number) o).longValue(); |
| 330 | + case DOUBLE: |
| 331 | + return ((Number) o).doubleValue(); |
| 332 | + case TINYINT: |
| 333 | + return ((Number) o).byteValue(); |
| 334 | + case SMALLINT: |
| 335 | + return ((Number) o).shortValue(); |
| 336 | + case INTEGER: |
| 337 | + return ((Number) o).intValue(); |
| 338 | + case FLOAT: |
| 339 | + return ((Number) o).floatValue(); |
| 340 | + case CHAR: |
| 341 | + case VARCHAR: |
| 342 | + return BinaryString.fromString(o.toString()); |
| 343 | + case DATE: |
| 344 | + // Hive uses `java.sql.Date.valueOf(lit.toString());` to convert a literal to Date |
| 345 | + // Which uses `java.util.Date()` internally to create the object and that uses the |
| 346 | + // TimeZone.getDefaultRef() |
| 347 | + // To get back the expected date we have to use the LocalDate which gets rid of the |
| 348 | + // TimeZone misery as it uses the year/month/day to generate the object |
| 349 | + LocalDate localDate; |
| 350 | + if (o instanceof java.sql.Timestamp) { |
| 351 | + localDate = ((java.sql.Timestamp) o).toLocalDateTime().toLocalDate(); |
| 352 | + } else if (o instanceof Date) { |
| 353 | + localDate = ((Date) o).toLocalDate(); |
| 354 | + } else if (o instanceof LocalDate) { |
| 355 | + localDate = (LocalDate) o; |
| 356 | + } else { |
| 357 | + throw new UnsupportedOperationException( |
| 358 | + "Unexpected date literal of class " + o.getClass().getName()); |
| 359 | + } |
| 360 | + LocalDate epochDay = |
| 361 | + Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC).toLocalDate(); |
| 362 | + return (int) ChronoUnit.DAYS.between(epochDay, localDate); |
| 363 | + case TIME_WITHOUT_TIME_ZONE: |
| 364 | + LocalTime localTime; |
| 365 | + if (o instanceof java.sql.Time) { |
| 366 | + localTime = ((java.sql.Time) o).toLocalTime(); |
| 367 | + } else if (o instanceof java.time.LocalTime) { |
| 368 | + localTime = (java.time.LocalTime) o; |
| 369 | + } else { |
| 370 | + throw new UnsupportedOperationException( |
| 371 | + "Unexpected time literal of class " + o.getClass().getName()); |
| 372 | + } |
| 373 | + // return millis of a day |
| 374 | + return (int) (localTime.toNanoOfDay() / 1_000_000); |
| 375 | + case DECIMAL: |
| 376 | + DecimalType decimalType = (DecimalType) literalType; |
| 377 | + int precision = decimalType.getPrecision(); |
| 378 | + int scale = decimalType.getScale(); |
| 379 | + return Decimal.fromBigDecimal((BigDecimal) o, precision, scale); |
| 380 | + case TIMESTAMP_WITHOUT_TIME_ZONE: |
| 381 | + if (o instanceof java.sql.Timestamp) { |
| 382 | + return Timestamp.fromSQLTimestamp((java.sql.Timestamp) o); |
| 383 | + } else if (o instanceof Instant) { |
| 384 | + Instant o1 = (Instant) o; |
| 385 | + LocalDateTime dateTime = o1.atZone(ZoneId.systemDefault()).toLocalDateTime(); |
| 386 | + return Timestamp.fromLocalDateTime(dateTime); |
| 387 | + } else if (o instanceof LocalDateTime) { |
| 388 | + return Timestamp.fromLocalDateTime((LocalDateTime) o); |
| 389 | + } else { |
| 390 | + throw new UnsupportedOperationException( |
| 391 | + String.format( |
| 392 | + "Unsupported class %s for timestamp without timezone ", |
| 393 | + o.getClass())); |
| 394 | + } |
| 395 | + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: |
| 396 | + if (o instanceof java.sql.Timestamp) { |
| 397 | + java.sql.Timestamp timestamp = (java.sql.Timestamp) o; |
| 398 | + return Timestamp.fromInstant(timestamp.toInstant()); |
| 399 | + } else if (o instanceof Instant) { |
| 400 | + return Timestamp.fromInstant((Instant) o); |
| 401 | + } else { |
| 402 | + throw new UnsupportedOperationException( |
| 403 | + String.format( |
| 404 | + "Unsupported class %s for timestamp with local time zone ", |
| 405 | + o.getClass())); |
| 406 | + } |
| 407 | + default: |
| 408 | + throw new UnsupportedOperationException( |
| 409 | + "Unsupported predicate leaf type " + literalType.getTypeRoot().name()); |
| 410 | + } |
| 411 | + } |
| 412 | + |
305 | 413 | public static List<Predicate> pickTransformFieldMapping( |
306 | 414 | List<Predicate> predicates, List<String> inputFields, List<String> pickedFields) { |
307 | 415 | return pickTransformFieldMapping( |
@@ -380,6 +488,35 @@ public static List<Predicate> excludePredicateWithFields( |
380 | 488 | .collect(Collectors.toList()); |
381 | 489 | } |
382 | 490 |
|
| 491 | + @Nullable |
| 492 | + public static Predicate partition( |
| 493 | + Map<String, String> map, RowType rowType, String defaultPartValue) { |
| 494 | + Map<String, Object> internalValues = convertSpecToInternal(map, rowType, defaultPartValue); |
| 495 | + List<String> fieldNames = rowType.getFieldNames(); |
| 496 | + Predicate predicate = null; |
| 497 | + PredicateBuilder builder = new PredicateBuilder(rowType); |
| 498 | + for (Map.Entry<String, Object> entry : internalValues.entrySet()) { |
| 499 | + int idx = fieldNames.indexOf(entry.getKey()); |
| 500 | + Object literal = internalValues.get(entry.getKey()); |
| 501 | + Predicate predicateTemp = |
| 502 | + literal == null ? builder.isNull(idx) : builder.equal(idx, literal); |
| 503 | + if (predicate == null) { |
| 504 | + predicate = predicateTemp; |
| 505 | + } else { |
| 506 | + predicate = PredicateBuilder.and(predicate, predicateTemp); |
| 507 | + } |
| 508 | + } |
| 509 | + return predicate; |
| 510 | + } |
| 511 | + |
| 512 | + public static Predicate partitions( |
| 513 | + List<Map<String, String>> partitions, RowType rowType, String defaultPartValue) { |
| 514 | + return PredicateBuilder.or( |
| 515 | + partitions.stream() |
| 516 | + .map(p -> PredicateBuilder.partition(p, rowType, defaultPartValue)) |
| 517 | + .toArray(Predicate[]::new)); |
| 518 | + } |
| 519 | + |
383 | 520 | public static int[] fieldIdxToPartitionIdx(RowType tableType, List<String> partitionKeys) { |
384 | 521 | return tableType.getFieldNames().stream().mapToInt(partitionKeys::indexOf).toArray(); |
385 | 522 | } |
|
0 commit comments