@@ -245,6 +245,24 @@
<td><p>Enum</p></td>
<td>ChangeLog data can be reordered by shuffles in a distributed system, so the records received by the sink may not arrive in the global upsert order. An upsert materialize operator is therefore added before the upsert sink; it receives the upstream changelog records and generates an upsert view for the downstream.<br />By default, the materialize operator is added when a distributed disorder occurs on unique keys. You can also choose no materialization (NONE) or forced materialization (FORCE).<br /><br />Possible values:<ul><li>"NONE"</li><li>"AUTO"</li><li>"FORCE"</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.sink.upsert-materialize.adaptive.threshold.high</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>When using strategy=ADAPTIVE, defines the number of entries per key at which the implementation switches from VALUE to MAP. If not specified, Flink uses state-backend-specific defaults.<br />The option takes effect when the job is (re)started.<br /></td>
</tr>
<tr>
<td><h5>table.exec.sink.upsert-materialize.adaptive.threshold.low</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>When using strategy=ADAPTIVE, defines the number of entries per key at which the implementation switches from MAP to VALUE. If not specified, Flink uses state-backend-specific defaults.<br />The option takes effect when the job is (re)started.<br /></td>
</tr>
<tr>
<td><h5>table.exec.sink.upsert-materialize.strategy</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">(none)</td>
<td><p>Enum</p></td>
<td>Which SinkUpsertMaterializer strategy to use. Supported strategies:<br />LEGACY: Simple implementation based on ValueState&lt;List&gt; (the original implementation).<br />MAP: Implementation based on OrderedMultiSetState, which combines several MapStates to maintain ordering and fast lookups.<br />VALUE: Similar to LEGACY, but state-compatible with MAP, which allows switching to ADAPTIVE.<br />ADAPTIVE: Alternates between MAP and VALUE depending on the number of entries for the given key: starts with VALUE, switches to MAP upon reaching the threshold.high value, and switches back to VALUE upon falling below threshold.low.<br />The default is LEGACY.<br />The option takes effect during planning / compiled plan generation. Existing jobs won't be affected by this option.<br /><br /><br />Possible values:<ul><li>"LEGACY"</li><li>"MAP"</li><li>"VALUE"</li><li>"ADAPTIVE"</li></ul></td>
</tr>
<tr>
<td><h5>table.exec.sort.async-merge-enabled</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">true</td>
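These are regular table config options; below is a minimal sketch of enabling the ADAPTIVE strategy from the Table API. The threshold values are illustrative, not recommendations.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertMaterializeConfigExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // The strategy is read during planning / compiled plan generation.
        tEnv.getConfig().set("table.exec.sink.upsert-materialize.strategy", "ADAPTIVE");
        // The thresholds are read when the job is (re)started; values here are illustrative.
        tEnv.getConfig().set("table.exec.sink.upsert-materialize.adaptive.threshold.high", "100");
        tEnv.getConfig().set("table.exec.sink.upsert-materialize.adaptive.threshold.low", "20");
    }
}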
@@ -338,4 +338,13 @@ org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingSta
org.apache.flink.api.common.state.v2.AggregatingStateDescriptor<
IN, ACC, OUT>
stateProperties);

/**
* @return fixed lower-case string identifying the type of the underlying state backend, e.g.
* rocksdb, hashmap, or unknown.
*/
@Experimental
default String getBackendTypeIdentifier() {
return "unknown";
}
}
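For backend implementers, the contract is only a fixed lower-case token. A hypothetical override might look like the following; the backend class and identifier are assumptions, not part of this change.

// Hypothetical custom backend; only the identifier contract is shown.
public class MyCustomKeyedStateBackend /* extends a state backend base class */ {
    // Consumers can branch on this stable lower-case token (cf. "rocksdb", "hashmap").
    public String getBackendTypeIdentifier() {
        return "mybackend";
    }
}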
@@ -228,8 +228,15 @@ private PrecomputedParameters(
this.stateful = stateful;
}

static PrecomputedParameters precompute(
public static PrecomputedParameters precompute(
boolean immutableTargetType, TypeSerializer<Object>[] fieldSerializers) {
return precompute(immutableTargetType, false, fieldSerializers);
}

public static PrecomputedParameters precompute(
boolean immutableTargetType,
boolean forceFieldsImmutable,
TypeSerializer<Object>[] fieldSerializers) {
Preconditions.checkNotNull(fieldSerializers);
int totalLength = 0;
boolean fieldsImmutable = true;
@@ -239,7 +246,7 @@ static PrecomputedParameters precompute(
if (fieldSerializer != fieldSerializer.duplicate()) {
stateful = true;
}
if (!fieldSerializer.isImmutableType()) {
if (!forceFieldsImmutable && !fieldSerializer.isImmutableType()) {
fieldsImmutable = false;
}
if (fieldSerializer.getLength() < 0) {
@@ -72,7 +72,7 @@ public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState s
operator.snapshotState(checkpointId, timestamp, options, storage);

OperatorSubtaskState state =
new OperatorSnapshotFinalizer(snapshotInProgress).getJobManagerOwnedState();
OperatorSnapshotFinalizer.create(snapshotInProgress).getJobManagerOwnedState();

operator.notifyCheckpointComplete(checkpointId);
return new TaggedOperatorSubtaskState(index, state);
@@ -18,6 +18,7 @@

package org.apache.flink.runtime.state;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.InternalCheckpointListener;
import org.apache.flink.api.common.state.v2.State;
@@ -140,4 +141,13 @@ default boolean isSafeToReuseKVState() {

@Override
void dispose();

/**
* @return fixed lower-case string identifying the type of the underlying state backend, e.g.
* rocksdb, hashmap, or unknown.
*/
@Experimental
default String getBackendTypeIdentifier() {
return "unknown";
}
}
@@ -219,6 +219,15 @@ org.apache.flink.api.common.state.v2.AggregatingState<IN, OUT> getAggregatingSta
}
}

@Override
public String getBackendTypeIdentifier() {
if (keyedStateBackend != null) {
return keyedStateBackend.getBackendTypeIdentifier();
} else {
return asyncKeyedStateBackend.getBackendTypeIdentifier();
}
}

protected <S extends org.apache.flink.api.common.state.v2.State, SV> S getPartitionedState(
org.apache.flink.api.common.state.v2.StateDescriptor<SV> stateDescriptor)
throws Exception {
@@ -18,6 +18,7 @@

package org.apache.flink.runtime.state;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -166,6 +167,15 @@ default boolean isSafeToReuseKVState() {
return false;
}

/**
* @return fixed lower-case string identifying the type of the underlying state backend, e.g.
* rocksdb, hashmap, or unknown.
*/
@Experimental
default String getBackendTypeIdentifier() {
return "unknown";
}

/** Listener is given a callback when {@link #setCurrentKey} is called (key context changes). */
@FunctionalInterface
interface KeySelectionListener<K> {
@@ -27,12 +27,11 @@
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.util.concurrent.FutureUtils;

import javax.annotation.Nonnull;

import java.util.concurrent.ExecutionException;

import static org.apache.flink.runtime.checkpoint.StateObjectCollection.emptyIfNull;
import static org.apache.flink.runtime.checkpoint.StateObjectCollection.singletonOrEmpty;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* This class finalizes {@link OperatorSnapshotFutures}. Each object is created with a {@link
@@ -47,8 +46,9 @@ public class OperatorSnapshotFinalizer {
/** Secondary replica of the operator subtask state for faster, local recovery on TM. */
private final OperatorSubtaskState taskLocalState;

public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFutures snapshotFutures)
public static OperatorSnapshotFinalizer create(OperatorSnapshotFutures snapshotFutures)
throws ExecutionException, InterruptedException {
checkNotNull(snapshotFutures);

SnapshotResult<KeyedStateHandle> keyedManaged =
FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture());
@@ -68,7 +68,7 @@ public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFuture
SnapshotResult<StateObjectCollection<OutputStateHandle>> resultSubpartition =
snapshotFutures.getResultSubpartitionStateFuture().get();

jobManagerOwnedState =
OperatorSubtaskState jobManagerOwnedState =
OperatorSubtaskState.builder()
.setManagedOperatorState(
singletonOrEmpty(operatorManaged.getJobManagerOwnedSnapshot()))
@@ -83,7 +83,7 @@ public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFuture
emptyIfNull(resultSubpartition.getJobManagerOwnedSnapshot()))
.build();

taskLocalState =
OperatorSubtaskState taskLocalState =
OperatorSubtaskState.builder()
.setManagedOperatorState(
singletonOrEmpty(operatorManaged.getTaskLocalSnapshot()))
@@ -94,6 +94,14 @@ public OperatorSnapshotFinalizer(@Nonnull OperatorSnapshotFuture
.setResultSubpartitionState(
emptyIfNull(resultSubpartition.getTaskLocalSnapshot()))
.build();

return new OperatorSnapshotFinalizer(jobManagerOwnedState, taskLocalState);
}

public OperatorSnapshotFinalizer(
OperatorSubtaskState jobManagerOwnedState, OperatorSubtaskState taskLocalState) {
this.jobManagerOwnedState = checkNotNull(jobManagerOwnedState);
this.taskLocalState = checkNotNull(taskLocalState);
}

public OperatorSubtaskState getTaskLocalState() {
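With the blocking future resolution moved into create(), the constructor is side-effect free. A sketch of building a finalizer directly from prepared states, e.g. in tests; the empty builder states are placeholders.

// Sketch: empty placeholder states built via the public builder.
OperatorSubtaskState jobManagerState = OperatorSubtaskState.builder().build();
OperatorSubtaskState taskLocalState = OperatorSubtaskState.builder().build();
// No futures are resolved here; the given states are stored as-is.
OperatorSnapshotFinalizer finalizer =
        new OperatorSnapshotFinalizer(jobManagerState, taskLocalState);
OperatorSubtaskState restored = finalizer.getJobManagerOwnedState(); // == jobManagerState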
@@ -189,7 +189,7 @@ private SnapshotsFinalizeResult finalizeNonFinishedSnapshots() throws Exception

// finalize the async part of all by executing all snapshot runnables
OperatorSnapshotFinalizer finalizedSnapshots =
new OperatorSnapshotFinalizer(snapshotInProgress);
OperatorSnapshotFinalizer.create(snapshotInProgress);

jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
operatorID, finalizedSnapshots.getJobManagerOwnedState());
@@ -93,7 +93,7 @@ void testRunAndExtract() throws Exception {
assertThat(f).isNotDone();
}

OperatorSnapshotFinalizer finalizer = new OperatorSnapshotFinalizer(snapshotFutures);
OperatorSnapshotFinalizer finalizer = OperatorSnapshotFinalizer.create(snapshotFutures);

for (Future<?> f : snapshotFutures.getAllFutures()) {
assertThat(f).isDone();
@@ -761,7 +761,7 @@ public OperatorSnapshotFinalizer snapshotWithLocalState(
checkpointStorageAccess.resolveCheckpointStorageLocation(
checkpointId, locationReference));

return new OperatorSnapshotFinalizer(operatorStateResult);
return OperatorSnapshotFinalizer.create(operatorStateResult);
}

/**
@@ -159,6 +159,66 @@ public class ExecutionConfigOptions {
+ "or force materialization(FORCE).")
.build());

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Long>
TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_LOW =
key("table.exec.sink.upsert-materialize.adaptive.threshold.low")
.longType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from MAP to VALUE. If not specified, Flink uses state-backend specific defaults.")
.linebreak()
.text("The option takes effect during job (re)starting")
.linebreak()
.build());

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<Long>
TABLE_EXEC_SINK_UPSERT_MATERIALIZE_ADAPTIVE_THRESHOLD_HIGH =
key("table.exec.sink.upsert-materialize.adaptive.threshold.high")
.longType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"When using strategy=ADAPTIVE, defines the number of entries per key when the implementation is changed from VALUE to MAP. If not specified, Flink uses state-backend specific defaults.")
.linebreak()
.text("The option takes effect during job (re)starting")
.linebreak()
.build());

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<SinkUpsertMaterializeStrategy>
TABLE_EXEC_SINK_UPSERT_MATERIALIZE_STRATEGY =
key("table.exec.sink.upsert-materialize.strategy")
.enumType(SinkUpsertMaterializeStrategy.class)
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"Which SinkUpsertMaterializer strategy to use. Supported strategies:")
.linebreak()
.text(
"LEGACY: Simple implementation based on ValueState<List> (the original implementation).")
.linebreak()
.text(
"MAP: Implementation based on OrderedMultiSetState, which combines several MapStates to maintain ordering and fast lookups.")
.linebreak()
.text(
"VALUE: Similar to LEGACY, but state-compatible with MAP, which allows switching to ADAPTIVE.")
.linebreak()
.text(
"ADAPTIVE: Alternates between MAP and VALUE depending on the number of entries for the given key: starts with VALUE, switches to MAP upon reaching the threshold.high value, and switches back to VALUE upon falling below threshold.low.")
.linebreak()
.text("The default is LEGACY.")
.linebreak()
.text(
"The option takes effect during planning / compiled plan generation. Existing jobs won't be affected by this option.")
.linebreak()
.build());

@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
public static final ConfigOption<SinkKeyedShuffle> TABLE_EXEC_SINK_KEYED_SHUFFLE =
key("table.exec.sink.keyed-shuffle")
Expand Down Expand Up @@ -883,6 +943,43 @@ public enum RetryStrategy {
FIXED_DELAY
}

/** SinkUpsertMaterializer strategy. */
@PublicEvolving
public enum SinkUpsertMaterializeStrategy {
/**
* Simple implementation based on {@code ValueState<List>} (the original implementation).
*
* <ul>
* <li>optimal for cases with history under approx. 100 elements
* <li>limited TTL support (per-key granularity, i.e. no expiration of old history
* elements)
* </ul>
*/
LEGACY,
/**
* Implementation based on OrderedMultiSetState, which combines several MapStates to
* maintain ordering and fast lookups.
*
* <ul>
* <li>faster and more memory-efficient on long histories
* <li>slower on short histories
* <li>currently, no TTL support (to be added in the future)
* <li>requires more space
* </ul>
*/
MAP,
/**
* Similar to LEGACY, but state-compatible with MAP, which allows switching to ADAPTIVE.
*/
VALUE,
/**
* Alternates between MAP and VALUE depending on the number of entries for the given key:
* starts with VALUE, switches to MAP upon reaching the threshold.high value, and switches
* back to VALUE upon falling below threshold.low.
*/
ADAPTIVE
}

/** Determine if CAST operates using the legacy behaviour or the new one. */
@Deprecated
public enum LegacyCastBehaviour implements DescribedEnum {
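The two thresholds form a simple hysteresis. A standalone sketch of the switching rule described above; names and structure are illustrative, not the actual operator code.

final class AdaptiveSwitchSketch {
    enum Impl { VALUE, MAP }

    static Impl nextImpl(Impl current, long entriesForKey, long thresholdHigh, long thresholdLow) {
        if (current == Impl.VALUE && entriesForKey >= thresholdHigh) {
            return Impl.MAP; // long history for this key: switch to the lookup-friendly MAP layout
        }
        if (current == Impl.MAP && entriesForKey <= thresholdLow) {
            return Impl.VALUE; // history shrank: fall back to the compact VALUE layout
        }
        return current; // between the thresholds nothing changes (hysteresis)
    }
}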
@@ -45,11 +45,17 @@
public class ProjectedRowData implements RowData {

private final int[] indexMapping;
private final boolean isNullAtNonProjected;

private RowData row;

private ProjectedRowData(int[] indexMapping) {
this(indexMapping, false);
}

protected ProjectedRowData(int[] indexMapping, boolean isNullAtNonProjected) {
this.indexMapping = indexMapping;
this.isNullAtNonProjected = isNullAtNonProjected;
}

/**
@@ -82,7 +88,8 @@ public void setRowKind(RowKind kind) {

@Override
public boolean isNullAt(int pos) {
return row.isNullAt(indexMapping[pos]);
return (pos >= indexMapping.length && isNullAtNonProjected)
|| row.isNullAt(indexMapping[pos]);
}

@Override
@@ -186,6 +193,15 @@ public String toString() {
+ '}';
}

/**
* Returns a new {@link ProjectedRowData} that, depending on {@code isNullAtNonProjected},
* either reports non-projected (out-of-range) positions as null from {@link #isNullAt} or
* throws {@link ArrayIndexOutOfBoundsException} for them.
*/
public ProjectedRowData withNullAtNonProjected(boolean isNullAtNonProjected) {
return new ProjectedRowData(this.indexMapping, isNullAtNonProjected);
}

/**
* Like {@link #from(int[])}, but throws {@link IllegalArgumentException} if the provided {@code
* projection} array contains nested projections, which are not supported by {@link
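A short sketch of the new flag in use; the field values are illustrative.

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.utils.ProjectedRowData;

// Project source fields 2 and 0 of a three-field row.
ProjectedRowData projected =
        ProjectedRowData.from(new int[] {2, 0})
                .replaceRow(GenericRowData.of(1, 2L, null));
projected.isNullAt(0); // true: source field 2 is null
projected.isNullAt(1); // false: source field 0 is present
// By default, positions beyond the projection throw ArrayIndexOutOfBoundsException.
// With the new flag, they are reported as null instead:
ProjectedRowData lenient =
        projected.withNullAtNonProjected(true)
                .replaceRow(GenericRowData.of(1, 2L, null));
lenient.isNullAt(5); // true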
@@ -16,8 +16,9 @@
* limitations under the License.
*/

package org.apache.flink.table.planner.typeutils;
package org.apache.flink.table.typeutils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

@@ -28,7 +29,8 @@
import java.util.Collections;
import java.util.List;

/** Utils for deriving row types of {@link org.apache.calcite.rel.RelNode}s. */
/** Utils for deriving row types of org.apache.calcite.rel.RelNode. */
@Internal
public class RowTypeUtils {

public static String getUniqueName(String oldName, List<String> checklist) {