Skip to content

Commit 0dceda6

Browse files
authored
[Delta] Support absolute path in Clone (delta-io#4729)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description This change allows CLONE Iceberg to accept Iceberg tables with data files not under the table path. <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> ## How was this patch tested? Unit test <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. 
If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
1 parent cb36530 commit 0dceda6

File tree

5 files changed

+14
-4
lines changed

5 files changed

+14
-4
lines changed

iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergFileManifest.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ class IcebergFileManifest(
7979
_sizeInBytes.get
8080
}
8181

82+
override def supportAbsolutePath: Boolean = true
83+
8284
def allFiles: Dataset[ConvertTargetFile] = {
8385
if (fileSparkResults.isEmpty) fileSparkResults = Some(getFileSparkResults())
8486
fileSparkResults.get

spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableCommand.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,11 +239,14 @@ abstract class CloneConvertedSource(spark: SparkSession) extends CloneSource {
239239
val partitionSchema = convertTargetTable.partitionSchema
240240

241241
{
242-
convertTargetTable.fileManifest.allFiles.mapPartitions { targetFile =>
242+
val fileManifest = convertTargetTable.fileManifest
243+
val supportAbsolutePath = fileManifest.supportAbsolutePath
244+
fileManifest.allFiles.mapPartitions { targetFile =>
243245
val basePath = new Path(baseDir)
244246
val fs = basePath.getFileSystem(conf.value.value)
245247
targetFile.map(ConvertUtils.createAddFile(
246-
_, basePath, fs, SQLConf.get, Some(partitionSchema)))
248+
_, basePath, fs, SQLConf.get, Some(partitionSchema),
249+
supportAbsolutePath))
247250
}
248251
}
249252
}

spark/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,10 +305,13 @@ abstract class ConvertToDeltaCommandBase(
305305
val shouldCollectStats = collectStats && statsEnabled
306306
val statsBatchSize = conf.getConf(DeltaSQLConf.DELTA_IMPORT_BATCH_SIZE_STATS_COLLECTION)
307307
var numFiles = 0L
308+
val useAbsolutePath = manifest.supportAbsolutePath
308309
manifest.getFiles.grouped(statsBatchSize).flatMap { batch =>
309310
val adds = batch.map(
310311
ConvertUtils.createAddFile(
311-
_, txn.deltaLog.dataPath, fs, conf, Some(partitionSchema), deltaPath.isDefined))
312+
_, txn.deltaLog.dataPath, fs, conf, Some(partitionSchema),
313+
useAbsolutePath || deltaPath.isDefined
314+
))
312315
if (shouldCollectStats) {
313316
logInfo(
314317
log"Collecting stats for a batch of " +

spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/ConvertUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ trait ConvertUtilsBase extends DeltaLogging {
221221
s"Fail to relativize path $path against base path $basePath.")
222222
relativePath.toUri.toString
223223
} else {
224-
path.toUri.toString
224+
fs.makeQualified(path).toUri.toString
225225
}
226226

227227
AddFile(pathStrForAddFile, partition, file.length, file.modificationTime, dataChange = true,

spark/src/main/scala/org/apache/spark/sql/delta/commands/convert/interfaces.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ trait ConvertTargetFileManifest extends Closeable {
6767
/** Return the active files for a table in sequence */
6868
def getFiles: Iterator[ConvertTargetFile] = allFiles.toLocalIterator().asScala
6969

70+
/** When false, all data file paths are required to be relative to the table path. */
71+
def supportAbsolutePath: Boolean = false
7072
/** Return the number of files for the table */
7173
def numFiles: Long = allFiles.count()
7274

0 commit comments

Comments (0)