Skip to content

Commit eeb0799

Browse files
linliu-codeyihua
authored and committed
fix: Skip payload class validation when merge mode is not custom with v9 tables (#14116)
1 parent ca9dea5 commit eeb0799

File tree

3 files changed

+76
-8
lines changed

3 files changed

+76
-8
lines changed

hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceOptionsHelper.allAlternatives
2222
import org.apache.hudi.DataSourceWriteOptions._
2323
import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig, TypedProperties}
2424
import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
25+
import org.apache.hudi.common.config.RecordMergeMode.CUSTOM
2526
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord, OverwriteWithLatestAvroPayload, WriteOperationType}
2627
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableVersion}
2728
import org.apache.hudi.common.util.StringUtils
@@ -173,15 +174,15 @@ object HoodieWriterUtils {
173174
|| key.equals(RECORD_MERGE_MODE.key())
174175
|| key.equals(RECORD_MERGE_STRATEGY_ID.key())))
175176

176-
ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key()) && shouldIgnorePayloadValidation(value, params, tableConfig))
177+
ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key()) && shouldIgnorePayloadValidation(value, tableConfig))
177178
// If hoodie.database.name is empty, ignore validation.
178179
ignoreConfig = ignoreConfig || (key.equals(HoodieTableConfig.DATABASE_NAME.key()) && isNullOrEmpty(getStringFromTableConfigWithAlternatives(tableConfig, key)))
179180
ignoreConfig
180181
}
181182

182-
def shouldIgnorePayloadValidation(value: String, params: Map[String, String], tableConfig: HoodieConfig): Boolean = {
183+
def shouldIgnorePayloadValidation(incomingPayloadClass: String, tableConfig: HoodieConfig): Boolean = {
183184
//don't validate the payload only in the case that insert into is using fallback to some legacy configs
184-
val ignoreConfig = value.equals(VALIDATE_DUPLICATE_KEY_PAYLOAD_CLASS_NAME)
185+
val ignoreConfig = incomingPayloadClass.equals(VALIDATE_DUPLICATE_KEY_PAYLOAD_CLASS_NAME)
185186
if (ignoreConfig) {
186187
ignoreConfig
187188
} else {
@@ -201,10 +202,18 @@ object HoodieWriterUtils {
201202
HoodieTableVersion.current()
202203
}
203204

205+
val recordMergeMode = tableConfig.getStringOrDefault(HoodieTableConfig.RECORD_MERGE_MODE.key(), "")
204206
if (tableVersion == HoodieTableVersion.EIGHT && initTableVersion.lesserThan(HoodieTableVersion.EIGHT)
205-
&& value.equals(classOf[OverwriteWithLatestAvroPayload].getName)
207+
&& incomingPayloadClass.equals(classOf[OverwriteWithLatestAvroPayload].getName)
206208
&& tableConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()).equals(classOf[DefaultHoodieRecordPayload].getName)) {
207209
true
210+
} else if (tableVersion.greaterThanOrEquals(HoodieTableVersion.NINE) && !recordMergeMode.equals(CUSTOM.name)) {
211+
// When table version >= v9, if the merge mode is not CUSTOM, we can safely skip payload class check
212+
// since the payload class is ignored during these writes. Meanwhile, we should give a warning about this behavior.
213+
if (!StringUtils.isNullOrEmpty(incomingPayloadClass)) {
214+
log.warn(s"Payload class '$incomingPayloadClass' is ignored since merge behavior is determined by merge mode: $recordMergeMode")
215+
}
216+
true
208217
} else {
209218
ignoreConfig
210219
}

hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieWriterUtils.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hudi;
1919

2020
import org.apache.hudi.common.config.HoodieConfig;
21+
import org.apache.hudi.common.config.RecordMergeMode;
2122
import org.apache.hudi.common.config.TypedProperties;
2223
import org.apache.hudi.common.model.HoodieTableType;
2324
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -35,6 +36,8 @@
3536

3637
import static org.apache.hudi.common.testutils.HoodieTestUtils.getMetaClientBuilder;
3738
import static org.junit.jupiter.api.Assertions.assertEquals;
39+
import static org.junit.jupiter.api.Assertions.assertFalse;
40+
import static org.junit.jupiter.api.Assertions.assertTrue;
3841

3942
class TestHoodieWriterUtils extends HoodieClientTestBase {
4043

@@ -116,4 +119,44 @@ void testFallbackToOriginalKey() {
116119
String result = HoodieWriterUtils.getKeyInTableConfig("my.custom.key", config);
117120
assertEquals("my.custom.key", result);
118121
}
119-
}
122+
123+
@Test
124+
void testShouldIgnorePayloadValidationVersion9WithCustomMergeMode() {
125+
HoodieConfig config = new HoodieConfig();
126+
config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
127+
config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.CUSTOM.name());
128+
129+
String payloadClass = "com.example.CustomPayload";
130+
assertFalse(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, config));
131+
}
132+
133+
@Test
134+
void testShouldIgnorePayloadValidationVersion9WithEmptyPayload() {
135+
HoodieConfig config = new HoodieConfig();
136+
config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
137+
config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.COMMIT_TIME_ORDERING.name());
138+
139+
String payloadClass = "";
140+
assertTrue(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, config));
141+
}
142+
143+
@Test
144+
void testShouldIgnorePayloadValidationVersion9WithCommitTimeOrdering() {
145+
HoodieConfig config = new HoodieConfig();
146+
config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
147+
config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.COMMIT_TIME_ORDERING.name());
148+
149+
String payloadClass = "com.example.CustomPayload";
150+
assertTrue(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, config));
151+
}
152+
153+
@Test
154+
void testShouldIgnorePayloadValidationVersion9WithEventTimeOrdering() {
155+
HoodieConfig config = new HoodieConfig();
156+
config.setValue(HoodieTableConfig.VERSION, String.valueOf(HoodieTableVersion.NINE.versionCode()));
157+
config.setValue(HoodieTableConfig.RECORD_MERGE_MODE, RecordMergeMode.EVENT_TIME_ORDERING.name());
158+
159+
String payloadClass = "com.example.CustomPayload";
160+
assertTrue(HoodieWriterUtils.shouldIgnorePayloadValidation(payloadClass, config));
161+
}
162+
}

hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import org.apache.spark.sql.functions.lit
4242
import org.apache.spark.sql.types.StructType
4343
import org.junit.jupiter.api.Assertions
4444
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertTrue}
45+
import org.junit.jupiter.api.function.Executable
4546
import org.junit.jupiter.params.ParameterizedTest
4647
import org.junit.jupiter.params.provider.CsvSource
4748

@@ -475,14 +476,29 @@ class TestSparkDataSource extends SparkClientFunctionalTestHarness {
475476
.mode(SaveMode.Append)
476477
.save(basePath)
477478
})
478-
Assertions.assertThrows(classOf[HoodieException], () => {
479-
df1.write.format("hudi")
479+
if (mergeMode != RecordMergeMode.CUSTOM) {
480+
Assertions.assertDoesNotThrow(
481+
new Executable {
482+
override def execute(): Unit = {
483+
df1.write.format("hudi")
484+
.option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, classOf[EventTimeAvroPayload].getName)
485+
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
486+
.option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
487+
.mode(SaveMode.Append)
488+
.save(basePath)
489+
}
490+
}
491+
)
492+
} else {
493+
Assertions.assertThrows(classOf[HoodieException], () => {
494+
df1.write.format("hudi")
480495
.option(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key, classOf[EventTimeAvroPayload].getName)
481496
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
482497
.option(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key, "false")
483498
.mode(SaveMode.Append)
484499
.save(basePath)
485-
})
500+
})
501+
}
486502
Assertions.assertThrows(classOf[HoodieException], () => {
487503
df1.write.format("hudi")
488504
.option(HoodieWriteConfig.RECORD_MERGE_STRATEGY_ID.key, HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID)

0 commit comments

Comments
 (0)