Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/content/flink/sql-lookup.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,17 @@ CREATE TABLE customers (
```sql
SELECT o.order_id, o.total, c.country, c.zip
FROM orders AS o
JOIN customers /*+ OPTIONS('lookup.dynamic-partition'='max_pt()', 'lookup.dynamic-partition.refresh-interval'='1 h') */
JOIN customers /*+ OPTIONS('scan.partitions'='max_pt()', 'lookup.dynamic-partition.refresh-interval'='1 h') */
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```

The Lookup node will automatically refresh to the latest partition and query the data of that partition.

The option `scan.partitions` can also specify fixed partitions in the form of `key1=value1,key2=value2`.
Multiple partitions should be separated by a semicolon (`;`).
When specifying fixed partitions, this option can also be used in batch joins.

## Query Service

You can run a Flink Streaming Job to start query service for the table. When QueryService exists, Flink Lookup Join
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,6 @@
<td><p>Enum</p></td>
<td>The cache mode of lookup join.<br /><br />Possible values:<ul><li>"AUTO"</li><li>"FULL"</li></ul></td>
</tr>
<tr>
<td><h5>lookup.dynamic-partition</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specific dynamic partition for lookup, supports 'max_pt()' and 'max_two_pt()' currently.</td>
</tr>
<tr>
<td><h5>lookup.dynamic-partition.refresh-interval</h5></td>
<td style="word-wrap: break-word;">1 h</td>
Expand Down Expand Up @@ -134,6 +128,12 @@
<td>Integer</td>
<td>Define a custom parallelism for the scan source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. If user enable the scan.infer-parallelism, the planner will derive the parallelism by inferred parallelism.</td>
</tr>
<tr>
<td><h5>scan.partitions</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify the partitions to scan. Partitions should be given in the form of key1=value1,key2=value2. Partition keys not specified will be filled with the value of partition.default-name. Multiple partitions should be separated by semicolon (;). This option can support normal source tables and lookup join tables. For lookup joins, two special values max_pt() and max_two_pt() are also supported, specifying the (two) partition(s) with the largest partition value.</td>
</tr>
<tr>
<td><h5>scan.remove-normalize</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,12 +283,20 @@ public class FlinkConnectorOptions {
.defaultValue(LookupCacheMode.AUTO)
.withDescription("The cache mode of lookup join.");

public static final ConfigOption<String> LOOKUP_DYNAMIC_PARTITION =
ConfigOptions.key("lookup.dynamic-partition")
public static final ConfigOption<String> SCAN_PARTITIONS =
ConfigOptions.key("scan.partitions")
.stringType()
.noDefaultValue()
.withDescription(
"Specific dynamic partition for lookup, supports 'max_pt()' and 'max_two_pt()' currently.");
.withFallbackKeys("lookup.dynamic-partition")
.withDescription(
"Specify the partitions to scan. "
+ "Partitions should be given in the form of key1=value1,key2=value2. "
+ "Partition keys not specified will be filled with the value of "
+ CoreOptions.PARTITION_DEFAULT_NAME.key()
+ ". Multiple partitions should be separated by semicolon (;). "
+ "This option can support normal source tables and lookup join tables. "
+ "For lookup joins, two special values max_pt() and max_two_pt() are also supported, "
+ "specifying the (two) partition(s) with the largest partition value.");

public static final ConfigOption<Duration> LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL =
ConfigOptions.key("lookup.dynamic-partition.refresh-interval")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,107 +21,46 @@
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Dynamic partition for lookup. */
public class DynamicPartitionLoader implements Serializable {
public class DynamicPartitionLoader extends PartitionLoader {

private static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionLoader.class);

private static final long serialVersionUID = 1L;

private static final String MAX_PT = "max_pt()";
private static final long serialVersionUID = 2L;

private static final String MAX_TWO_PT = "max_two_pt()";

private final Table table;
private final Duration refreshInterval;
private final int maxPartitionNum;
private final RowDataToObjectArrayConverter partitionConverter;

private Comparator<InternalRow> comparator;

private LocalDateTime lastRefresh;
private List<BinaryRow> partitions;
private transient Comparator<InternalRow> comparator;
private transient LocalDateTime lastRefresh;

private DynamicPartitionLoader(Table table, Duration refreshInterval, int maxPartitionNum) {
this.table = table;
DynamicPartitionLoader(FileStoreTable table, Duration refreshInterval, int maxPartitionNum) {
super(table);
this.refreshInterval = refreshInterval;
this.maxPartitionNum = maxPartitionNum;
this.partitionConverter =
new RowDataToObjectArrayConverter(table.rowType().project(table.partitionKeys()));
}

@Override
public void open() {
super.open();
RowType partitionType = table.rowType().project(table.partitionKeys());
this.comparator = CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
this.partitions = Collections.emptyList();
}

public void addPartitionKeysTo(List<String> joinKeys, List<String> projectFields) {
List<String> partitionKeys = table.partitionKeys();
checkArgument(joinKeys.stream().noneMatch(partitionKeys::contains));
joinKeys.addAll(partitionKeys);

partitionKeys.stream().filter(k -> !projectFields.contains(k)).forEach(projectFields::add);
}

public List<BinaryRow> partitions() {
return partitions;
}

public Predicate createSpecificPartFilter() {
Predicate partFilter = null;
for (BinaryRow partition : partitions) {
if (partFilter == null) {
partFilter = createSinglePartFilter(partition);
} else {
partFilter = PredicateBuilder.or(partFilter, createSinglePartFilter(partition));
}
}
return partFilter;
}

private Predicate createSinglePartFilter(BinaryRow partition) {
RowType rowType = table.rowType();
List<String> partitionKeys = table.partitionKeys();
Object[] partitionSpec = partitionConverter.convert(partition);
Map<String, Object> partitionMap = new HashMap<>(partitionSpec.length);
for (int i = 0; i < partitionSpec.length; i++) {
partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
}

// create partition predicate base on rowType instead of partitionType
return createPartitionPredicate(rowType, partitionMap);
}

/** @return true if partition changed. */
@Override
public boolean checkRefresh() {
if (lastRefresh != null
&& !lastRefresh.plus(refreshInterval).isBefore(LocalDateTime.now())) {
Expand Down Expand Up @@ -187,35 +126,4 @@ private List<BinaryRow> getMaxPartitions() {
return newPartitions.subList(0, maxPartitionNum);
}
}

@Nullable
public static DynamicPartitionLoader of(Table table) {
Options options = Options.fromMap(table.options());
String dynamicPartition = options.get(LOOKUP_DYNAMIC_PARTITION);
if (dynamicPartition == null) {
return null;
}

checkArgument(
!table.partitionKeys().isEmpty(),
"{} is not supported for non-partitioned table.",
LOOKUP_DYNAMIC_PARTITION);

int maxPartitionNum;
switch (dynamicPartition.toLowerCase()) {
case MAX_PT:
maxPartitionNum = 1;
break;
case MAX_TWO_PT:
maxPartitionNum = 2;
break;
default:
throw new UnsupportedOperationException(
"Unsupported dynamic partition pattern: " + dynamicPartition);
}

Duration refresh =
options.get(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL);
return new DynamicPartitionLoader(table, refresh, maxPartitionNum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(FileStoreLookupFunction.class);

private final Table table;
@Nullable private final DynamicPartitionLoader partitionLoader;
@Nullable private final PartitionLoader partitionLoader;
private final List<String> projectFields;
private final List<String> joinKeys;
@Nullable private final Predicate predicate;
Expand All @@ -101,7 +101,10 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
@Nullable private Filter<InternalRow> cacheRowFilter;

public FileStoreLookupFunction(
Table table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) {
FileStoreTable table,
int[] projection,
int[] joinKeyIndex,
@Nullable Predicate predicate) {
if (!TableScanUtils.supportCompactDiffStreamingReading(table)) {
TableScanUtils.streamingReadingValidate(table);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.lookup;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import javax.annotation.Nullable;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Specify partitions for lookup tables. */
public abstract class PartitionLoader implements Serializable {

private static final long serialVersionUID = 1L;

private static final String MAX_PT = "max_pt()";
private static final String MAX_TWO_PT = "max_two_pt()";

protected final FileStoreTable table;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use FileStoreTable instead of Table?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need specific methods like schema() in its subclass.

private final RowDataToObjectArrayConverter partitionConverter;

protected transient List<BinaryRow> partitions;

/**
 * Creates a loader for the given partitioned table. Eagerly builds a converter that turns
 * partition {@link BinaryRow}s into object arrays, used later to build partition predicates.
 */
protected PartitionLoader(FileStoreTable table) {
this.table = table;
this.partitionConverter =
new RowDataToObjectArrayConverter(table.rowType().project(table.partitionKeys()));
}

/**
 * Initializes mutable state. {@link #partitions} is transient, so it must be re-created here
 * rather than in the constructor; subclasses overriding this method should call {@code
 * super.open()}.
 */
public void open() {
this.partitions = new ArrayList<>();
}

/** Returns the currently loaded partitions (empty until a refresh populates them). */
public List<BinaryRow> partitions() {
return partitions;
}

/**
 * Appends this table's partition keys to {@code joinKeys}, and ensures each partition key is
 * also present in {@code projectFields}.
 *
 * @throws IllegalArgumentException if any partition key already appears among the join keys
 */
public void addPartitionKeysTo(List<String> joinKeys, List<String> projectFields) {
    List<String> keys = table.partitionKeys();
    for (String joinKey : joinKeys) {
        Preconditions.checkArgument(
                !keys.contains(joinKey),
                "Currently, Paimon lookup table with partitions does not support partition keys in join keys.");
    }
    joinKeys.addAll(keys);

    // Project every partition key that is not already projected.
    for (String key : keys) {
        if (!projectFields.contains(key)) {
            projectFields.add(key);
        }
    }
}

/**
 * Builds a predicate matching rows belonging to any of the currently loaded partitions
 * (disjunction of per-partition filters), or {@code null} when no partition is loaded.
 */
public Predicate createSpecificPartFilter() {
    Predicate result = null;
    for (BinaryRow partition : partitions) {
        Predicate single = createSinglePartFilter(partition);
        result = result == null ? single : PredicateBuilder.or(result, single);
    }
    return result;
}

/** Converts a single partition row into an equality predicate over that partition's keys. */
private Predicate createSinglePartFilter(BinaryRow partition) {
    List<String> partitionKeys = table.partitionKeys();
    Object[] values = partitionConverter.convert(partition);
    Map<String, Object> spec = new HashMap<>(values.length);
    for (int i = 0; i < values.length; i++) {
        spec.put(partitionKeys.get(i), values[i]);
    }

    // Build the predicate against the full row type (not the projected partition type) so it
    // can be applied to whole table rows.
    return PartitionPredicate.createPartitionPredicate(table.rowType(), spec);
}

/**
 * Refreshes {@link #partitions} if the implementation decides a refresh is due.
 *
 * @return true if the loaded partitions changed as a result of this call
 */
public abstract boolean checkRefresh();

/**
 * Creates a {@link PartitionLoader} from the table's options, or returns {@code null} when
 * {@code scan.partitions} is not configured.
 *
 * <p>The special values {@code max_pt()} / {@code max_two_pt()} produce a {@link
 * DynamicPartitionLoader} tracking the one / two largest partitions with periodic refresh; any
 * other value is parsed as fixed partition specs handled by {@link StaticPartitionLoader}.
 *
 * @throws IllegalArgumentException if the option is set on a non-partitioned table
 */
@Nullable
public static PartitionLoader of(FileStoreTable table) {
    Options options = Options.fromMap(table.options());
    String scanPartitions = options.get(FlinkConnectorOptions.SCAN_PARTITIONS);
    if (scanPartitions == null) {
        return null;
    }

    // Fix: Paimon's Preconditions substitutes arguments with String.format-style "%s"
    // templates, so the previous "{}" placeholder was printed literally.
    Preconditions.checkArgument(
            !table.partitionKeys().isEmpty(),
            "%s is not supported for non-partitioned table.",
            FlinkConnectorOptions.SCAN_PARTITIONS.key());

    int maxPartitionNum;
    switch (scanPartitions.toLowerCase()) {
        case MAX_PT:
            maxPartitionNum = 1;
            break;
        case MAX_TWO_PT:
            maxPartitionNum = 2;
            break;
        default:
            // Not a dynamic pattern: treat the value as fixed partition specs.
            maxPartitionNum = -1;
            break;
    }

    if (maxPartitionNum == -1) {
        return new StaticPartitionLoader(
                table, ParameterUtils.getPartitions(scanPartitions.split(";")));
    }

    Duration refresh =
            options.get(FlinkConnectorOptions.LOOKUP_DYNAMIC_PARTITION_REFRESH_INTERVAL);
    return new DynamicPartitionLoader(table, refresh, maxPartitionNum);
}
}
Loading