Skip to content

Commit 4b043b1

Browse files
authored
Refactor to Delta shallow clone (delta-io#2789)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description minor refactor to delta shallow clone to reorder and simplify logics. ## How was this patch tested? UT
1 parent ff9d819 commit 4b043b1

File tree

1 file changed

+16
-19
lines changed

1 file changed

+16
-19
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/commands/CloneTableBase.scala

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -191,40 +191,37 @@ abstract class CloneTableBase(
191191
destinationTable.createLogDirectory()
192192
}
193193

194+
val metadataToUpdate = determineTargetMetadata(txn.snapshot, deltaOperation.name)
195+
// Don't merge in the default properties when cloning, or we'll end up with different sets of
196+
// properties between source and target.
197+
txn.updateMetadata(metadataToUpdate, ignoreDefaultProperties = true)
194198
val (
195-
datasetOfNewFilesToAdd
199+
addedFileList
196200
) = {
197201
// Make sure target table is empty before running clone
198202
if (txn.snapshot.allFiles.count() > 0) {
199203
throw DeltaErrors.cloneReplaceNonEmptyTable
200204
}
201-
sourceTable.allFiles
205+
val toAdd = sourceTable.allFiles
206+
// absolutize file paths
207+
handleNewDataFiles(
208+
deltaOperation.name,
209+
toAdd,
210+
qualifiedSource,
211+
destinationTable).collectAsList()
202212
}
203213

204-
val metadataToUpdate = determineTargetMetadata(txn.snapshot, deltaOperation.name)
205-
// Don't merge in the default properties when cloning, or we'll end up with different sets of
206-
// properties between source and target.
207-
txn.updateMetadata(metadataToUpdate, ignoreDefaultProperties = true)
208-
209-
val datasetOfAddedFileList = handleNewDataFiles(
210-
deltaOperation.name,
211-
datasetOfNewFilesToAdd,
212-
qualifiedSource,
213-
destinationTable)
214-
215-
val addedFileList = datasetOfAddedFileList.collectAsList()
216-
217214
val (addedFileCount, addedFilesSize) =
218-
(addedFileList.size.toLong, totalDataSize(addedFileList.iterator))
219-
220-
val operationTimestamp = sourceTable.clock.getTimeMillis()
215+
(addedFileList.size.toLong, totalDataSize(addedFileList.iterator))
221216

222217

223218
val newProtocol = determineTargetProtocol(spark, txn, deltaOperation.name)
219+
val addFileIter =
220+
addedFileList.iterator.asScala
224221

225222
try {
226223
var actions: Iterator[Action] =
227-
addedFileList.iterator.asScala.map { fileToCopy =>
224+
addFileIter.map { fileToCopy =>
228225
val copiedFile = fileToCopy.copy(dataChange = dataChangeInFileAction)
229226
// CLONE does not preserve Row IDs and Commit Versions
230227
copiedFile.copy(baseRowId = None, defaultRowCommitVersion = None)

0 commit comments

Comments
 (0)