
Commit b52806c

nsivabalan and yihua committed
fix: Fixing streaming writes to metadata table for perf regression (#17477)
Co-authored-by: Y Ethan Guo <[email protected]>
1 parent 77da434 commit b52806c

5 files changed, +15 -145 lines changed


hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java

Lines changed: 1 addition & 27 deletions
@@ -18,7 +18,6 @@

 package org.apache.hudi.client;

-import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.utils.SparkReleaseResources;
 import org.apache.hudi.client.utils.SparkValidatorUtils;
@@ -28,16 +27,11 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
-import org.apache.hudi.exception.HoodieClusteringException;
-import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -50,8 +44,6 @@
 import java.util.List;
 import java.util.stream.Collectors;

-import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
-
 public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<HoodieData<HoodieRecord<T>>, HoodieData<WriteStatus>, JavaRDD<WriteStatus>> {

   private final StreamingMetadataWriteHandler streamingMetadataWriteHandler;
@@ -102,30 +94,12 @@ protected HoodieWriteMetadata<HoodieData<WriteStatus>> partialUpdateTableMetadat
                                 String instantTime,
                                 WriteOperationType writeOperationType) {
     if (isStreamingWriteToMetadataEnabled(table)) {
-      boolean enforceCoalesceWithRepartition = writeOperationType == WriteOperationType.CLUSTER; // for other table services, enforceCoalesceWithRepartition will be false.
-      if (enforceCoalesceWithRepartition) {
-        enforceCoalesceWithRepartition = computeEnforceCoalesceWithRepartitionForClustering(table, instantTime);
-      }
       writeMetadata.setWriteStatuses(streamingMetadataWriteHandler.streamWriteToMetadataTable(table, writeMetadata.getWriteStatuses(), instantTime,
-          enforceCoalesceWithRepartition, config.getMetadataConfig().getStreamingWritesCoalesceDivisorForDataTableWrites()));
+          config.getMetadataConfig().getStreamingWritesCoalesceDivisorForDataTableWrites()));
     }
     return writeMetadata;
   }

-  private boolean computeEnforceCoalesceWithRepartitionForClustering(HoodieTable table, String instantTime) {
-    // check clustering plan for sort columns. only if there are no sort columns, then fallback to sort mode from write config.
-    HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
-            table.getMetaClient(), ClusteringUtils.getRequestedClusteringInstant(instantTime, table.getActiveTimeline(), table.getInstantGenerator()).get())
-        .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException(
-            "Unable to read clustering plan for instant: " + instantTime));
-    if (clusteringPlan.getStrategy().getStrategyParams().containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())
-        && !StringUtils.isNullOrEmpty(clusteringPlan.getStrategy().getStrategyParams().get(PLAN_STRATEGY_SORT_COLUMNS.key()))) {
-      // sorting enabled.
-      return false;
-    }
-    return config.getBulkInsertSortMode() == BulkInsertSortMode.NONE;
-  }
-
   @Override
   protected void writeToMetadataTable(HoodieTable table, String instantTime, HoodieCommitMetadata metadata, List<HoodieWriteStat> partialMetadataWriteStats) {
     if (isStreamingWriteToMetadataEnabled(table)) {

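Note on the simplified call above: the only tuning input that remains is the coalesce divisor read from the metadata config, which StreamingMetadataWriteHandler turns into a coalesce parallelism via Math.max(1, partitions / divisor) (see the third file below). A minimal arithmetic sketch; the partition count and divisor value here are illustrative, not config defaults:

// Illustrative only; mirrors the Math.max(1, partitions / divisor) expression
// that StreamingMetadataWriteHandler applies to the divisor passed in here.
public class CoalesceDivisorSketch {
  public static void main(String[] args) {
    int dataTablePartitions = 1_000; // hypothetical Spark partition count of data table write statuses
    int coalesceDivisor = 500;       // hypothetical value of getStreamingWritesCoalesceDivisorForDataTableWrites()
    int coalesceParallelism = Math.max(1, dataTablePartitions / coalesceDivisor);
    System.out.println(coalesceParallelism); // prints 2; the max(1, ...) floors it at 1 for large divisors
  }
}
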
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java

Lines changed: 1 addition & 3 deletions
@@ -19,7 +19,6 @@
 package org.apache.hudi.client;

 import org.apache.hudi.callback.common.WriteStatusValidator;
-import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.index.HoodieSparkIndexClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
@@ -109,9 +108,8 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> rawWriteStatuses,
     final JavaRDD<WriteStatus> writeStatuses;
     if (WriteOperationType.streamingWritesToMetadataSupported((getOperationType())) && isStreamingWriteToMetadataEnabled(table)) {
       // this code block is expected to create a new Metadata Writer, start a new commit in metadata table and trigger streaming write to metadata table.
-      boolean enforceCoalesceWithRepartition = getOperationType() == WriteOperationType.BULK_INSERT && config.getBulkInsertSortMode() == BulkInsertSortMode.NONE;
       writeStatuses = HoodieJavaRDD.getJavaRDD(streamingMetadataWriteHandler.streamWriteToMetadataTable(table, HoodieJavaRDD.of(rawWriteStatuses), instantTime,
-          enforceCoalesceWithRepartition, config.getMetadataConfig().getStreamingWritesCoalesceDivisorForDataTableWrites()));
+          config.getMetadataConfig().getStreamingWritesCoalesceDivisorForDataTableWrites()));
     } else {
       writeStatuses = rawWriteStatuses;
     }

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java

Lines changed: 11 additions & 16 deletions
@@ -54,25 +54,23 @@ public class StreamingMetadataWriteHandler {
    * @param table The {@link HoodieTable} instance for data table of interest.
    * @param dataTableWriteStatuses The {@link WriteStatus} from data table writes.
    * @param instantTime The instant time of interest.
-   * @param enforceCoalesceWithRepartition true when repartition has to be added to dag to coalesce data table write statuses to 1. false otherwise.
    * @param coalesceDivisorForDataTableWrites assist with determining the coalesce parallelism for data table write statuses. N data table write status
    *                                          spark partitions will be divied by this value to find the coalesce parallelism.
    * @return {@link HoodieData} of {@link WriteStatus} referring to both data table writes and partial metadata table writes.
    */
   public HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieTable table, HoodieData<WriteStatus> dataTableWriteStatuses, String instantTime,
-                                                            boolean enforceCoalesceWithRepartition, int coalesceDivisorForDataTableWrites) {
+                                                            int coalesceDivisorForDataTableWrites) {
     Option<HoodieTableMetadataWriter> metadataWriterOpt = getMetadataWriter(instantTime, table);
     ValidationUtils.checkState(metadataWriterOpt.isPresent(),
         "Cannot instantiate metadata writer for the table of interest " + table.getMetaClient().getBasePath());
-    return streamWriteToMetadataTable(dataTableWriteStatuses, metadataWriterOpt.get(), table, instantTime, enforceCoalesceWithRepartition,
-        coalesceDivisorForDataTableWrites);
+    return streamWriteToMetadataTable(dataTableWriteStatuses, metadataWriterOpt.get(), table, instantTime, coalesceDivisorForDataTableWrites);
   }

   /**
    * To be invoked by write client or table service client to complete the write to metadata table.
    *
    * <p>When streaming writes is enabled, writes to left over metadata partitions
-   * which is not covered in {@link #streamWriteToMetadataTable(HoodieTable, HoodieData, String, Boolean, Integer)},
+   * which is not covered in {@link #streamWriteToMetadataTable(HoodieTable, HoodieData, String, Integer)},
    * otherwise writes to metadata table in legacy way(batch update without partial updates).
    *
    * @param table The {@link HoodieTable} instance for data table of interest.
@@ -99,22 +97,19 @@ private HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieData<WriteStatu
                                                              HoodieTableMetadataWriter metadataWriter,
                                                              HoodieTable table,
                                                              String instantTime,
-                                                             boolean enforceCoalesceWithRepartition,
                                                              int coalesceDivisorForDataTableWrites) {
     HoodieData<WriteStatus> mdtWriteStatuses = metadataWriter.streamWriteToMetadataPartitions(dataTableWriteStatuses, instantTime);
     mdtWriteStatuses.persist("MEMORY_AND_DISK_SER", table.getContext(), HoodieData.HoodieDataCacheKey.of(table.getMetaClient().getBasePath().toString(), instantTime));
     HoodieData<WriteStatus> coalescedDataWriteStatuses;
     int coalesceParallelism = Math.max(1, dataTableWriteStatuses.getNumPartitions() / coalesceDivisorForDataTableWrites);
-    if (enforceCoalesceWithRepartition) {
-      // with bulk insert and NONE sort mode, simple coalesce on datatable write statuses also impact record key generation stages.
-      // and hence we are adding a partitioner to cut the chain so that coalesce(1) here does not impact record key generation stages.
-      coalescedDataWriteStatuses = HoodieJavaRDD.of(HoodieJavaRDD.getJavaRDD(dataTableWriteStatuses)
-          .mapToPair((PairFunction<WriteStatus, String, WriteStatus>) writeStatus -> new Tuple2(writeStatus.getStat().getPath(), writeStatus))
-          .partitionBy(new CoalescingPartitioner(coalesceParallelism))
-          .map((Function<Tuple2<String, WriteStatus>, WriteStatus>) entry -> entry._2));
-    } else {
-      coalescedDataWriteStatuses = dataTableWriteStatuses.coalesce(coalesceParallelism);
-    }
+    // lets coalesce to lesser number of spark tasks so that, when unioned along with metadata table write status,
+    // we only allocate very less number of tasks for data table write statuses.
+    // In fact, data table writes should have triggered in previous stage before coalesce (partition by below forces the writes
+    // to data table is triggered in previous stage and with the coalesced stage)
+    coalescedDataWriteStatuses = HoodieJavaRDD.of(HoodieJavaRDD.getJavaRDD(dataTableWriteStatuses)
+        .mapToPair((PairFunction<WriteStatus, String, WriteStatus>) writeStatus -> new Tuple2(writeStatus.getStat().getPath(), writeStatus))
+        .partitionBy(new CoalescingPartitioner(coalesceParallelism))
+        .map((Function<Tuple2<String, WriteStatus>, WriteStatus>) entry -> entry._2));
     return coalescedDataWriteStatuses.union(mdtWriteStatuses);
   }

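The comments in the new code path carry the key reasoning: partitionBy introduces a shuffle, so the data table file writes are materialized at full parallelism in the previous stage and only their write statuses flow into the small coalesced stage, whereas a bare coalesce(n) would pull the upstream write tasks into that narrow stage. A minimal sketch of a partitioner with the shape CoalescingPartitioner needs; Hudi's actual implementation may differ, and the class below is hypothetical:

import org.apache.spark.Partitioner;

// Sketch only: any deterministic key -> [0, n) mapping suffices, since the goal
// is just to bound the partition count behind a shuffle (stage) boundary.
public class CoalescingPartitionerSketch extends Partitioner {
  private final int numPartitions;

  public CoalescingPartitionerSketch(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  @Override
  public int numPartitions() {
    return numPartitions;
  }

  @Override
  public int getPartition(Object key) {
    // The diff keys each WriteStatus by its file path; hash it into a target partition.
    return Math.floorMod(key.hashCode(), numPartitions);
  }
}
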
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java

Lines changed: 0 additions & 94 deletions
@@ -19,29 +19,20 @@

 package org.apache.hudi.client;

-import org.apache.hudi.avro.model.HoodieClusteringPlan;
-import org.apache.hudi.avro.model.HoodieClusteringStrategy;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.InstantComparison;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
-import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.metadata.HoodieTableMetadata;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;

 import org.apache.avro.generic.GenericRecord;
@@ -51,29 +42,20 @@
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.mockito.MockedStatic;

 import java.io.IOException;
 import java.net.URI;
-import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;

 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
-import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.when;

 class TestSparkRDDWriteClient extends SparkClientFunctionalTestHarness {

@@ -223,67 +205,6 @@ public void testCompletionTimeGreaterThanRequestedTime() throws IOException {
     testAndAssertCompletionIsEarlierThanRequested(basePath, props);
   }

-  private static Stream<Arguments> streamingMetadataWritesTestArgs() {
-    return Arrays.stream(new Object[][] {
-        {"COMPACT", "NONE", false, false, false},
-        {"COMPACT", "NONE", true, false, false},
-        {"COMPACT", "GLOBAL_SORT", true, false, false},
-        {"COMPACT", "GLOBAL_SORT", false, false, false},
-        {"LOG_COMPACT", "NONE", true, false, false},
-        {"LOG_COMPACT", "NONE", false, false, false},
-        {"LOG_COMPACT", "GLOBAL_SORT", true, false, false},
-        {"LOG_COMPACT", "GLOBAL_SORT", false, false, false},
-        {"CLUSTER", "NONE", true, false, true},
-        {"CLUSTER", "NONE", false, false, true},
-        {"CLUSTER", "GLOBAL_SORT", true, false, false},
-        {"CLUSTER", "GLOBAL_SORT", false, false, false},
-        {"CLUSTER", "NONE", true, true, false},
-        {"CLUSTER", "NONE", false, true, true},
-        {"CLUSTER", "GLOBAL_SORT", true, true, false},
-        {"CLUSTER", "GLOBAL_SORT", false, true, false},
-    }).map(Arguments::of);
-  }
-
-  @ParameterizedTest
-  @MethodSource("streamingMetadataWritesTestArgs")
-  public void testStreamingMetadataWrites(WriteOperationType writeOperationType,
-                                          String bulkInsertSortMode, boolean setSortColsinClusteringPlan,
-                                          boolean setNonEmptyValueForSortcols,
-                                          boolean expectedEnforceRepartitionWithCoalesce) throws IOException {
-    HoodieTableMetaClient metaClient =
-        getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), new Properties());
-    HoodieWriteConfig writeConfig = getConfigBuilder(true)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withStreamingWriteEnabled(true).build())
-        .withBulkInsertSortMode(bulkInsertSortMode)
-        .withPath(metaClient.getBasePath())
-        .build();
-    MockStreamingMetadataWriteHandler mockMetadataWriteHandler = new MockStreamingMetadataWriteHandler();
-
-    try (MockedStatic<ClusteringUtils> mocked = mockStatic(ClusteringUtils.class);) {
-      HoodieClusteringPlan clusteringPlan = mock(HoodieClusteringPlan.class);
-      HoodieClusteringStrategy clusteringStrategy = mock(HoodieClusteringStrategy.class);
-      when(clusteringPlan.getStrategy()).thenReturn(clusteringStrategy);
-      Map<String, String> strategyParams = new HashMap<>();
-      if (setSortColsinClusteringPlan) {
-        strategyParams.put(PLAN_STRATEGY_SORT_COLUMNS.key(), setNonEmptyValueForSortcols ? "abc" : "");
-      }
-      when(clusteringStrategy.getStrategyParams()).thenReturn(strategyParams);
-
-      HoodieInstant hoodieInstant = mock(HoodieInstant.class);
-      mocked.when(() -> ClusteringUtils.getClusteringPlan(any(), any())).thenReturn(Option.of(Pair.of(hoodieInstant, clusteringPlan)));
-      mocked.when(() -> ClusteringUtils.getRequestedClusteringInstant(any(), any(), any())).thenReturn(Option.of(hoodieInstant));
-
-      SparkRDDTableServiceClient tableServiceClient = new SparkRDDTableServiceClient(context(), writeConfig, Option.empty(), mockMetadataWriteHandler);
-      HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = mock(HoodieWriteMetadata.class);
-      HoodieData<WriteStatus> hoodieData = mock(HoodieData.class);
-      when(writeMetadata.getWriteStatuses()).thenReturn(hoodieData);
-      HoodieTable table = mock(HoodieTable.class);
-      when(table.getMetaClient()).thenReturn(metaClient);
-      tableServiceClient.partialUpdateTableMetadata(table, writeMetadata, "00001", writeOperationType);
-      assertEquals(expectedEnforceRepartitionWithCoalesce, mockMetadataWriteHandler.enforceCoalesceWithRepartition);
-    }
-  }
-
   private void testAndAssertCompletionIsEarlierThanRequested(String basePath, Properties properties) throws IOException {
     HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), basePath, properties);

@@ -303,19 +224,4 @@ private void testAndAssertCompletionIsEarlierThanRequested(String basePath, Prop
       assertTrue(InstantComparison.compareTimestamps(hoodieInstant.requestedTime(), InstantComparison.LESSER_THAN, hoodieInstant.getCompletionTime()));
     });
   }
-
-  class MockStreamingMetadataWriteHandler extends StreamingMetadataWriteHandler {
-
-    boolean enforceCoalesceWithRepartition;
-    int coalesceDivisorForDataTableWrites;
-
-    @Override
-    public HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieTable table, HoodieData<WriteStatus> dataTableWriteStatuses, String instantTime,
-                                                              boolean enforceCoalesceWithRepartition, int coalesceDivisorForDataTableWrites) {
-      this.enforceCoalesceWithRepartition = enforceCoalesceWithRepartition;
-      this.coalesceDivisorForDataTableWrites = coalesceDivisorForDataTableWrites;
-      return dataTableWriteStatuses;
-    }
-  }
-
 }

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestStreamingMetadataWriteHandler.java

Lines changed: 2 additions & 5 deletions
@@ -61,9 +61,7 @@ void setUp() {
   private static Stream<Arguments> coalesceDivisorTestArgs() {
     return Arrays.stream(new Object[][] {
         {100, 20, 1000, true},
-        {100, 20, 1000, false},
         {1, 1, 1000, true},
-        {1, 1, 1000, false},
         {10000, 100, 5000, true},
         {10000, 100, 5000, true},
         {10000, 100, 20000, true},
@@ -73,15 +71,14 @@ private static Stream<Arguments> coalesceDivisorTestArgs() {

   @ParameterizedTest
   @MethodSource("coalesceDivisorTestArgs")
-  public void testCoalesceDividentConfig(int numDataTableWriteStatuses, int numMdtWriteStatus, int coalesceDividentForDataTableWrites,
-                                         boolean enforceCoalesceWithRepartition) {
+  public void testCoalesceDividentConfig(int numDataTableWriteStatuses, int numMdtWriteStatus, int coalesceDividentForDataTableWrites) {
     HoodieData<WriteStatus> dataTableWriteStatus = mockWriteStatuses(numDataTableWriteStatuses);
     HoodieData<WriteStatus> mdtWriteStatus = mockWriteStatuses(numMdtWriteStatus);
     HoodieTableMetadataWriter mdtWriter = mock(HoodieTableMetadataWriter.class);
     when(mdtWriter.streamWriteToMetadataPartitions(any(), any())).thenReturn(mdtWriteStatus);
     StreamingMetadataWriteHandler metadataWriteHandler = new MockStreamingMetadataWriteHandler(mdtWriter);

-    HoodieData<WriteStatus> allWriteStatuses = metadataWriteHandler.streamWriteToMetadataTable(mockHoodieTable, dataTableWriteStatus, "00001", enforceCoalesceWithRepartition,
+    HoodieData<WriteStatus> allWriteStatuses = metadataWriteHandler.streamWriteToMetadataTable(mockHoodieTable, dataTableWriteStatus, "00001",
         coalesceDividentForDataTableWrites);
     assertEquals(Math.max(1, numDataTableWriteStatuses / coalesceDividentForDataTableWrites) + numMdtWriteStatus, allWriteStatuses.getNumPartitions());
   }
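For intuition, the assertion can be checked by hand against the surviving {10000, 100, 5000} row: the data table side coalesces to Math.max(1, 10000 / 5000) = 2 partitions, and RDD union concatenates partitions rather than merging them, so unioning with 100 metadata table write status partitions yields 102. A worked instance of that arithmetic:

// Worked instance of the test's assertion, using the {10000, 100, 5000} row above.
public class UnionPartitionCountSketch {
  public static void main(String[] args) {
    int coalescedDataPartitions = Math.max(1, 10000 / 5000); // = 2
    int totalPartitions = coalescedDataPartitions + 100;     // + 100 MDT write status partitions
    System.out.println(totalPartitions);                     // prints 102
  }
}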
