9 changes: 6 additions & 3 deletions docs/en/connector-v2/sink/Hive.md
@@ -102,6 +102,11 @@ Support writing Parquet INT96 from a timestamp, only valid for parquet files.

### overwrite [boolean]

Flag to decide whether to use overwrite mode when inserting data into Hive. If set to true, for non-partitioned tables, the existing data in the table will be deleted before inserting new data. For partitioned tables, the data in the relevant partition will be deleted before inserting new data.

- Batch mode (BATCH): Delete existing data in the target path before commit (for non-partitioned tables, delete the table directory; for partitioned tables, delete the related partition directories), then write new data.
- Streaming mode (STREAMING): In streaming jobs with checkpointing enabled, `commit()` is invoked after each completed checkpoint. To avoid deleting on every checkpoint (which would wipe previously committed files), SeaTunnel deletes each target directory (table directory / partition directory) at most once (empty commits will skip deletion). On recovery, the delete step is best-effort and may be skipped to avoid deleting already committed data, so streaming overwrite is not a strict snapshot overwrite.
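
A minimal sink sketch of the option in context (the `table_name` and `metastore_uri` values below are placeholders, not taken from this change):

```
sink {
  Hive {
    # placeholder connection values; adjust to your cluster and table
    table_name = "default.test_hive_sink_overwrite"
    metastore_uri = "thrift://hive-metastore:9083"
    # delete existing table/partition data before writing new data
    overwrite = true
  }
}
```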

### data_save_mode [enum]

Select how to handle existing data on the target before writing new data.
@@ -112,8 +117,6 @@ Select how to handle existing data on the target before writing new data.

Note: overwrite=true and data_save_mode=DROP_DATA are equivalent. Use either one; do not set both.
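
For reference, a hedged sketch of the `data_save_mode` alternative (same placeholder values as above):

```
sink {
  Hive {
    table_name = "default.test_hive_sink_overwrite"
    metastore_uri = "thrift://hive-metastore:9083"
    # equivalent to overwrite = true; configure only one of the two
    data_save_mode = "DROP_DATA"
  }
}
```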


### schema_save_mode [enum]

Before starting the synchronization task, choose how to handle the existing table structure on the target side.
@@ -574,4 +577,4 @@ sink {
```
## Changelog

<ChangeLog />
<ChangeLog />
11 changes: 9 additions & 2 deletions docs/zh/connector-v2/sink/Hive.md
@@ -100,7 +100,12 @@ The path of the Kerberos keytab file

Support writing Parquet INT96 from a timestamp, only valid for parquet files.

### overwrite [boolean]

Whether to write data to Hive in overwrite mode.

- Batch mode (BATCH): Delete the existing data in the target path before commit (for non-partitioned tables, delete the table directory; for partitioned tables, delete the partition directories involved in this commit), then write new data.
- Streaming mode (STREAMING): In streaming jobs with checkpointing enabled, `commit()` is triggered once after each completed checkpoint. To avoid data loss from repeated deletion on every checkpoint, SeaTunnel deletes each target directory (table directory / partition directory) at most once (empty commits skip the deletion). On recovery, the deletion is best-effort and may be skipped to avoid deleting already committed data, so strict full-overwrite semantics are not guaranteed.
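
A sketch of a streaming job using overwrite (the env values and connection values below are assumptions/placeholders, not taken from this change; the source block is omitted):

```
env {
  parallelism = 1
  job.mode = "STREAMING"
  # commit() runs after each completed checkpoint
  checkpoint.interval = 10000
}

sink {
  Hive {
    table_name = "default.test_hive_sink_overwrite"
    metastore_uri = "thrift://hive-metastore:9083"
    # each table/partition directory is deleted at most once per job attempt
    overwrite = true
  }
}
```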

### data_save_mode [enum]

@@ -112,6 +117,8 @@ The path of the Kerberos keytab file

Note: overwrite=true and data_save_mode=DROP_DATA are equivalent in behavior. Configure either one; do not set both.

### schema_save_mode [enum]

Before starting the synchronization task, choose how to handle the existing table structure on the target side.

**Default value**: `CREATE_SCHEMA_WHEN_NOT_EXIST`
@@ -563,4 +570,4 @@ sink {
```
## Changelog

<ChangeLog />
<ChangeLog />
@@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@Slf4j
@@ -42,6 +43,29 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
private final boolean abortDropPartitionMetadata;
private final org.apache.seatunnel.api.sink.DataSaveMode dataSaveMode;

/**
* Guard for overwrite semantics in Flink streaming engine.
*
* <p>In streaming mode, {@code commit()} is invoked on every completed checkpoint. For
* overwrite (DROP_DATA), we must avoid deleting target directories on every checkpoint;
* otherwise previously committed files will be wiped and only the last checkpoint's files
* remain.
*
* <p>We delete each target directory (partition directory / table directory) at most once per
* job attempt so that dynamic partitions can still be overwritten when first written.
*/
private final Set<String> deletedTargetDirectories = ConcurrentHashMap.newKeySet();

/**
* Best-effort recovery detection based on the first seen checkpoint id embedded in transaction
* directory name (e.g. .../T_xxx_0_2 means checkpoint 2).
*
* <p>If the first seen checkpoint id is greater than 1, it usually indicates the job is
* recovering from a previous checkpoint. In that case, deleting the target directories would
* destroy already committed data that is consistent with the restored state.
*/
private volatile Long minCheckpointIdSeen = null;

private final ReadonlyConfig readonlyConfig;
private final HiveMetaStoreCatalog hiveMetaStore;

@@ -69,8 +93,15 @@ public List<FileAggregatedCommitInfo> commit(
List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
log.info("Aggregated commit infos: {}", aggregatedCommitInfos);
if (dataSaveMode == org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA) {
log.info("DataSaveMode=DROP_DATA: delete existing target directories before commit.");
deleteDirectories(aggregatedCommitInfos);
updateMinCheckpointIdSeen(aggregatedCommitInfos);
if (minCheckpointIdSeen != null && minCheckpointIdSeen > 1) {
log.info(
"DataSaveMode=DROP_DATA: skip deleting target directories before commit."
+ " Recovery is detected, minCheckpointIdSeen={}",
minCheckpointIdSeen);
} else {
deleteDirectories(aggregatedCommitInfos);
}
}

List<FileAggregatedCommitInfo> errorCommitInfos = super.commit(aggregatedCommitInfos);
@@ -134,12 +165,14 @@ public void close() throws IOException {
*
* @param aggregatedCommitInfos
*/
private void deleteDirectories(List<FileAggregatedCommitInfo> aggregatedCommitInfos)
private boolean deleteDirectories(List<FileAggregatedCommitInfo> aggregatedCommitInfos)
throws IOException {
if (aggregatedCommitInfos.isEmpty()) {
return;
return false;
}

boolean anyDeleted = false;

for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
LinkedHashMap<String, LinkedHashMap<String, String>> transactionMap =
aggregatedCommitInfo.getTransactionMap();
@@ -162,28 +195,144 @@ private void deleteDirectories(List<FileAggregatedCommitIn
if (aggregatedCommitInfo.getPartitionDirAndValuesMap().isEmpty()) {
// For non-partitioned table, extract and delete table directory
// Example: hdfs://hadoop-master1:8020/warehouse/test_overwrite_1/
String tableDir = targetPath.substring(0, targetPath.lastIndexOf('/'));
hadoopFileSystemProxy.deleteFile(tableDir);
log.info("Deleted table directory: {}", tableDir);
int lastSeparator =
Math.max(targetPath.lastIndexOf('/'), targetPath.lastIndexOf('\\'));
if (lastSeparator <= 0) {
log.warn(
"Skip deleting table directory because target path has no separator: {}",
targetPath);
continue;
}
String tableDir = targetPath.substring(0, lastSeparator);
if (deleteTargetDirectoryOnce(tableDir)) {
log.info("Deleted table directory: {}", tableDir);
anyDeleted = true;
}
} else {
// For partitioned table, extract and delete partition directories
// Example:
// hdfs://hadoop-master1:8020/warehouse/test_overwrite_partition/age=26/
Set<String> partitionDirs =
transactionMap.values().stream()
.flatMap(m -> m.values().stream())
.map(path -> path.substring(0, path.lastIndexOf('/')))
.map(
path -> {
int sep =
Math.max(
path.lastIndexOf('/'),
path.lastIndexOf('\\'));
if (sep <= 0) {
return null;
}
return path.substring(0, sep);
})
.filter(p -> p != null && !p.isEmpty())
.collect(Collectors.toSet());

for (String partitionDir : partitionDirs) {
hadoopFileSystemProxy.deleteFile(partitionDir);
log.info("Deleted partition directory: {}", partitionDir);
if (deleteTargetDirectoryOnce(partitionDir)) {
log.info("Deleted partition directory: {}", partitionDir);
anyDeleted = true;
}
}
}
} catch (IOException e) {
log.error("Failed to delete directories", e);
throw e;
}
}

return anyDeleted;
}

private boolean deleteTargetDirectoryOnce(String directory) throws IOException {
if (directory == null || directory.isEmpty()) {
return false;
}

String normalized = normalizeDirectoryPath(directory);
if (normalized.isEmpty()) {
return false;
}

if (!deletedTargetDirectories.add(normalized)) {
return false;
}

hadoopFileSystemProxy.deleteFile(directory);
return true;
}

private String normalizeDirectoryPath(String directory) {
String normalized = directory.replace('\\', '/');
while (normalized.endsWith("/")) {
normalized = normalized.substring(0, normalized.length() - 1);
}
return normalized;
}

private void updateMinCheckpointIdSeen(List<FileAggregatedCommitInfo> aggregatedCommitInfos) {
if (aggregatedCommitInfos == null || aggregatedCommitInfos.isEmpty()) {
return;
}

long minInThisCommit = Long.MAX_VALUE;
boolean found = false;

for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
if (aggregatedCommitInfo == null || aggregatedCommitInfo.getTransactionMap() == null) {
continue;
}
for (String transactionDir : aggregatedCommitInfo.getTransactionMap().keySet()) {
long checkpointId = parseCheckpointIdFromTransactionDir(transactionDir);
if (checkpointId > 0) {
minInThisCommit = Math.min(minInThisCommit, checkpointId);
found = true;
}
}
}

if (!found) {
return;
}

if (minCheckpointIdSeen == null) {
minCheckpointIdSeen = minInThisCommit;
} else {
minCheckpointIdSeen = Math.min(minCheckpointIdSeen, minInThisCommit);
}
}

/**
* Parses checkpoint id from transaction directory.
*
* <p>Expected pattern in transaction dir name: .../T_..._<subtaskIndex>_<checkpointId>
*/
private long parseCheckpointIdFromTransactionDir(String transactionDir) {
if (transactionDir == null || transactionDir.isEmpty()) {
return -1;
}

String normalized = transactionDir.replace('\\', '/');
while (normalized.endsWith("/")) {
normalized = normalized.substring(0, normalized.length() - 1);
}
int lastSlash = normalized.lastIndexOf('/');
String baseName = lastSlash >= 0 ? normalized.substring(lastSlash + 1) : normalized;
if (baseName.isEmpty()) {
return -1;
}

int lastUnderscore = baseName.lastIndexOf('_');
if (lastUnderscore < 0 || lastUnderscore == baseName.length() - 1) {
return -1;
}

String lastToken = baseName.substring(lastUnderscore + 1);
try {
return Long.parseLong(lastToken);
} catch (NumberFormatException ignored) {
return -1;
}
}
}