Commit 7afbdd6

refactor: Add Lombok Builders
- Add Lombok Builder to HoodieFileGroupReader
- Add Lombok Builder to InputSplit
- Add Lombok Builder to ReaderParameters
1 parent c177e2b commit 7afbdd6
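
For orientation, here is a minimal, self-contained sketch of the Lombok pattern this commit applies. Everything in it (class name, fields, values) is an illustrative stand-in rather than Hudi's actual code: @Builder(setterPrefix = "with") generates a static builder() factory plus a <ClassName>Builder inner class whose setters keep the fluent withXxx(...) naming, which is why the call sites below only swap the hand-written newBuilder() for the generated builder().

import lombok.Builder;

// Hypothetical stand-in for HoodieFileGroupReader; demonstrates the generated API only.
@Builder(setterPrefix = "with")
public class ExampleReader {
  private final String partitionPath;
  private final String dataSchema;
  private final boolean emitDelete;

  public static void main(String[] args) {
    ExampleReader reader = ExampleReader.builder()   // previously a hand-written newBuilder()
        .withPartitionPath("2024/01/01")             // setterPrefix = "with" keeps the fluent naming
        .withDataSchema("{\"type\": \"record\"}")
        .withEmitDelete(true)
        .build();
    System.out.println(reader.partitionPath);
  }
}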

File tree: 33 files changed, +290 additions, -376 deletions

hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java

Lines changed: 4 additions & 2 deletions
@@ -243,10 +243,12 @@ storage, new StoragePath(logFilePathPattern)).stream()
         Option.empty(),
         Option.empty(),
         fileGroupReaderProperties);
-    try (HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>newBuilder()
+    try (HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>builder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(HoodieCLI.getTableMetaClient())
-        .withFileSlice(fileSlice)
+        .withBaseFileOption(fileSlice.getBaseFile())
+        .withLogFiles(fileSlice.getLogFiles())
+        .withPartitionPath(fileSlice.getPartitionPath())
         .withDataSchema(readerSchema)
         .withRequestedSchema(readerSchema)
         .withLatestCommitTime(client.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse(HoodieInstantTimeGenerator.getCurrentInstantTimeStr()))
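
The withFileSlice(fileSlice) to three-setter change recurs in most files below. A plausible reason, sketched under the assumption that the Lombok builder mirrors HoodieFileGroupReader's fields one-to-one: Lombok generates exactly one setter per field, so a hand-written composite convenience setter has no direct equivalent, and each call site now passes the slice's parts explicitly. Types here are simplified stand-ins:

import lombok.Builder;
import lombok.Value;
import java.util.List;

// Stand-in for org.apache.hudi.common.model.FileSlice; @Value generates the getters.
@Value
class FileSliceSketch {
  String baseFile;       // stand-in for Option<HoodieBaseFile>
  List<String> logFiles; // stand-in for Stream<HoodieLogFile>
  String partitionPath;
}

// Stand-in for the reader: one generated setter per field, no composite setter.
@Builder(setterPrefix = "with")
public class ReaderSketch {
  private final String baseFileOption;
  private final List<String> logFiles;
  private final String partitionPath;

  // The decomposition each call site in this commit performs:
  static ReaderSketch fromSlice(FileSliceSketch slice) {
    return ReaderSketch.builder()
        .withBaseFileOption(slice.getBaseFile())     // was .withFileSlice(slice)
        .withLogFiles(slice.getLogFiles())
        .withPartitionPath(slice.getPartitionPath())
        .build();
  }
}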

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java

Lines changed: 5 additions & 3 deletions
@@ -353,14 +353,16 @@ private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
     Option<InternalSchema> internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
     FileSlice fileSlice = fileSliceOption.get();
     HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
-    HoodieFileGroupReader<R> fileGroupReader = HoodieFileGroupReader.<R>newBuilder()
+    HoodieFileGroupReader<R> fileGroupReader = HoodieFileGroupReader.<R>builder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metaClient)
         .withLatestCommitTime(instantTime.get())
-        .withFileSlice(fileSlice)
+        .withBaseFileOption(fileSlice.getBaseFile())
+        .withLogFiles(fileSlice.getLogFiles())
+        .withPartitionPath(fileSlice.getPartitionPath())
         .withDataSchema(dataSchema)
         .withRequestedSchema(dataSchema)
-        .withInternalSchema(internalSchemaOption)
+        .withInternalSchemaOpt(internalSchemaOption)
         .withProps(metaClient.getTableConfig().getProps())
         .withEnableOptimizedLogBlockScan(config.enableOptimizedLogBlocksScan())
         .build();
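
The withInternalSchema to withInternalSchemaOpt rename follows from the same mechanics: Lombok derives each setter name from its field, so a field presumably named internalSchemaOpt yields withInternalSchemaOpt under setterPrefix = "with". A hedged sketch with stand-in types:

import lombok.Builder;
import java.util.Optional;

// Stand-in class; the field name drives the generated setter name.
@Builder(setterPrefix = "with")
public class SchemaHolderSketch {
  private final Optional<String> internalSchemaOpt; // Optional stands in for Hudi's Option<InternalSchema>

  public static void main(String[] args) {
    SchemaHolderSketch holder = SchemaHolderSketch.builder()
        .withInternalSchemaOpt(Optional.of("schema-v1")) // was withInternalSchema(...) on the hand-written builder
        .build();
    System.out.println(holder.internalSchemaOpt.orElse("none"));
  }
}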

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedAppendHandle.java

Lines changed: 15 additions & 5 deletions
@@ -83,10 +83,20 @@ public void doAppend() {
         new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
             config.getBasePath(), operation.getPartitionPath()), logFileName)));
     // Initializes the record iterator, log compaction requires writing the deletes into the delete block of the resulting log file.
-    try (HoodieFileGroupReader<T> fileGroupReader = HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
-        .withLatestCommitTime(instantTime).withPartitionPath(partitionPath).withLogFiles(logFiles).withBaseFileOption(Option.empty()).withDataSchema(writeSchemaWithMetaFields)
-        .withRequestedSchema(writeSchemaWithMetaFields).withInternalSchema(internalSchemaOption).withProps(props).withEmitDelete(true)
-        .withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
+    try (HoodieFileGroupReader<T> fileGroupReader = HoodieFileGroupReader.<T>builder()
+        .withReaderContext(readerContext)
+        .withHoodieTableMetaClient(hoodieTable.getMetaClient())
+        .withLatestCommitTime(instantTime)
+        .withPartitionPath(partitionPath)
+        .withLogFiles(logFiles)
+        .withBaseFileOption(Option.empty())
+        .withDataSchema(writeSchemaWithMetaFields)
+        .withRequestedSchema(writeSchemaWithMetaFields)
+        .withInternalSchemaOpt(internalSchemaOption)
+        .withProps(props)
+        .withEmitDelete(true)
+        .withShouldUseRecordPosition(usePosition)
+        .withSortOutput(hoodieTable.requireSortedRecords())
         // instead of using config.enableOptimizedLogBlocksScan(), we set to true as log compaction blocks only supported in scanV2
         .withEnableOptimizedLogBlockScan(true).build()) {
       recordItr = new CloseableMappingIterator<>(fileGroupReader.getLogRecordsOnly(), record -> {
@@ -97,7 +107,7 @@ public void doAppend() {
       header.put(HoodieLogBlock.HeaderMetadataType.COMPACTED_BLOCK_TIMES,
           StringUtils.join(fileGroupReader.getValidBlockInstants(), ","));
       super.doAppend();
-      this.readStats = fileGroupReader.getStats();
+      this.readStats = fileGroupReader.getReadStats();
     } catch (IOException e) {
       throw new HoodieIOException("Failed to initialize file group reader for " + fileId, e);
     }

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java

Lines changed: 3 additions & 3 deletions
@@ -297,7 +297,7 @@ public void doMerge() {

     // The stats of inserts, updates, and deletes are updated once at the end
     // These will be set in the write stat when closing the merge handle
-    this.readStats = fileGroupReader.getStats();
+    this.readStats = fileGroupReader.getReadStats();
     this.insertRecordsWritten = readStats.getNumInserts();
     this.updatedRecordsWritten = readStats.getNumUpdates();
     this.recordsDeleted = readStats.getNumDeletes();
@@ -314,10 +314,10 @@ protected long getMaxMemoryForMerge() {

   private HoodieFileGroupReader<T> getFileGroupReader(boolean usePosition, Option<InternalSchema> internalSchemaOption, TypedProperties props,
                                                       Option<Stream<HoodieLogFile>> logFileStreamOpt, Iterator<HoodieRecord<T>> incomingRecordsItr) {
-    HoodieFileGroupReader.Builder<T> fileGroupBuilder = HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
+    HoodieFileGroupReader.HoodieFileGroupReaderBuilder<T> fileGroupBuilder = HoodieFileGroupReader.<T>builder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
         .withLatestCommitTime(maxInstantTime).withPartitionPath(partitionPath).withBaseFileOption(Option.ofNullable(baseFileToMerge))
         .withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields)
-        .withInternalSchema(internalSchemaOption).withProps(props)
+        .withInternalSchemaOpt(internalSchemaOption).withProps(props)
         .withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
         .withFileGroupUpdateCallback(createCallback());
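
The builder-type rename above (Builder<T> to HoodieFileGroupReaderBuilder<T>) is also a Lombok default: the generated builder class is named <ClassName>Builder. A tiny stand-in illustration:

import lombok.Builder;

// Stand-in class; only the generated builder's name matters here.
@Builder(setterPrefix = "with")
public class Handle {
  private final String partitionPath;

  public static void main(String[] args) {
    // Lombok generates Handle.HandleBuilder, not a nested class simply named Builder.
    Handle.HandleBuilder partial = Handle.builder().withPartitionPath("p1");
    Handle handle = partial.build();
    System.out.println(handle.partitionPath);
  }
}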

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java

Lines changed: 5 additions & 3 deletions
@@ -874,14 +874,16 @@ private static <T> HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot(
     HoodieSchema requestedSchema = metaClient.getTableConfig().populateMetaFields() ? getRecordKeySchema()
         : HoodieSchemaUtils.projectSchema(dataSchema, Arrays.asList(metaClient.getTableConfig().getRecordKeyFields().orElse(new String[0])));
     Option<InternalSchema> internalSchemaOption = SerDeHelper.fromJson(dataWriteConfig.getInternalSchema());
-    HoodieFileGroupReader<T> fileGroupReader = HoodieFileGroupReader.<T>newBuilder()
+    HoodieFileGroupReader<T> fileGroupReader = HoodieFileGroupReader.<T>builder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metaClient)
-        .withFileSlice(fileSlice)
+        .withBaseFileOption(fileSlice.getBaseFile())
+        .withLogFiles(fileSlice.getLogFiles())
+        .withPartitionPath(fileSlice.getPartitionPath())
         .withLatestCommitTime(instantTime.get())
         .withDataSchema(dataSchema)
         .withRequestedSchema(requestedSchema)
-        .withInternalSchema(internalSchemaOption)
+        .withInternalSchemaOpt(internalSchemaOption)
         .withShouldUseRecordPosition(false)
         .withProps(metaClient.getTableConfig().getProps())
         .withEnableOptimizedLogBlockScan(dataWriteConfig.enableOptimizedLogBlocksScan())

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java

Lines changed: 6 additions & 4 deletions
@@ -126,7 +126,7 @@ public static <T> HoodieData<HoodieRecord> convertWriteStatsToSecondaryIndexReco
     // validate that for a given fileId, either we have 1 parquet file or N log files.
     AtomicInteger totalParquetFiles = new AtomicInteger();
     AtomicInteger totalLogFiles = new AtomicInteger();
-    writeStats.stream().forEach(writeStat -> {
+    writeStats.forEach(writeStat -> {
       if (FSUtils.isLogFile(new StoragePath(basePath, writeStat.getPath()))) {
         totalLogFiles.getAndIncrement();
       } else {
@@ -159,7 +159,7 @@ public static <T> HoodieData<HoodieRecord> convertWriteStatsToSecondaryIndexReco
     } else { // log files are added in current commit
       // add new log files to existing latest file slice and compute the secondary index to primary key mapping.
       FileSlice latestFileSlice = fileSliceOption.get();
-      writeStats.stream().forEach(writeStat -> {
+      writeStats.forEach(writeStat -> {
         StoragePathInfo logFile = new StoragePathInfo(new StoragePath(basePath, writeStat.getPath()), writeStat.getFileSizeInBytes(), false, (short) 0, 0, 0);
         latestFileSlice.addLogFile(new HoodieLogFile(logFile));
       });
@@ -289,9 +289,11 @@ private static <T> ClosableIterator<Pair<String, String>> createSecondaryIndexRe
                                                                 boolean allowInflightInstants) throws IOException {
     String secondaryKeyField = indexDefinition.getSourceFieldsKey();
     HoodieSchema requestedSchema = getRequestedSchemaForSecondaryIndex(metaClient, tableSchema, secondaryKeyField);
-    HoodieFileGroupReader<T> fileGroupReader = HoodieFileGroupReader.<T>newBuilder()
+    HoodieFileGroupReader<T> fileGroupReader = HoodieFileGroupReader.<T>builder()
         .withReaderContext(readerContext)
-        .withFileSlice(fileSlice)
+        .withBaseFileOption(fileSlice.getBaseFile())
+        .withLogFiles(fileSlice.getLogFiles())
+        .withPartitionPath(fileSlice.getPartitionPath())
         .withHoodieTableMetaClient(metaClient)
         .withProps(props)
         .withLatestCommitTime(instantTime)

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/ClusteringExecutionStrategy.java

Lines changed: 14 additions & 4 deletions
@@ -143,9 +143,19 @@ protected static <R> HoodieFileGroupReader<R> getFileGroupReader(HoodieTableMeta
                                                                  ReaderContextFactory<R> readerContextFactory, String instantTime,
                                                                  TypedProperties properties, boolean usePosition, boolean enableLogBlocksScan) {
     HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
-    return HoodieFileGroupReader.<R>newBuilder()
-        .withReaderContext(readerContext).withHoodieTableMetaClient(metaClient).withLatestCommitTime(instantTime)
-        .withFileSlice(fileSlice).withDataSchema(readerSchema).withRequestedSchema(readerSchema).withInternalSchema(internalSchemaOption)
-        .withShouldUseRecordPosition(usePosition).withEnableOptimizedLogBlockScan(enableLogBlocksScan).withProps(properties).build();
+    return HoodieFileGroupReader.<R>builder()
+        .withReaderContext(readerContext)
+        .withHoodieTableMetaClient(metaClient)
+        .withLatestCommitTime(instantTime)
+        .withBaseFileOption(fileSlice.getBaseFile())
+        .withLogFiles(fileSlice.getLogFiles())
+        .withPartitionPath(fileSlice.getPartitionPath())
+        .withDataSchema(readerSchema)
+        .withRequestedSchema(readerSchema)
+        .withInternalSchemaOpt(internalSchemaOption)
+        .withShouldUseRecordPosition(usePosition)
+        .withEnableOptimizedLogBlockScan(enableLogBlocksScan)
+        .withProps(properties)
+        .build();
   }
 }

hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java

Lines changed: 1 addition & 1 deletion
@@ -946,7 +946,7 @@ private void verifyMetadataMergedRecords(HoodieTableMetaClient metadataMetaClien
     HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(HoodieSchema.fromAvroSchema(HoodieMetadataRecord.getClassSchema()));
     HoodieAvroReaderContext readerContext = new HoodieAvroReaderContext(metadataMetaClient.getStorageConf(), metadataMetaClient.getTableConfig(), Option.empty(), Option.empty(),
         new TypedProperties());
-    HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>newBuilder()
+    HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>builder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metadataMetaClient)
         .withLogFiles(logFiles.stream())

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java

Lines changed: 1 addition & 1 deletion
@@ -334,7 +334,7 @@ private static Iterator<Row> getExpressionIndexRecordsIterator(HoodieReaderConte
       baseFileOption = Option.empty();
       logFileStream = Stream.of(new HoodieLogFile(filePath));
     }
-    HoodieFileGroupReader<InternalRow> fileGroupReader = HoodieFileGroupReader.<InternalRow>newBuilder()
+    HoodieFileGroupReader<InternalRow> fileGroupReader = HoodieFileGroupReader.<InternalRow>builder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metaClient)
         .withDataSchema(tableSchema)

hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java

Lines changed: 4 additions & 2 deletions
@@ -218,10 +218,12 @@ private void readFromMDTFileSliceAndValidate(HoodieTableMetaClient metadataMetaC
     HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(metadataSchema);

     HoodieAvroReaderContext readerContext = new HoodieAvroReaderContext(metadataMetaClient.getStorageConf(), metadataMetaClient.getTableConfig(), Option.of(instantRange), Option.of(predicate));
-    HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>newBuilder()
+    HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>builder()
        .withReaderContext(readerContext)
        .withHoodieTableMetaClient(metadataMetaClient)
-       .withFileSlice(fileSlice)
+       .withBaseFileOption(fileSlice.getBaseFile())
+       .withLogFiles(fileSlice.getLogFiles())
+       .withPartitionPath(fileSlice.getPartitionPath())
        .withLatestCommitTime(validMetadataInstant)
        .withRequestedSchema(metadataSchema)
        .withDataSchema(schema)
