Skip to content

Commit 72f4452

Browse files
lokeshj1703Lokesh JainLokesh Jainnsivabalan
authored andcommitted
fix: Fix upgrade handling for MySqlDebeziumAvroPayload with deltastreamer (#14159)
--------- Co-authored-by: Lokesh Jain <[email protected]> Co-authored-by: Lokesh Jain <[email protected]> Co-authored-by: sivabalan <[email protected]>
1 parent dfd26b4 commit 72f4452

File tree

10 files changed

+140
-106
lines changed

10 files changed

+140
-106
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@
4848

4949
import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
5050
import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
51-
import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_FILE_COL_NAME;
52-
import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_POS_COL_NAME;
5351
import static org.apache.hudi.common.model.HoodieRecordMerger.COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
5452
import static org.apache.hudi.common.model.HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID;
5553
import static org.apache.hudi.common.model.HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
@@ -127,22 +125,22 @@ && isComplexKeyGeneratorWithSingleRecordKeyField(tableConfig)) {
127125
metaClient.getTableConfig().getTableVersion());
128126
}
129127
// Handle merge mode config.
130-
reconcileMergeModeConfig(tablePropsToAdd, tablePropsToRemove, tableConfig);
128+
reconcileMergeModeConfig(tablePropsToAdd, tablePropsToRemove, tableConfig, config);
131129
// Handle partial update mode config.
132-
reconcilePartialUpdateModeConfig(tablePropsToAdd, tableConfig);
130+
reconcilePartialUpdateModeConfig(tablePropsToAdd, tableConfig, config);
133131
// Handle merge properties config.
134132
reconcileMergePropertiesConfig(tablePropsToAdd, tableConfig, config);
135133
// Handle payload class configs.
136-
reconcilePayloadClassConfig(tablePropsToAdd, tablePropsToRemove, tableConfig);
134+
reconcilePayloadClassConfig(tablePropsToAdd, tablePropsToRemove, tableConfig, config);
137135
// Handle ordering fields config.
138-
reconcileOrderingFieldsConfig(tablePropsToAdd, tablePropsToRemove, tableConfig);
136+
reconcileOrderingFieldsConfig(tablePropsToAdd, tablePropsToRemove, tableConfig, config);
139137
return new UpgradeDowngrade.TableConfigChangeSet(tablePropsToAdd, tablePropsToRemove);
140138
}
141139

