
Commit bfc236b

vamsikarnika and Vamsi authored and nsivabalan committed

fix: fix downgrade to not delete unintended partitions in MDT (#14162)

---------

Co-authored-by: Vamsi <[email protected]>
Co-authored-by: sivabalan <[email protected]>

1 parent 72f4452 · commit bfc236b

8 files changed: +106 -22 lines changed

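At its core, the fix threads one new boolean, autoDetectAndDeleteMetadataPartitions, through the getMetadataWriter(...) overloads. Regular write paths keep the old behavior of deleting metadata table (MDT) partitions whose indexes are disabled in the incoming write config, while maintenance flows (downgrade, drop index, async indexing) now opt out, so a transient or stripped-down config can no longer wipe partitions such as record_index. A caller-side sketch of the convention after this commit, with instantTime as a placeholder variable (the real call sites appear in the per-file diffs below):

  // Regular ingestion (HoodieTable#getMetadataWriter(String)): auto-delete stays on,
  // so disabling an index in the write config still drops its MDT partition.
  Option<HoodieTableMetadataWriter> ingestionWriter =
      table.getMetadataWriter(instantTime, /* streamingWrites */ false, /* autoDetectAndDeleteMetadataPartitions */ true);

  // Streaming writes (StreamingMetadataWriteHandler): both flags on.
  Option<HoodieTableMetadataWriter> streamingWriter =
      table.getMetadataWriter(instantTime, true, true);

  // Downgrade (UpgradeDowngrade#run) and drop index (BaseHoodieWriteClient#dropIndex):
  // auto-delete off, so their write configs cannot remove MDT partitions as a side effect.
  Option<HoodieTableMetadataWriter> maintenanceWriter =
      table.getMetadataWriter(instantTime, false, false);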

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java

Lines changed: 1 addition & 1 deletion
@@ -1119,7 +1119,7 @@ public void dropIndex(List<String> metadataPartitions) {
       });
     });

-    Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(dropInstant);
+    Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(dropInstant, false, false);
     // first update table config. Metadata writer initializes the inflight metadata
     // partitions so we need to first remove the metadata before creating the writer
     // Also the partitions need to be removed after creating the metadata writer since the writer

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java

Lines changed: 7 additions & 5 deletions
@@ -1085,7 +1085,7 @@ public HoodieEngineContext getContext() {
    * @return instance of {@link HoodieTableMetadataWriter}
    */
   public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp) {
-    return getMetadataWriter(triggeringInstantTimestamp, false);
+    return getMetadataWriter(triggeringInstantTimestamp, false, true);
   }

   /**
@@ -1094,8 +1094,8 @@ public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeri
    * @param triggeringInstantTimestamp - The instant that is triggering this metadata write
    * @return instance of {@link HoodieTableMetadataWriter}
    */
-  public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp, boolean streamingWrites) {
-    return getMetadataWriter(triggeringInstantTimestamp, EAGER, streamingWrites);
+  public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp, boolean streamingWrites, boolean autoDetectAndDeleteMetadataPartitions) {
+    return getMetadataWriter(triggeringInstantTimestamp, EAGER, streamingWrites, autoDetectAndDeleteMetadataPartitions);
   }

   /**
@@ -1105,7 +1105,7 @@ public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeri
    * @return An instance of {@link HoodieTableMetadataWriter}.
    */
   public Option<HoodieTableMetadataWriter> getIndexingMetadataWriter(String triggeringInstantTimestamp) {
-    return getMetadataWriter(triggeringInstantTimestamp, LAZY, false);
+    return getMetadataWriter(triggeringInstantTimestamp, LAZY, false, false);
   }

   /**
@@ -1121,12 +1121,14 @@ public Option<HoodieTableMetadataWriter> getIndexingMetadataWriter(String trigge
    * @param triggeringInstantTimestamp The instant that is triggering this metadata write
    * @param failedWritesCleaningPolicy Cleaning policy on failed writes
    * @param streamingWrites            Whether streaming write is enabled
+   * @param autoDetectAndDeleteMetadataPartitions true when metadata partitions could be deleted based on incoming write config properties.
    * @return instance of {@link HoodieTableMetadataWriter}
    */
   protected Option<HoodieTableMetadataWriter> getMetadataWriter(
       String triggeringInstantTimestamp,
       HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
-      boolean streamingWrites) {
+      boolean streamingWrites,
+      boolean autoDetectAndDeleteMetadataPartitions) {
     // Each engine is expected to override this and
     // provide the actual metadata writer, if enabled.
     return Option.empty();
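Read together, the four hunks leave HoodieTable with the delegation chain below, a condensed restatement of the diff with javadoc elided (EAGER and LAZY are the HoodieFailedWritesCleaningPolicy values already referenced in this file):

  // Default entry point used by regular writers: auto-delete stays enabled.
  public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp) {
    return getMetadataWriter(triggeringInstantTimestamp, false, true);
  }

  // Callers that know whether partition deletion is safe pass the flag explicitly.
  public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
                                                                   boolean streamingWrites,
                                                                   boolean autoDetectAndDeleteMetadataPartitions) {
    return getMetadataWriter(triggeringInstantTimestamp, EAGER, streamingWrites, autoDetectAndDeleteMetadataPartitions);
  }

  // Async indexing uses LAZY cleaning and never auto-deletes partitions.
  public Option<HoodieTableMetadataWriter> getIndexingMetadataWriter(String triggeringInstantTimestamp) {
    return getMetadataWriter(triggeringInstantTimestamp, LAZY, false, false);
  }

  // Engine-specific tables (Spark, Flink, Java) override this four-argument variant.
  protected Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
                                                                HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
                                                                boolean streamingWrites,
                                                                boolean autoDetectAndDeleteMetadataPartitions) {
    return Option.empty(); // each engine provides the actual writer, if enabled
  }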

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java

Lines changed: 1 addition & 1 deletion
@@ -269,7 +269,7 @@ public void run(HoodieTableVersion toVersion, String instantTime) {

     HoodieTable table = upgradeDowngradeHelper.getTable(updatedConfig, context);
     String newInstant = table.getMetaClient().createNewInstantTime(false);
-    Option<HoodieTableMetadataWriter> mdtWriterOpt = table.getMetadataWriter(newInstant);
+    Option<HoodieTableMetadataWriter> mdtWriterOpt = table.getMetadataWriter(newInstant, false, false);
     mdtWriterOpt.ifPresent(mdtWriter -> {
       HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
       commitMetadata.setOperationType(WriteOperationType.UPSERT);

hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java

Lines changed: 2 additions & 1 deletion
@@ -100,7 +100,8 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con
   protected Option<HoodieTableMetadataWriter> getMetadataWriter(
       String triggeringInstantTimestamp,
       HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
-      boolean streamingWrites) {
+      boolean streamingWrites,
+      boolean autoDetectAndDeleteMetadataPartitions) {
     if (isMetadataTable()) {
       return Option.empty();
     }

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java

Lines changed: 6 additions & 3 deletions
@@ -81,7 +81,8 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con
   @Override
   protected Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
                                                                 HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
-                                                                boolean streamingWrites) {
+                                                                boolean streamingWrites,
+                                                                boolean autoDetectAndDeleteMetadataPartitions) {
     if (isMetadataTable()) {
       return Option.empty();
     }
@@ -93,8 +94,10 @@ protected Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringI
         getContext().getStorageConf(), config, failedWritesCleaningPolicy, getContext(),
         Option.of(triggeringInstantTimestamp));
     // even with metadata enabled, some index could have been disabled
-    // delete metadata partitions corresponding to such indexes
-    deleteMetadataIndexIfNecessary();
+    // delete metadata partitions corresponding to such indexes if autoDetectAndDeleteMdtPartitions is enabled
+    if (autoDetectAndDeleteMetadataPartitions) {
+      deleteMetadataIndexIfNecessary();
+    }
     try {
       if (isMetadataTableExists || metaClient.getStorage().exists(
           HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))) {

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java

Lines changed: 1 addition & 1 deletion
@@ -137,7 +137,7 @@ synchronized Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeri
       return this.metadataWriterMap.get(triggeringInstant);
     }

-    Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(triggeringInstant, true);
+    Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(triggeringInstant, true, true);
     metadataWriterMap.put(triggeringInstant, metadataWriterOpt); // populate this for every new instant time.
     // if metadata table does not exist, the map will contain an entry, with value Option.empty.
     // if not, it will contain the metadata writer instance.

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java

Lines changed: 5 additions & 2 deletions
@@ -96,14 +96,17 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con
   protected Option<HoodieTableMetadataWriter> getMetadataWriter(
       String triggeringInstantTimestamp,
       HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
-      boolean streamingWrites) {
+      boolean streamingWrites,
+      boolean autoDetectAndDeleteMetadataPartitions) {
     if (isMetadataTable()) {
       return Option.empty();
     }
     if (config.isMetadataTableEnabled()) {
       // if any partition is deleted, we need to reload the metadata table writer so that new table configs are picked up
       // to reflect the delete mdt partitions.
-      deleteMetadataIndexIfNecessary();
+      if (autoDetectAndDeleteMetadataPartitions) {
+        deleteMetadataIndexIfNecessary();
+      }

       // Create the metadata table writer. First time after the upgrade this creation might trigger
       // metadata table bootstrapping. Bootstrapping process could fail and checking the table
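To see why the guard matters, walk through the downgrade scenario the new test below exercises; the numbered comments are an interpretive reading of the commit message and test, not text from the patch:

  // 1. A version-NINE table has the record_index MDT partition enabled.
  // 2. UpgradeDowngrade builds a transient HoodieWriteConfig that never enables the
  //    record index; it only needs a metadata writer to record the version change.
  // 3. Before this fix, getMetadataWriter(...) reconciled that transient config with
  //    the table config and called deleteMetadataIndexIfNecessary(), silently
  //    dropping record_index during the downgrade.
  // 4. With autoDetectAndDeleteMetadataPartitions == false, the reconciliation is
  //    skipped and the partition survives, which the new test asserts.
  if (autoDetectAndDeleteMetadataPartitions) {
    deleteMetadataIndexIfNecessary();
  }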

hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java

Lines changed: 83 additions & 8 deletions
@@ -18,24 +18,32 @@

 package org.apache.hudi.table.upgrade;

+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;

+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Disabled;
@@ -49,6 +57,7 @@
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -57,6 +66,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
 import static org.apache.hudi.keygen.KeyGenUtils.getComplexKeygenErrorMessage;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -98,31 +108,31 @@ public void testUpgradeOrDowngrade(HoodieTableVersion fromVersion, HoodieTableVe
     boolean isUpgrade = fromVersion.lesserThan(toVersion);
     String operation = isUpgrade ? "upgrade" : "downgrade";
     LOG.info("Testing {} from version {} to {}", operation, fromVersion, toVersion);
-
+
     HoodieTableMetaClient originalMetaClient = loadFixtureTable(fromVersion, suffix);
     assertEquals(fromVersion, originalMetaClient.getTableConfig().getTableVersion(),
         "Fixture table should be at expected version");
-
+
     HoodieWriteConfig config = createWriteConfig(originalMetaClient, true);
-
+
     int initialPendingCommits = originalMetaClient.getCommitsTimeline().filterPendingExcludingCompaction().countInstants();
     int initialCompletedCommits = originalMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants();
-
+
     Dataset<Row> originalData = readTableData(originalMetaClient, "before " + operation);
-
+
     // Confirm that there are log files before rollback and compaction operations
     if (isRollbackAndCompactTransition(fromVersion, toVersion)) {
       validateLogFilesCount(originalMetaClient, operation, suffix.equals("-mor"));
     }
-
+
     new UpgradeDowngrade(originalMetaClient, config, context(), SparkUpgradeDowngradeHelper.getInstance())
         .run(toVersion, null);
-
+
     HoodieTableMetaClient resultMetaClient = HoodieTableMetaClient.builder()
         .setConf(storageConf().newInstance())
         .setBasePath(originalMetaClient.getBasePath())
         .build();
-
+
     assertTableVersionOnDataAndMetadataTable(resultMetaClient, toVersion);
     validateVersionSpecificProperties(resultMetaClient, toVersion);
     validateDataConsistency(originalData, resultMetaClient, "after " + operation);
@@ -376,6 +386,62 @@ public void testComplexKeygenValidationDuringUpgradeDowngrade(HoodieTableVersion
     }
   }

+  @ParameterizedTest
+  @MethodSource("testMdtValidationDowngrade")
+  public void testMdtPartitionNotDroppedWhenDowngradedFromTableVersionNine(HoodieTableType tableType, boolean mdtEnabled) throws Exception {
+    HoodieTableVersion fromVersion = HoodieTableVersion.NINE;
+    HoodieTableVersion toVersion = HoodieTableVersion.EIGHT;
+
+    Properties props = new Properties();
+    props.put(HoodieTableConfig.TYPE.key(), tableType.name());
+    HoodieTableMetaClient metaClient =
+        getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), props);
+
+    HoodieWriteConfig writeConfig = getConfigBuilder(true)
+        .withPath(metaClient.getBasePath())
+        .withWriteTableVersion(fromVersion.versionCode())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .withEnableRecordIndex(true).build())
+        .withProps(props)
+        .build();
+
+    SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig);
+    String partitionPath = "2021/09/11";
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionPath});
+
+    String instant1 = getCommitTimeAtUTC(1);
+    List<HoodieRecord> records = dataGenerator.generateInserts(instant1, 100);
+    JavaRDD<HoodieRecord> dataset = jsc().parallelize(records, 2);
+
+    WriteClientTestUtils.startCommitWithTime(writeClient, instant1);
+    writeClient.commit(instant1, writeClient.insert(dataset, instant1));
+    metaClient.reloadTableConfig();
+
+    // verify record index partition exists before downgrade
+    assertTrue(metaClient.getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath()));
+
+    HoodieWriteConfig.Builder upgradeWriteConfig = HoodieWriteConfig.newBuilder()
+        .withPath(metaClient.getBasePath())
+        .withProps(props);
+    if (mdtEnabled) {
+      upgradeWriteConfig.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(false).build());
+    } else {
+      upgradeWriteConfig.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build());
+    }
+
+    new UpgradeDowngrade(metaClient, upgradeWriteConfig.build(), context(), SparkUpgradeDowngradeHelper.getInstance())
+        .run(toVersion, null);
+
+    HoodieTableMetaClient resultMetaClient = HoodieTableMetaClient.builder()
+        .setConf(storageConf().newInstance())
+        .setBasePath(metaClient.getBasePath())
+        .build();
+
+    resultMetaClient.reloadTableConfig();
+    // verify record index partition exists after downgrade
+    assertTrue(resultMetaClient.getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath()));
+  }
+
   /**
    * Load a fixture table from resources and copy it to a temporary location for testing.
    */
@@ -536,6 +602,15 @@ private static Stream<Arguments> testArgsUpgradeDowngrade() {
     );
   }

+  private static Stream<Arguments> testMdtValidationDowngrade() {
+    return Stream.of(
+        Arguments.of(HoodieTableType.COPY_ON_WRITE, true),
+        Arguments.of(HoodieTableType.COPY_ON_WRITE, false),
+        Arguments.of(HoodieTableType.MERGE_ON_READ, true),
+        Arguments.of(HoodieTableType.MERGE_ON_READ, false)
+    );
+  }
+
   private static Stream<Arguments> testArgsPayloadUpgradeDowngrade() {
     String[] payloadTypes = {
         "default", "overwrite", "partial", "postgres", "mysql",
