@@ -206,6 +206,9 @@ private void createManifestFile(HoodieLSMTimelineManifest manifest, int currentV
int newVersion = currentVersion < 0 ? 1 : currentVersion + 1;
// create manifest file
final StoragePath manifestFilePath = LSMTimeline.getManifestFilePath(newVersion, archivePath);
// The new version is the latest version plus 1. If a preceding failed write succeeded in
// writing the manifest file but failed to write the version file, a corrupt manifest file
// with exactly this `newVersion` may have been left behind, so delete it first.
deleteIfExists(manifestFilePath);
metaClient.getStorage().createImmutableFileInPath(manifestFilePath, Option.of(HoodieInstantWriter.convertByteArrayToWriter(content)));
// update version file
updateVersionFile(newVersion);
@@ -215,6 +218,7 @@ private void updateVersionFile(int newVersion) throws IOException {
byte[] content = getUTF8Bytes(String.valueOf(newVersion));
final StoragePath versionFilePath = LSMTimeline.getVersionFilePath(archivePath);
metaClient.getStorage().deleteFile(versionFilePath);
// if the write fails at this point (version file deleted but not yet recreated), both writers and readers fall back to listing the manifest files to find the latest snapshot version.
metaClient.getStorage().createImmutableFileInPath(versionFilePath, Option.of(HoodieInstantWriter.convertByteArrayToWriter(content)));
}
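For context, the recovery path referenced in the comments above can be sketched as follows. This is a minimal, self-contained illustration in plain java.nio rather than Hudi's storage API; the helper name and the file-name conventions (`_version_`, `manifest_<version>`) are assumptions inferred from the surrounding code:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

class LsmVersionRecoverySketch {
  // Reader-side fallback: when the version file is missing (e.g. a writer crashed
  // between deleting and recreating it), derive the latest snapshot version from
  // the manifest file names instead.
  static int latestSnapshotVersion(Path archiveDir) throws IOException {
    Path versionFile = archiveDir.resolve("_version_"); // assumed version file name
    if (Files.exists(versionFile)) {
      return Integer.parseInt(Files.readString(versionFile).trim());
    }
    try (Stream<Path> entries = Files.list(archiveDir)) {
      return entries
          .map(p -> p.getFileName().toString())
          .filter(name -> name.startsWith("manifest_")) // assumed manifest prefix
          .mapToInt(name -> Integer.parseInt(name.substring("manifest_".length())))
          .max()
          .orElse(-1); // no snapshot written yet
    }
  }
}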

@@ -293,9 +297,11 @@ private Option<String> doCompact(HoodieLSMTimelineManifest manifest, int layer)
return Option.empty();
}

public void compactFiles(List<String> candidateFiles, String compactedFileName) {
public void compactFiles(List<String> candidateFiles, String compactedFileName) throws IOException {
LOG.info("Starting to compact source files.");
try (HoodieFileWriter writer = openWriter(new StoragePath(archivePath, compactedFileName))) {
StoragePath compactedFilePath = new StoragePath(archivePath, compactedFileName);
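// a previous compaction attempt may have failed after partially writing this file; remove the leftover before rewriting it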
deleteIfExists(compactedFilePath);
try (HoodieFileWriter writer = openWriter(compactedFilePath)) {
for (String fileName : candidateFiles) {
// Read the input source file
try (HoodieAvroParquetReader reader = (HoodieAvroParquetReader) HoodieIOFactory.getIOFactory(metaClient.getStorage())
@@ -406,6 +412,14 @@ public static String compactedFileName(List<String> files) {
return newFileName(minInstant, maxInstant, currentLayer + 1);
}

private void deleteIfExists(StoragePath filePath) throws IOException {
if (metaClient.getStorage().exists(filePath)) {
// delete the file if it already exists so the new write can safely overwrite it
metaClient.getStorage().deleteFile(filePath);
LOG.info("Deleted corrupt file {} left by a failed write", filePath);
}
}

/**
* Get or create a writer config for parquet writer.
*/
@@ -54,6 +54,7 @@
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -1563,6 +1564,12 @@ public boolean isEmbeddedTimelineServerReuseEnabled() {
return getBoolean(EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED);
}

public boolean isRemoteViewStorageType() {
FileSystemViewStorageType storageType = getViewStorageConfig().getStorageType();
return storageType == FileSystemViewStorageType.REMOTE_ONLY
|| storageType == FileSystemViewStorageType.REMOTE_FIRST;
}
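A hedged sketch of a write config that makes this predicate true (builder methods assumed from Hudi's config API):

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath(basePath) // basePath: caller-supplied table path
    .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
        .withStorageType(FileSystemViewStorageType.REMOTE_FIRST)
        .build())
    .build();
// writeConfig.isRemoteViewStorageType() now returns true, so timeline-server-based
// markers stay usable even when the embedded timeline server is disabled.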

public int getEmbeddedTimelineServerPort() {
return Integer.parseInt(getStringOrDefault(EMBEDDED_TIMELINE_SERVER_PORT_NUM));
}
@@ -47,7 +47,7 @@ public static WriteMarkers get(MarkerType markerType, HoodieTable table, String
case DIRECT:
return getDirectWriteMarkers(table, instantTime);
case TIMELINE_SERVER_BASED:
if (!table.getConfig().isEmbeddedTimelineServerEnabled()) {
if (!table.getConfig().isEmbeddedTimelineServerEnabled() && !table.getConfig().isRemoteViewStorageType()) {
LOG.warn("Timeline-server-based markers are configured as the marker type "
+ "but embedded timeline server is not enabled. Falling back to direct markers.");
return getDirectWriteMarkers(table, instantTime);
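// Resulting marker resolution for MarkerType.TIMELINE_SERVER_BASED (summary of the condition above):
//   embedded timeline server | remote view storage (REMOTE_ONLY/REMOTE_FIRST) | markers used
//   -------------------------+------------------------------------------------+-----------------------
//   enabled                  | any                                            | timeline-server-based
//   disabled                 | yes                                            | timeline-server-based
//   disabled                 | no                                             | direct (with a warning)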
@@ -34,6 +34,7 @@
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
@@ -48,6 +49,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
@@ -100,6 +102,27 @@ void testReadCompletionTime() throws Exception {
}
}

@Test
void testReadCompletionTimeWithCornerCase() throws Exception {
String tableName = "testTable";
String tablePath = tempFile.getAbsolutePath() + StoragePath.SEPARATOR + tableName;
HoodieTableMetaClient metaClient = HoodieTestUtils.init(
HoodieTestUtils.getDefaultStorageConf(), tablePath, HoodieTableType.COPY_ON_WRITE, tableName);
prepareTimeline(tablePath, metaClient, (lsmTimelineWriter, activeActions) -> {
// archive [1, 2], [3, 6] specifically to create corner cases
lsmTimelineWriter.write(activeActions.subList(0, 2), Option.empty(), Option.empty());
lsmTimelineWriter.write(activeActions.subList(2, 6), Option.empty(), Option.empty());
});
try (CompletionTimeQueryView view =
metaClient.getTimelineLayout().getTimelineFactory().createCompletionTimeQueryView(metaClient)) {
// 1. the first query loads archived instants starting from 5, so the cursor moves from 7 to 5
assertThat(view.getCompletionTime(String.format("%08d", 5)).orElse(""), is(String.format("%08d", 1005)));

// 2. a subsequent query for archived instant 4 should still resolve the completion time correctly
assertThat(view.getCompletionTime(String.format("%08d", 4)).orElse(""), is(String.format("%08d", 1004)));
}
}

@Test
void testReadStartTime() throws Exception {
String tableName = "testTable";
@@ -148,7 +171,17 @@ private String getInstantTimeSetFormattedString(CompletionTimeQueryView view, in
.stream().sorted().collect(Collectors.joining(","));
}

private void prepareTimeline(String tablePath, HoodieTableMetaClient metaClient) throws Exception {
private void prepareTimeline(String tablePath, HoodieTableMetaClient metaClient, List<Pair<String, String>> instantRequestedAndCompletionTime, String requestedInstantTime) throws Exception {
prepareTimeline(tablePath, metaClient, instantRequestedAndCompletionTime, requestedInstantTime, (writer, activeActions) -> {
// archive [1,2], [3,4], [5,6] separately
writer.write(activeActions.subList(0, 2), Option.empty(), Option.empty());
writer.write(activeActions.subList(2, 4), Option.empty(), Option.empty());
writer.write(activeActions.subList(4, 6), Option.empty(), Option.empty());
});
}

private void prepareTimeline(String tablePath, HoodieTableMetaClient metaClient, List<Pair<String, String>> instantRequestedAndCompletionTime, String requestedInstantTime,
BiConsumer<LSMTimelineWriter, List<ActiveAction>> timelineWriterListBiConsumer) throws Exception {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.withMarkersType("DIRECT")
@@ -168,10 +201,7 @@ private void prepareTimeline(String tablePath, HoodieTableMetaClient metaClient)
testTable.addRequestedCommit(String.format("%08d", 11));
List<HoodieInstant> instants = TIMELINE_FACTORY.createActiveTimeline(metaClient, false).getInstantsAsStream().sorted().collect(Collectors.toList());
LSMTimelineWriter writer = LSMTimelineWriter.getInstance(writeConfig, getMockHoodieTable(metaClient));
// archive [1,2], [3,4], [5,6] separately
writer.write(activeActions.subList(0, 2), Option.empty(), Option.empty());
writer.write(activeActions.subList(2, 4), Option.empty(), Option.empty());
writer.write(activeActions.subList(4, 6), Option.empty(), Option.empty());
timelineWriterListBiConsumer.accept(writer, activeActions);
// reconcile the active timeline
instants.subList(0, 3 * 6).forEach(
instant -> TimelineUtils.deleteInstantFile(metaClient.getStorage(),
@@ -181,6 +211,26 @@ private void prepareTimeline(String tablePath, HoodieTableMetaClient metaClient)
"should archive 6 instants with 4 as active");
}

private List<Pair<String, String>> defaultInstantRequestedAndCompletionTime() {
List<Pair<String, String>> instantRequestedAndCompletionTime = new ArrayList<>();
for (int i = 1; i < 11; i++) {
String instantTime = String.format("%08d", i);
String completionTime = String.format("%08d", i + 1000);
instantRequestedAndCompletionTime.add(Pair.of(instantTime, completionTime));
}
return instantRequestedAndCompletionTime;
}

private void prepareTimeline(String tablePath, HoodieTableMetaClient metaClient) throws Exception {
prepareTimeline(tablePath, metaClient, defaultInstantRequestedAndCompletionTime(), String.format("%08d", 11));
}

private void prepareTimeline(String tablePath, HoodieTableMetaClient metaClient, BiConsumer<LSMTimelineWriter, List<ActiveAction>> timelineWriterListBiConsumer) throws Exception {
prepareTimeline(tablePath, metaClient, defaultInstantRequestedAndCompletionTime(), String.format("%08d", 11), timelineWriterListBiConsumer);
}

@SuppressWarnings("rawtypes")
private HoodieTable getMockHoodieTable(HoodieTableMetaClient metaClient) {
HoodieTable hoodieTable = mock(HoodieTable.class);
@@ -114,13 +114,31 @@ public void testTimelineServerBasedMarkersWithHDFS() {
HoodieTableVersion.SIX, true, DirectWriteMarkersV1.class);
}

@Test
public void testTimelineServerBasedMarkersWithRemoteViewStorageType() {
// No fallback to direct markers should happen: a remote view storage type is configured even though the embedded timeline server is disabled
testWriteMarkersFactory(
MarkerType.TIMELINE_SERVER_BASED, NON_HDFS_BASE_PATH,
HoodieTableVersion.current(), false, true, TimelineServerBasedWriteMarkers.class);
testWriteMarkersFactory(
MarkerType.TIMELINE_SERVER_BASED, NON_HDFS_BASE_PATH,
HoodieTableVersion.SIX, false, true, TimelineServerBasedWriteMarkersV1.class);
}

private void testWriteMarkersFactory(
MarkerType markerTypeConfig, String basePath, HoodieTableVersion tableVersion,
boolean isTimelineServerEnabled, Class<?> expectedWriteMarkersClass) {
testWriteMarkersFactory(markerTypeConfig, basePath, tableVersion, isTimelineServerEnabled, false, expectedWriteMarkersClass);
}

private void testWriteMarkersFactory(
MarkerType markerTypeConfig, String basePath, HoodieTableVersion tableVersion,
boolean isTimelineServerEnabled, boolean isRemoteViewStorageType, Class<?> expectedWriteMarkersClass) {
String instantTime = "001";
when(table.getConfig()).thenReturn(writeConfig);
when(writeConfig.isEmbeddedTimelineServerEnabled())
.thenReturn(isTimelineServerEnabled);
when(writeConfig.isRemoteViewStorageType()).thenReturn(isRemoteViewStorageType);
when(table.getMetaClient()).thenReturn(metaClient);
when(metaClient.getStorage()).thenReturn(storage);
when(storage.getFileSystem()).thenReturn(fileSystem);
@@ -82,6 +82,19 @@ public TimeRangeFilter(String startTs, String endTs) {
public boolean isInRange(String instantTime) {
return InstantComparison.isInRange(instantTime, this.startTs, this.endTs);
}

/**
* Returns whether the given instant time range overlaps with the current range.
*/
public boolean hasOverlappingInRange(String startInstant, String endInstant) {
return isInRange(startInstant) || isInRange(endInstant) || isContained(startInstant, endInstant);
}

private boolean isContained(String startInstant, String endInstant) {
// true when the current range [startTs, endTs] is fully contained in the given range; both of its bounds must be non-null (finite) for containment to hold.
return startTs != null && InstantComparison.compareTimestamps(startTs, InstantComparison.GREATER_THAN_OR_EQUALS, startInstant)
&& endTs != null && InstantComparison.compareTimestamps(endTs, InstantComparison.LESSER_THAN_OR_EQUALS, endInstant);
}
}

/**
@@ -141,7 +141,7 @@ public static Schema getReadSchema(HoodieArchivedTimeline.LoadMode loadMode) {
public static boolean isFileInRange(HoodieArchivedTimeline.TimeRangeFilter filter, String fileName) {
String minInstant = getMinInstantTime(fileName);
String maxInstant = getMaxInstantTime(fileName);
return filter.isInRange(minInstant) || filter.isInRange(maxInstant);
return filter.hasOverlappingInRange(minInstant, maxInstant);
}
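To make the fix concrete, a sketch of the overlap cases (illustrative values; TimeRangeFilter construction and inclusive range semantics assumed from the code above):

HoodieArchivedTimeline.TimeRangeFilter filter =
    new HoodieArchivedTimeline.TimeRangeFilter("05", "10");
filter.hasOverlappingInRange("08", "12"); // true: file min falls inside the filter range
filter.hasOverlappingInRange("02", "06"); // true: file max falls inside the filter range
filter.hasOverlappingInRange("01", "20"); // true: the file range fully covers the filter
                                          // range; the old endpoint-only check missed this
filter.hasOverlappingInRange("11", "20"); // false: disjoint ranges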

/**
@@ -77,6 +77,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -251,7 +252,12 @@ private DataStream<MergeOnReadInputSplit> addFileDistributionStrategy(SingleOutp
} else if (OptionsResolver.isAppendMode(conf)) {
return source.partitionCustom(new StreamReadAppendPartitioner(conf.getInteger(FlinkOptions.READ_TASKS)), new StreamReadAppendKeySelector());
} else {
return source.keyBy(MergeOnReadInputSplit::getFileId);
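// NOTE: the method reference above is replaced with an explicit anonymous KeySelector,
// presumably so Flink can reliably extract the key TypeInformation; with lambdas or
// method references the generic key type can be lost to erasure and may require an
// explicit returns(...) hint. (This rationale is an assumption, not stated in the PR.)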
return source.keyBy(new KeySelector<MergeOnReadInputSplit, String>() {
@Override
public String getKey(MergeOnReadInputSplit split) throws Exception {
return split.getFileId();
}
});
}
}

@@ -23,6 +23,8 @@
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.config.HoodieCleanConfig;
@@ -32,7 +34,10 @@
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.utils.TestWriteBase;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestData;

import org.apache.flink.configuration.Configuration;
@@ -43,11 +48,13 @@

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Test cases for stream write.
@@ -684,4 +691,25 @@ public void testRollbackFailedWritesWithLazyCleanPolicy() throws Exception {
.assertNextEvent()
.end();
}

@ParameterizedTest
@EnumSource(MarkerType.class)
public void testMarkerType(MarkerType markerType) throws Exception {
conf.setString(HoodieWriteConfig.MARKERS_TYPE.key(), markerType.toString());
TestHarness testHarness =
preparePipeline(conf)
.consume(TestData.DATA_SET_INSERT)
// no checkpoint, so the coordinator does not accept any events
.emptyEventBuffer()
.checkpoint(1)
.assertNextEvent(4, "par1,par2,par3,par4");
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
List<StoragePathInfo> files = metaClient.getStorage().listFiles(new StoragePath(metaClient.getTempFolderPath()));
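// Direct markers materialize one marker file per written data file under the temp
// folder (names ending in "marker.CREATE"); timeline-server-based markers are
// typically batched by the timeline server into consolidated files instead, hence
// the noneMatch assertion below.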
if (markerType == MarkerType.DIRECT) {
assertTrue(files.stream().allMatch(f -> f.getPath().getName().endsWith("marker.CREATE")));
} else {
assertTrue(files.stream().noneMatch(f -> f.getPath().getName().endsWith("marker.CREATE")));
}
testHarness.checkpointComplete(1).checkWrittenData(EXPECTED1).end();
}
}
@@ -24,6 +24,7 @@
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -2362,6 +2363,21 @@ void testReadWithParquetPredicatePushDown() {
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}

@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testWriteWithTimelineServerBasedMarker(HoodieTableType tableType) {
String hoodieTableDDL = sql("t1")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.TABLE_TYPE, tableType)
.option(HoodieWriteConfig.MARKERS_TYPE.key(), MarkerType.TIMELINE_SERVER_BASED.name())
.end();
batchTableEnv.executeSql(hoodieTableDDL);

execInsertSql(batchTableEnv, TestSQL.INSERT_T1);
List<Row> rows = CollectionUtil.iteratorToList(batchTableEnv.executeSql("select * from t1").collect());
assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------