142-
private void reconcileMergeModeConfig(Map<ConfigProperty, String> tablePropsToAdd,
143-
Set<ConfigProperty> tablePropsToRemove,
144-
HoodieTableConfig tableConfig) {
145-
String payloadClass = tableConfig.getPayloadClass();
140+
private void reconcileMergeModeConfig(Map<ConfigProperty, String> tablePropsToAdd, Set<ConfigProperty> tablePropsToRemove,
141+
HoodieTableConfig tableConfig, HoodieWriteConfig config) {
142+
String payloadClass = tableConfig.getPayloadClassIfPresent()
143+
.orElse(config.getPayloadClass());
146144
RecordMergeMode mergeMode = tableConfig.getRecordMergeMode();
147145
if (mergeMode != RecordMergeMode.CUSTOM) {
148146
// For commit time or event time based table, remove merge strategy id.
@@ -162,10 +160,10 @@ private void reconcileMergeModeConfig(Map<ConfigProperty, String> tablePropsToAd
162160
// else: No op, which means merge strategy id and merge mode are not changed.
163161
}
164162

165-
private void reconcilePayloadClassConfig(Map<ConfigProperty, String> tablePropsToAdd,
166-
Set<ConfigProperty> tablePropsToRemove,
167-
HoodieTableConfig tableConfig) {
168-
String payloadClass = tableConfig.getPayloadClass();
163+
private void reconcilePayloadClassConfig(Map<ConfigProperty, String> tablePropsToAdd, Set<ConfigProperty> tablePropsToRemove,
164+
HoodieTableConfig tableConfig, HoodieWriteConfig config) {
165+
String payloadClass = tableConfig.getPayloadClassIfPresent()
166+
.orElse(config.getPayloadClass());
169167
if (StringUtils.isNullOrEmpty(payloadClass)) {
170168
return;
171169
}
@@ -176,8 +174,9 @@ private void reconcilePayloadClassConfig(Map<ConfigProperty, String> tablePropsT
176174
}
177175

178176
private void reconcilePartialUpdateModeConfig(Map<ConfigProperty, String> tablePropsToAdd,
179-
HoodieTableConfig tableConfig) {
180-
String payloadClass = tableConfig.getPayloadClass();
177+
HoodieTableConfig tableConfig, HoodieWriteConfig config) {
178+
String payloadClass = tableConfig.getPayloadClassIfPresent()
179+
.orElse(config.getPayloadClass());
181180
if (StringUtils.isNullOrEmpty(payloadClass)) {
182181
return;
183182
}
@@ -190,7 +189,8 @@ private void reconcilePartialUpdateModeConfig(Map<ConfigProperty, String> tableP
190189
}
191190

192191
private void reconcileMergePropertiesConfig(Map<ConfigProperty, String> tablePropsToAdd, HoodieTableConfig tableConfig, HoodieWriteConfig writeConfig) {
193-
String payloadClass = tableConfig.getPayloadClass();
192+
String payloadClass = tableConfig.getPayloadClassIfPresent()
193+
.orElse(writeConfig.getPayloadClass());
194194
if (StringUtils.isNullOrEmpty(payloadClass)) {
195195
return;
196196
}
@@ -224,13 +224,13 @@ private void reconcileMergePropertiesConfig(Map<ConfigProperty, String> tablePro
224224
}
225225
}
226226

227-
private void reconcileOrderingFieldsConfig(Map<ConfigProperty, String> tablePropsToAdd,
228-
Set<ConfigProperty> tablePropsToRemove,
229-
HoodieTableConfig tableConfig) {
230-
String payloadClass = tableConfig.getPayloadClass();
227+
private void reconcileOrderingFieldsConfig(Map<ConfigProperty, String> tablePropsToAdd, Set<ConfigProperty> tablePropsToRemove,
228+
HoodieTableConfig tableConfig, HoodieWriteConfig config) {
229+
String payloadClass = tableConfig.getPayloadClassIfPresent()
230+
.orElse(config.getPayloadClass());
231231
Option<String> orderingFieldsOpt;
232232
if (MySqlDebeziumAvroPayload.class.getName().equals(payloadClass)) {
233-
orderingFieldsOpt = Option.of(FLATTENED_FILE_COL_NAME + "," + FLATTENED_POS_COL_NAME);
233+
orderingFieldsOpt = Option.of(MySqlDebeziumAvroPayload.ORDERING_FIELDS);
234234
} else if (PostgresDebeziumAvroPayload.class.getName().equals(payloadClass)) {
235235
orderingFieldsOpt = Option.of(DebeziumConstants.FLATTENED_LSN_COL_NAME);
236236
} else {

hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java

Lines changed: 43 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,11 @@
5656
import java.io.ByteArrayOutputStream;
5757
import java.io.IOException;
5858
import java.nio.file.Path;
59+
import java.util.ArrayList;
5960
import java.util.Arrays;
6061
import java.util.Collections;
6162
import java.util.HashMap;
63+
import java.util.List;
6264
import java.util.Map;
6365
import java.util.Set;
6466
import java.util.stream.Stream;
@@ -117,6 +119,7 @@ public void setUp() throws IOException {
117119
when(table.getMetaClient()).thenReturn(metaClient);
118120
when(metaClient.getTableConfig()).thenReturn(tableConfig);
119121
when(config.autoUpgrade()).thenReturn(true);
122+
when(config.getPayloadClass()).thenReturn(null);
120123

121124
// Setup common mocks
122125
when(upgradeDowngradeHelper.getTable(config, context)).thenReturn(table);
@@ -125,6 +128,7 @@ public void setUp() throws IOException {
125128
when(metaClient.getStorage()).thenReturn(storage);
126129
when(tableConfig.getTableVersion()).thenReturn(HoodieTableVersion.EIGHT);
127130
when(tableConfig.getOrderingFieldsStr()).thenReturn(Option.empty());
131+
when(tableConfig.getPayloadClassIfPresent()).thenReturn(Option.empty());
128132

129133
// Use a temp file for index definition path
130134
indexDefPath = new StoragePath(tempDir.resolve("index.json").toString());
@@ -143,81 +147,55 @@ public void setUp() throws IOException {
143147
}
144148

145149
static Stream<Arguments> payloadClassTestCases() {
146-
return Stream.of(
147-
Arguments.of(
148-
DefaultHoodieRecordPayload.class.getName(),
149-
"",
150-
null,
151-
null,
152-
"DefaultHoodieRecordPayload"
153-
),
154-
Arguments.of(
155-
EventTimeAvroPayload.class.getName(),
156-
"",
157-
EVENT_TIME_ORDERING.name(),
158-
null,
159-
"EventTimeAvroPayload"
160-
),
161-
Arguments.of(
162-
OverwriteWithLatestAvroPayload.class.getName(),
163-
"",
164-
null,
165-
null,
166-
"OverwriteWithLatestAvroPayload"
167-
),
168-
Arguments.of(
169-
AWSDmsAvroPayload.class.getName(),
170-
RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=Op,"
171-
+ RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=D", // mergeProperties
172-
COMMIT_TIME_ORDERING.name(),
173-
null,
174-
"AWSDmsAvroPayload"
175-
),
176-
Arguments.of(
177-
PostgresDebeziumAvroPayload.class.getName(),
178-
RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE + "=" + DEBEZIUM_UNAVAILABLE_VALUE + ","
179-
+ RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=_change_operation_type,"
180-
+ RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d",
181-
EVENT_TIME_ORDERING.name(),
182-
FILL_UNAVAILABLE.name(),
183-
"PostgresDebeziumAvroPayload"
184-
),
185-
Arguments.of(
186-
PartialUpdateAvroPayload.class.getName(),
187-
"",
188-
EVENT_TIME_ORDERING.name(),
189-
PartialUpdateMode.IGNORE_DEFAULTS.name(),
190-
"PartialUpdateAvroPayload"
191-
),
192-
Arguments.of(
193-
MySqlDebeziumAvroPayload.class.getName(),
194-
RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=_change_operation_type,"
195-
+ RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d",
196-
EVENT_TIME_ORDERING.name(),
197-
null,
198-
"MySqlDebeziumAvroPayload"
199-
),
200-
Arguments.of(
201-
OverwriteNonDefaultsWithLatestAvroPayload.class.getName(),
202-
"",
203-
COMMIT_TIME_ORDERING.name(),
204-
PartialUpdateMode.IGNORE_DEFAULTS.name(),
205-
"OverwriteNonDefaultsWithLatestAvroPayload"
206-
)
207-
);
150+
List<Arguments> arguments = new ArrayList<>();
151+
arguments.addAll(getArguments(DefaultHoodieRecordPayload.class.getName(), "",
152+
null, null, "DefaultHoodieRecordPayload"));
153+
arguments.addAll(getArguments(EventTimeAvroPayload.class.getName(), "",
154+
EVENT_TIME_ORDERING.name(), null, "EventTimeAvroPayload"));
155+
arguments.addAll(getArguments(OverwriteWithLatestAvroPayload.class.getName(), "",
156+
null, null, "OverwriteWithLatestAvroPayload"));
157+
arguments.addAll(getArguments(AWSDmsAvroPayload.class.getName(), RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=Op,"
158+
+ RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=D", // mergeProperties
159+
COMMIT_TIME_ORDERING.name(), null, "AWSDmsAvroPayload"));
160+
arguments.addAll(getArguments(PostgresDebeziumAvroPayload.class.getName(), RECORD_MERGE_PROPERTY_PREFIX + PARTIAL_UPDATE_UNAVAILABLE_VALUE + "=" + DEBEZIUM_UNAVAILABLE_VALUE + ","
161+
+ RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=_change_operation_type,"
162+
+ RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d",
163+
EVENT_TIME_ORDERING.name(), FILL_UNAVAILABLE.name(), "PostgresDebeziumAvroPayload"));
164+
arguments.addAll(getArguments(PartialUpdateAvroPayload.class.getName(), "",
165+
EVENT_TIME_ORDERING.name(), PartialUpdateMode.IGNORE_DEFAULTS.name(), "PartialUpdateAvroPayload"));
166+
arguments.addAll(getArguments(MySqlDebeziumAvroPayload.class.getName(), RECORD_MERGE_PROPERTY_PREFIX + DELETE_KEY + "=_change_operation_type,"
167+
+ RECORD_MERGE_PROPERTY_PREFIX + DELETE_MARKER + "=d",
168+
EVENT_TIME_ORDERING.name(), null, "MySqlDebeziumAvroPayload"));
169+
arguments.addAll(getArguments(OverwriteNonDefaultsWithLatestAvroPayload.class.getName(), "",
170+
COMMIT_TIME_ORDERING.name(), PartialUpdateMode.IGNORE_DEFAULTS.name(), "OverwriteNonDefaultsWithLatestAvroPayload"));
171+
return arguments.stream();
172+
}
173+
174+
private static List<Arguments> getArguments(String payloadClassName, String expectedMergeProperties,
175+
String expectedRecordMergeMode, String expectedPartialUpdateMode,
176+
String testName) {
177+
return Arrays.asList(
178+
Arguments.of(payloadClassName, expectedMergeProperties,
179+
expectedRecordMergeMode, expectedPartialUpdateMode, testName, true),
180+
Arguments.of(payloadClassName, expectedMergeProperties,
181+
expectedRecordMergeMode, expectedPartialUpdateMode, testName, false));
208182
}
209183

210184
@ParameterizedTest(name = "testUpgradeWith{4}")
211185
@MethodSource("payloadClassTestCases")
212186
void testUpgradeWithPayloadClass(String payloadClassName, String expectedMergeProperties,
213187
String expectedRecordMergeMode, String expectedPartialUpdateMode,
214-
String testName) {
188+
String testName, boolean isPayloadClassConfiguredInTableConfig) {
215189
try (org.mockito.MockedStatic<UpgradeDowngradeUtils> utilities =
216190
org.mockito.Mockito.mockStatic(UpgradeDowngradeUtils.class)) {
217191
utilities.when(() -> UpgradeDowngradeUtils.rollbackFailedWritesAndCompact(
218192
any(), any(), any(), any(), anyBoolean(), any()))
219193
.thenAnswer(invocation -> null);
220-
when(tableConfig.getPayloadClass()).thenReturn(payloadClassName);
194+
if (isPayloadClassConfiguredInTableConfig) {
195+
when(tableConfig.getPayloadClassIfPresent()).thenReturn(Option.ofNullable(payloadClassName));
196+
} else {
197+
when(config.getPayloadClass()).thenReturn(payloadClassName);
198+
}
221199
when(tableConfig.getTableType()).thenReturn(HoodieTableType.MERGE_ON_READ);
222200
when(tableConfig.getRecordMergeStrategyId()).thenReturn(HoodieRecordMerger.CUSTOM_MERGE_STRATEGY_UUID);
223201
when(metaClient.getIndexMetadata()).thenReturn(Option.empty());
@@ -301,6 +279,7 @@ private void assertPayloadClassChange(Map<ConfigProperty, String> propertiesToAd
301279
if (payloadClass.equals(MySqlDebeziumAvroPayload.class.getName())) {
302280
assertTrue(propertiesToAdd.containsKey(HoodieTableConfig.ORDERING_FIELDS));
303281
assertEquals(FLATTENED_FILE_COL_NAME + "," + FLATTENED_POS_COL_NAME, propertiesToAdd.get(HoodieTableConfig.ORDERING_FIELDS));
282+
assertTrue(propertiesToRemove.contains(HoodieTableConfig.PRECOMBINE_FIELD));
304283
} else if (payloadClass.equals(PostgresDebeziumAvroPayload.class.getName())) {
305284
assertEquals(FLATTENED_LSN_COL_NAME, propertiesToAdd.get(HoodieTableConfig.ORDERING_FIELDS));
306285
}

hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordPayload.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -184,19 +184,7 @@ static String getPayloadClassName(HoodieConfig config) {
184184
}
185185

186186
static String getPayloadClassName(Properties props) {
187-
Option<String> payloadOpt = getPayloadClassNameIfPresent(props);
188-
if (payloadOpt.isPresent()) {
189-
return payloadOpt.get();
190-
}
191-
// Note: starting from version 9, payload class is not necessary set, but
192-
// merge mode must exist. Therefore, we use merge mode to infer
193-
// the payload class for certain corner cases, like for MIT command.
194-
if (ConfigUtils.containsConfigProperty(props, RECORD_MERGE_MODE)
195-
&& ConfigUtils.getStringWithAltKeys(props, RECORD_MERGE_MODE, StringUtils.EMPTY_STRING)
196-
.equals(RecordMergeMode.COMMIT_TIME_ORDERING.name())) {
197-
return OverwriteWithLatestAvroPayload.class.getName();
198-
}
199-
return HoodieTableConfig.getDefaultPayloadClassName();
187+
return getPayloadClassNameIfPresent(props).orElse(HoodieTableConfig.getDefaultPayloadClassName());
200188
}
201189

202190
// NOTE: PAYLOAD_CLASS_NAME is before LEGACY_PAYLOAD_CLASS_NAME to make sure
@@ -207,6 +195,13 @@ static Option<String> getPayloadClassNameIfPresent(Properties props) {
207195
payloadClassName = ConfigUtils.getStringWithAltKeys(props, PAYLOAD_CLASS_NAME);
208196
} else if (props.containsKey("hoodie.datasource.write.payload.class")) {
209197
payloadClassName = props.getProperty("hoodie.datasource.write.payload.class");
198+
} else if (ConfigUtils.containsConfigProperty(props, RECORD_MERGE_MODE)
199+
&& ConfigUtils.getStringWithAltKeys(props, RECORD_MERGE_MODE, StringUtils.EMPTY_STRING)
200+
.equals(RecordMergeMode.COMMIT_TIME_ORDERING.name())) {
201+
// Note: starting from version 9, payload class is not necessary set, but
202+
// merge mode must exist. Therefore, we use merge mode to infer
203+
// the payload class for certain corner cases, like for MIT command.
204+
payloadClassName = OverwriteWithLatestAvroPayload.class.getName();
210205
}
211206

212207
// There could be tables written with payload class from com.uber.hoodie.

hudi-common/src/main/java/org/apache/hudi/common/model/debezium/MySqlDebeziumAvroPayload.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
import java.io.IOException;
3333
import java.util.Objects;
3434

35+
import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_FILE_COL_NAME;
36+
import static org.apache.hudi.common.model.debezium.DebeziumConstants.FLATTENED_POS_COL_NAME;
37+
3538
/**
3639
* Provides support for seamlessly applying changes captured via Debezium for MysqlDB.
3740
* <p>
@@ -48,6 +51,8 @@ public class MySqlDebeziumAvroPayload extends AbstractDebeziumAvroPayload {
4851

4952
private static final Logger LOG = LoggerFactory.getLogger(MySqlDebeziumAvroPayload.class);
5053

54+
public static final String ORDERING_FIELDS = FLATTENED_FILE_COL_NAME + "," + FLATTENED_POS_COL_NAME;
55+
5156
public MySqlDebeziumAvroPayload(GenericRecord record, Comparable orderingVal) {
5257
super(record, orderingVal);
5358
}

hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -801,6 +801,13 @@ public String getPayloadClass() {
801801
return HoodieRecordPayload.getPayloadClassName(this);
802802
}
803803

804+
/**
805+
* Read the payload class if present for HoodieRecords from the table properties.
806+
*/
807+
public Option<String> getPayloadClassIfPresent() {
808+
return HoodieRecordPayload.getPayloadClassNameIfPresent(this.getProps());
809+
}
810+
804811
public String getLegacyPayloadClass() {
805812
return getStringOrDefault(LEGACY_PAYLOAD_CLASS_NAME, EMPTY_STRING);
806813
}

hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,6 @@ public HoodieStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, Configur
161161
cfg.recordMergeMode, cfg.payloadClassName, cfg.recordMergeStrategyId, cfg.sourceOrderingFields,
162162
HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(this.properties, HoodieWriteConfig.WRITE_TABLE_VERSION)));
163163
cfg.recordMergeMode = mergingConfigs.getLeft();
164-
cfg.payloadClassName = mergingConfigs.getMiddle();
165164
cfg.recordMergeStrategyId = mergingConfigs.getRight();
166165
if (cfg.initialCheckpointProvider != null && cfg.checkpoint == null) {
167166
InitialCheckPointProvider checkPointProvider =

0 commit comments

Comments
 (0)