Skip to content

Commit d7959fb

Browse files
authored
[core][flink] Remove legacy codes for 0.2, 0.3 and log system (#6923)
1 parent bce56bf commit d7959fb

File tree

152 files changed

+464
-8197
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

152 files changed

+464
-8197
lines changed

docs/content/maintenance/rescale-bucket.md

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,6 @@ Please note that
6666
```
6767
- During overwrite period, make sure there are no other jobs writing the same table/partition.
6868

69-
{{< hint info >}}
70-
__Note:__ For the table which enables log system(*e.g.* Kafka), please rescale the topic's partition as well to keep consistency.
71-
{{< /hint >}}
72-
7369
## Use Case
7470

7571
Rescale bucket helps to handle sudden spikes in throughput. Suppose there is a daily streaming ETL task to sync transaction data. The table's DDL and pipeline

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,12 +1272,6 @@
12721272
<td>Boolean</td>
12731273
<td>Whether to read the delta from append table's overwrite commit in streaming mode.</td>
12741274
</tr>
1275-
<tr>
1276-
<td><h5>streaming-read-mode</h5></td>
1277-
<td style="word-wrap: break-word;">(none)</td>
1278-
<td><p>Enum</p></td>
1279-
<td>The mode of streaming read that specifies to read the data of table file or log.<br /><br />Possible values:<ul><li>"log": Read from the data of table log store.</li><li>"file": Read from the data of table file store.</li></ul></td>
1280-
</tr>
12811275
<tr>
12821276
<td><h5>streaming-read-overwrite</h5></td>
12831277
<td style="word-wrap: break-word;">false</td>

docs/layouts/shortcodes/generated/kafka_log_configuration.html

Lines changed: 0 additions & 42 deletions
This file was deleted.

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 0 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.paimon.annotation.Documentation;
2222
import org.apache.paimon.annotation.Documentation.ExcludeFromDocumentation;
2323
import org.apache.paimon.annotation.Documentation.Immutable;
24-
import org.apache.paimon.annotation.VisibleForTesting;
2524
import org.apache.paimon.compression.CompressOptions;
2625
import org.apache.paimon.fileindex.FileIndexOptions;
2726
import org.apache.paimon.fs.Path;
@@ -969,42 +968,6 @@ public InlineElement getDescription() {
969968
.withDescription(
970969
"The delay duration of stream read when scan incremental snapshots.");
971970

972-
@ExcludeFromDocumentation("Confused without log system")
973-
public static final ConfigOption<LogConsistency> LOG_CONSISTENCY =
974-
key("log.consistency")
975-
.enumType(LogConsistency.class)
976-
.defaultValue(LogConsistency.TRANSACTIONAL)
977-
.withDescription("Specify the log consistency mode for table.");
978-
979-
@ExcludeFromDocumentation("Confused without log system")
980-
public static final ConfigOption<LogChangelogMode> LOG_CHANGELOG_MODE =
981-
key("log.changelog-mode")
982-
.enumType(LogChangelogMode.class)
983-
.defaultValue(LogChangelogMode.AUTO)
984-
.withDescription("Specify the log changelog mode for table.");
985-
986-
@ExcludeFromDocumentation("Confused without log system")
987-
public static final ConfigOption<String> LOG_KEY_FORMAT =
988-
key("log.key.format")
989-
.stringType()
990-
.defaultValue("json")
991-
.withDescription(
992-
"Specify the key message format of log system with primary key.");
993-
994-
@ExcludeFromDocumentation("Confused without log system")
995-
public static final ConfigOption<String> LOG_FORMAT =
996-
key("log.format")
997-
.stringType()
998-
.defaultValue("debezium-json")
999-
.withDescription("Specify the message format of log system.");
1000-
1001-
@ExcludeFromDocumentation("Confused without log system")
1002-
public static final ConfigOption<Boolean> LOG_IGNORE_DELETE =
1003-
key("log.ignore-delete")
1004-
.booleanType()
1005-
.defaultValue(false)
1006-
.withDescription("Specify whether the log system ignores delete records.");
1007-
1008971
public static final ConfigOption<Boolean> AUTO_CREATE =
1009972
key("auto-create")
1010973
.booleanType()
@@ -1272,13 +1235,6 @@ public InlineElement getDescription() {
12721235
"Only used to force TableScan to construct suitable 'StartingUpScanner' and 'FollowUpScanner' "
12731236
+ "dedicated internal streaming scan.");
12741237

1275-
public static final ConfigOption<StreamingReadMode> STREAMING_READ_MODE =
1276-
key("streaming-read-mode")
1277-
.enumType(StreamingReadMode.class)
1278-
.noDefaultValue()
1279-
.withDescription(
1280-
"The mode of streaming read that specifies to read the data of table file or log.");
1281-
12821238
@ExcludeFromDocumentation("Internal use only")
12831239
public static final ConfigOption<BatchScanMode> BATCH_SCAN_MODE =
12841240
key("batch-scan-mode")
@@ -2770,10 +2726,6 @@ public boolean scanPlanSortPartition() {
27702726
}
27712727

27722728
public StartupMode startupMode() {
2773-
return startupMode(options);
2774-
}
2775-
2776-
public static StartupMode startupMode(Options options) {
27772729
StartupMode mode = options.get(SCAN_MODE);
27782730
if (mode == StartupMode.DEFAULT) {
27792731
if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()
@@ -3001,10 +2953,6 @@ public Integer fullCompactionDeltaCommits() {
30012953
return options.get(FULL_COMPACTION_DELTA_COMMITS);
30022954
}
30032955

3004-
public static StreamingReadMode streamReadType(Options options) {
3005-
return options.get(STREAMING_READ_MODE);
3006-
}
3007-
30082956
public Duration consumerExpireTime() {
30092957
return options.get(CONSUMER_EXPIRATION_TIME);
30102958
}
@@ -3471,67 +3419,6 @@ public InlineElement getDescription() {
34713419
}
34723420
}
34733421

3474-
/** Specifies the log consistency mode for table. */
3475-
public enum LogConsistency implements DescribedEnum {
3476-
TRANSACTIONAL(
3477-
"transactional",
3478-
"Only the data after the checkpoint can be seen by readers, the latency depends on checkpoint interval."),
3479-
3480-
EVENTUAL(
3481-
"eventual",
3482-
"Immediate data visibility, you may see some intermediate states, "
3483-
+ "but eventually the right results will be produced, only works for table with primary key.");
3484-
3485-
private final String value;
3486-
private final String description;
3487-
3488-
LogConsistency(String value, String description) {
3489-
this.value = value;
3490-
this.description = description;
3491-
}
3492-
3493-
@Override
3494-
public String toString() {
3495-
return value;
3496-
}
3497-
3498-
@Override
3499-
public InlineElement getDescription() {
3500-
return text(description);
3501-
}
3502-
}
3503-
3504-
/** Specifies the log changelog mode for table. */
3505-
public enum LogChangelogMode implements DescribedEnum {
3506-
AUTO("auto", "Upsert for table with primary key, all for table without primary key."),
3507-
3508-
ALL("all", "The log system stores all changes including UPDATE_BEFORE."),
3509-
3510-
UPSERT(
3511-
"upsert",
3512-
"The log system does not store the UPDATE_BEFORE changes, the log consumed job"
3513-
+ " will automatically add the normalized node, relying on the state"
3514-
+ " to generate the required update_before.");
3515-
3516-
private final String value;
3517-
private final String description;
3518-
3519-
LogChangelogMode(String value, String description) {
3520-
this.value = value;
3521-
this.description = description;
3522-
}
3523-
3524-
@Override
3525-
public String toString() {
3526-
return value;
3527-
}
3528-
3529-
@Override
3530-
public InlineElement getDescription() {
3531-
return text(description);
3532-
}
3533-
}
3534-
35353422
/** Specifies the changelog producer for table. */
35363423
public enum ChangelogProducer implements DescribedEnum {
35373424
NONE("none", "No changelog file."),
@@ -3563,49 +3450,6 @@ public InlineElement getDescription() {
35633450
}
35643451
}
35653452

3566-
/** Specifies the type for streaming read. */
3567-
public enum StreamingReadMode implements DescribedEnum {
3568-
LOG("log", "Read from the data of table log store."),
3569-
FILE("file", "Read from the data of table file store.");
3570-
3571-
private final String value;
3572-
private final String description;
3573-
3574-
StreamingReadMode(String value, String description) {
3575-
this.value = value;
3576-
this.description = description;
3577-
}
3578-
3579-
@Override
3580-
public String toString() {
3581-
return value;
3582-
}
3583-
3584-
@Override
3585-
public InlineElement getDescription() {
3586-
return text(description);
3587-
}
3588-
3589-
public String getValue() {
3590-
return value;
3591-
}
3592-
3593-
@VisibleForTesting
3594-
public static StreamingReadMode fromValue(String value) {
3595-
for (StreamingReadMode formatType : StreamingReadMode.values()) {
3596-
if (formatType.value.equals(value)) {
3597-
return formatType;
3598-
}
3599-
}
3600-
throw new IllegalArgumentException(
3601-
String.format(
3602-
"Invalid format type %s, only support [%s]",
3603-
value,
3604-
StringUtils.join(
3605-
Arrays.stream(StreamingReadMode.values()).iterator(), ",")));
3606-
}
3607-
}
3608-
36093453
/** Inner stream scan mode for some internal requirements. */
36103454
public enum StreamScanMode implements DescribedEnum {
36113455
NONE("none", "No requirement."),

0 commit comments

Comments
 (0)