
Commit 283ac02

[Spark] Add read support for baseRowID (delta-io#2779)
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Adds the `base_row_id` field to the `_metadata` column for Delta tables. The field exposes the value of the `baseRowId` field of the file's `AddFile` action, allowing the `base_row_id` to be read from the file metadata after it is stored.

## How was this patch tested?

Added unit tests.

## Does this PR introduce _any_ user-facing changes?

No.
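For context, a minimal sketch of how the new field surfaces to readers, mirroring the tests added in this PR. The session configuration, table path, and `id` column are illustrative and not part of this change; row tracking must be enabled on the table for `base_row_id` to be populated.

```scala
// Sketch: reading base_row_id from the _metadata column of a Delta table.
// The table path "/tmp/delta/base_row_id_demo" is a hypothetical example.
import org.apache.spark.sql.SparkSession

object BaseRowIdReadSketch extends App {
  val spark = SparkSession.builder()
    .appName("base-row-id-read")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
      "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()

  val path = "/tmp/delta/base_row_id_demo"

  // DataFrame API: select the metadata field alongside regular columns.
  spark.read.format("delta").load(path)
    .select("id", "_metadata.base_row_id")
    .show()

  // SQL: the same field is addressable through the _metadata column.
  spark.sql(s"SELECT id, _metadata.base_row_id FROM delta.`$path`").show()

  spark.stop()
}
```

The qualified name `_metadata.base_row_id` matches the `QUALIFIED_BASE_ROW_ID_COLUMN_NAME` constant used by the tests below.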
1 parent 4b043b1 commit 283ac02

File tree: 6 files changed, +246 -13 lines

spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala

Lines changed: 13 additions & 1 deletion
```diff
@@ -200,7 +200,8 @@ case class DeltaParquetFileFormat(
     // Parquet reader in Spark has a bug where a file containing 2b+ rows in a single rowgroup
     // causes it to run out of the `Integer` range (TODO: Create a SPARK issue)
     // For Delta Parquet readers don't expose the row_index field as a metadata field.
-    super.metadataSchemaFields.filter(field => field != ParquetFileFormat.ROW_INDEX_FIELD)
+    super.metadataSchemaFields.filter(field => field != ParquetFileFormat.ROW_INDEX_FIELD) ++
+      RowId.createBaseRowIdField(protocol, metadata)
   }
 
   override def prepareWrite(
@@ -224,6 +225,17 @@ case class DeltaParquetFileFormat(
     factory
   }
 
+  override def fileConstantMetadataExtractors: Map[String, PartitionedFile => Any] = {
+    val extractBaseRowId: PartitionedFile => Any = { file =>
+      file.otherConstantMetadataColumnValues.getOrElse(RowId.BASE_ROW_ID, {
+        throw new IllegalStateException(
+          s"Missing ${RowId.BASE_ROW_ID} value for file '${file.filePath}'")
+      })
+    }
+    super.fileConstantMetadataExtractors
+      .updated(RowId.BASE_ROW_ID, extractBaseRowId)
+  }
+
   def copyWithDVInfo(
       tablePath: String,
       broadcastDvMap: Broadcast[Map[URI, DeletionVectorDescriptorWithFilterType]],
```
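The extractor registered above is the piece that resolves the per-file constant value at read time: for each file, the reader looks up the extractor registered for the column name and applies it to the `PartitionedFile`, failing loudly if the value was never attached. A self-contained sketch of that lookup pattern; `PartitionedFileStub` and the values are illustrative stand-ins, not Spark classes.

```scala
// Self-contained sketch of the per-file constant metadata extractor pattern.
object ExtractorSketch extends App {
  final case class PartitionedFileStub(
      filePath: String,
      otherConstantMetadataColumnValues: Map[String, Any])

  val BASE_ROW_ID = "base_row_id"

  // One extractor per constant metadata column; invoked once per file.
  val extractBaseRowId: PartitionedFileStub => Any = { file =>
    file.otherConstantMetadataColumnValues.getOrElse(BASE_ROW_ID, {
      throw new IllegalStateException(
        s"Missing $BASE_ROW_ID value for file '${file.filePath}'")
    })
  }

  val withBaseRowId = PartitionedFileStub("part-0000.parquet", Map(BASE_ROW_ID -> 0L))
  val withoutBaseRowId = PartitionedFileStub("part-0001.parquet", Map.empty)

  println(extractBaseRowId(withBaseRowId)) // prints 0
  // extractBaseRowId(withoutBaseRowId)    // would throw IllegalStateException
}
```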

spark/src/main/scala/org/apache/spark/sql/delta/RowId.scala

Lines changed: 49 additions & 0 deletions
```diff
@@ -18,6 +18,11 @@ package org.apache.spark.sql.delta
 
 import org.apache.spark.sql.delta.actions.{Action, AddFile, DomainMetadata, Metadata, Protocol}
 import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.propertyKey
+import org.apache.spark.sql.util.ScalaExtensions._
+
+import org.apache.spark.sql.catalyst.expressions.FileSourceConstantMetadataStructField
+import org.apache.spark.sql.types
+import org.apache.spark.sql.types.{DataType, LongType, MetadataBuilder, StructField}
 
 /**
  * Collection of helpers to handle Row IDs.
@@ -108,4 +113,48 @@ object RowId {
    */
   private[delta] def extractHighWatermark(snapshot: Snapshot): Option[Long] =
     RowTrackingMetadataDomain.fromSnapshot(snapshot).map(_.rowIdHighWaterMark)
+
+  /** Base Row ID column name */
+  val BASE_ROW_ID = "base_row_id"
+
+  /*
+   * A specialization of [[FileSourceConstantMetadataStructField]] used to represent base RowId
+   * columns.
+   */
+  object BaseRowIdMetadataStructField {
+    private val BASE_ROW_ID_METADATA_COL_ATTR_KEY = s"__base_row_id_metadata_col"
+
+    def metadata: types.Metadata = new MetadataBuilder()
+      .withMetadata(FileSourceConstantMetadataStructField.metadata(BASE_ROW_ID))
+      .putBoolean(BASE_ROW_ID_METADATA_COL_ATTR_KEY, value = true)
+      .build()
+
+    def apply(): StructField =
+      StructField(
+        BASE_ROW_ID,
+        LongType,
+        nullable = false,
+        metadata = metadata)
+
+    def unapply(field: StructField): Option[StructField] =
+      Some(field).filter(isBaseRowIdColumn)
+
+    /** Return true if the column is a base Row ID column. */
+    def isBaseRowIdColumn(structField: StructField): Boolean =
+      isValid(structField.dataType, structField.metadata)
+
+    def isValid(dataType: DataType, metadata: types.Metadata): Boolean = {
+      FileSourceConstantMetadataStructField.isValid(dataType, metadata) &&
+        metadata.contains(BASE_ROW_ID_METADATA_COL_ATTR_KEY) &&
+        metadata.getBoolean(BASE_ROW_ID_METADATA_COL_ATTR_KEY)
+    }
+  }
+
+  /**
+   * The field readers can use to access the base row id column.
+   */
+  def createBaseRowIdField(protocol: Protocol, metadata: Metadata): Option[StructField] =
+    Option.when(RowId.isEnabled(protocol, metadata)) {
+      BaseRowIdMetadataStructField()
+    }
 }
```
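The new `BaseRowIdMetadataStructField` follows a "tagged metadata attribute" pattern: the struct field carries a boolean marker in its column metadata, and readers recognize base Row ID columns by checking that marker. A minimal sketch of the pattern using only Spark's public type APIs; it deliberately omits the `FileSourceConstantMetadataStructField` validity checks of the real implementation.

```scala
// Sketch of the tagged-metadata-attribute pattern used by BaseRowIdMetadataStructField.
import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField}

object TaggedFieldSketch extends App {
  val MarkerKey = "__base_row_id_metadata_col" // same attribute key as the PR

  def baseRowIdField(): StructField =
    StructField(
      name = "base_row_id",
      dataType = LongType,
      nullable = false,
      metadata = new MetadataBuilder().putBoolean(MarkerKey, value = true).build())

  // A column counts as a base Row ID column only if it has the marker set to true.
  def isBaseRowIdColumn(field: StructField): Boolean =
    field.dataType == LongType &&
      field.metadata.contains(MarkerKey) &&
      field.metadata.getBoolean(MarkerKey)

  println(isBaseRowIdColumn(baseRowIdField()))                     // true
  println(isBaseRowIdColumn(StructField("base_row_id", LongType))) // false: no marker
}
```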

spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala

Lines changed: 19 additions & 11 deletions
```diff
@@ -20,8 +20,10 @@ package org.apache.spark.sql.delta.files
 import java.net.URI
 import java.util.Objects
 
+import scala.collection.mutable
 import org.apache.spark.sql.delta.RowIndexFilterType
 import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaErrors, DeltaLog, NoMapping, Snapshot, SnapshotDescriptor}
+import org.apache.spark.sql.delta.RowId
 import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol}
 import org.apache.spark.sql.delta.implicits._
 import org.apache.spark.sql.delta.schema.SchemaUtils
@@ -107,23 +109,29 @@ abstract class TahoeFileIndex(
     matchingFiles(partitionFilters, dataFilters).groupBy(_.partitionValues)
   }
 
+  /**
+   * Generates a FileStatusWithMetadata using data extracted from a given AddFile.
+   */
+  def fileStatusWithMetadataFromAddFile(addFile: AddFile): FileStatusWithMetadata = {
+    val fs = new FileStatus(
+      /* length */ addFile.size,
+      /* isDir */ false,
+      /* blockReplication */ 0,
+      /* blockSize */ 1,
+      /* modificationTime */ addFile.modificationTime,
+      /* path */ absolutePath(addFile.path))
+    val metadata = mutable.Map.empty[String, Any]
+    addFile.baseRowId.foreach(baseRowId => metadata.put(RowId.BASE_ROW_ID, baseRowId))
+
+    FileStatusWithMetadata(fs, metadata.toMap)
+  }
 
   def makePartitionDirectories(
       partitionValuesToFiles: Seq[(InternalRow, Seq[AddFile])]): Seq[PartitionDirectory] = {
     val timeZone = spark.sessionState.conf.sessionLocalTimeZone
     partitionValuesToFiles.map {
       case (partitionValues, files) =>
-
-        val fileStatuses = files.map { f =>
-          new FileStatus(
-            /* length */ f.size,
-            /* isDir */ false,
-            /* blockReplication */ 0,
-            /* blockSize */ 1,
-            /* modificationTime */ f.modificationTime,
-            absolutePath(f.path))
-        }.toArray
-
+        val fileStatuses = files.map(f => fileStatusWithMetadataFromAddFile(f)).toArray
        PartitionDirectory(partitionValues, fileStatuses)
     }
   }
```
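The helper above is also where the base Row ID becomes optional per file: it is attached to the file's metadata map only when the `AddFile` action carries a `baseRowId` (for example, on tables without row tracking it is absent). A self-contained sketch of that assembly step; `AddFileStub` is an illustrative stand-in, not the Delta `AddFile` action.

```scala
// Sketch: attach base_row_id to a file's constant metadata only when present.
import scala.collection.mutable

object FileMetadataSketch extends App {
  final case class AddFileStub(path: String, size: Long, baseRowId: Option[Long])

  def constantMetadataFor(addFile: AddFileStub): Map[String, Any] = {
    val metadata = mutable.Map.empty[String, Any]
    addFile.baseRowId.foreach(baseRowId => metadata.put("base_row_id", baseRowId))
    metadata.toMap
  }

  println(constantMetadataFor(AddFileStub("part-0000.parquet", 1024L, Some(20L))))
  // Map(base_row_id -> 20)
  println(constantMetadataFor(AddFileStub("part-0001.parquet", 2048L, None)))
  // Map()
}
```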

spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeRemoveFileIndex.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -73,7 +73,8 @@ class TahoeRemoveFileIndex(
         modificationTime = 0,
         dataChange = r.dataChange,
         tags = r.tags,
-        deletionVector = r.deletionVector
+        deletionVector = r.deletionVector,
+        baseRowId = r.baseRowId
       )
     }
   }
```

spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowIdSuite.scala

Lines changed: 159 additions & 0 deletions
```diff
@@ -30,6 +30,7 @@ import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField, StructType}
 
 class RowIdSuite extends QueryTest
   with SharedSparkSession
@@ -380,4 +381,162 @@ class RowIdSuite extends QueryTest
       }
     }
   }
+
+  test("Base Row ID metadata field has the expected type") {
+    withRowTrackingEnabled(enabled = true) {
+      withTempDir { tempDir =>
+        spark.range(start = 0, end = 20).toDF("id")
+          .write.format("delta").save(tempDir.getAbsolutePath)
+
+        val df = spark.read.format("delta").load(tempDir.getAbsolutePath)
+          .select(QUALIFIED_BASE_ROW_ID_COLUMN_NAME)
+
+        val expectedBaseRowIdMetadata = new MetadataBuilder()
+          .putBoolean("__base_row_id_metadata_col", value = true)
+          .build()
+
+        val expectedBaseRowIdField = StructField(
+          RowId.BASE_ROW_ID,
+          LongType,
+          nullable = false,
+          metadata = expectedBaseRowIdMetadata)
+
+        Seq(df.schema, df.queryExecution.analyzed.schema, df.queryExecution.optimizedPlan.schema)
+          .foreach { schema =>
+            assert(schema === new StructType().add(expectedBaseRowIdField))
+          }
+      }
+    }
+  }
+
+  test("Base Row IDs can be read with conflicting metadata column name") {
+    withRowTrackingEnabled(enabled = true) {
+      withTempDir { tempDir =>
+        // Generate 2 files with base Row ID 0 and 20 resp.
+        spark.range(start = 0, end = 20).toDF("_metadata").repartition(1)
+          .write.format("delta").save(tempDir.getAbsolutePath)
+        spark.range(start = 20, end = 30).toDF("_metadata").repartition(1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+
+        val df = spark.read.format("delta").load(tempDir.getAbsolutePath).select("_metadata")
+
+        val dfWithConflict = df
+          .select(
+            col("_metadata"),
+            df.metadataColumn("_metadata")
+              .getField(RowId.BASE_ROW_ID)
+              .as("real_base_row_id"))
+          .where("real_base_row_id % 5 = 0")
+
+        checkAnswer(dfWithConflict,
+          (0 until 20).map(Row(_, 0)) ++
+            (20 until 30).map(Row(_, 20)))
+      }
+    }
+  }
+
+  test("Base Row IDs can be read through the Scala syntax") {
+    withRowTrackingEnabled(enabled = true) {
+      withTempDir { tempDir =>
+        // Generate 2 files with base Row ID 0 and 20 resp.
+        spark.range(start = 0, end = 20).toDF("id").repartition(1)
+          .write.format("delta").save(tempDir.getAbsolutePath)
+        spark.range(start = 20, end = 30).toDF("id").repartition(1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+
+        val df = spark.read.format("delta").load(tempDir.getAbsolutePath)
+          .select("id", QUALIFIED_BASE_ROW_ID_COLUMN_NAME)
+
+        checkAnswer(df,
+          (0 until 20).map(Row(_, 0)) ++
+            (20 until 30).map(Row(_, 20)))
+      }
+    }
+  }
+
+  test("Base Row IDs can be read through the SQL syntax") {
+    withRowTrackingEnabled(enabled = true) {
+      withTempDir { tempDir =>
+        // Generate 2 files with base Row ID 0 and 20 resp.
+        spark.range(start = 0, end = 20).toDF("id").repartition(1)
+          .write.format("delta").save(tempDir.getAbsolutePath)
+        spark.range(start = 20, end = 30).toDF("id").repartition(1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+
+        val rows = sql(
+          s"""
+             |SELECT id, $QUALIFIED_BASE_ROW_ID_COLUMN_NAME FROM delta.`${tempDir.getAbsolutePath}`
+           """.stripMargin
+        )
+
+        checkAnswer(rows,
+          (0 until 20).map(Row(_, 0)) ++
+            (20 until 30).map(Row(_, 20)))
+      }
+    }
+  }
+
+  test("Filter by base Row IDs") {
+    withRowTrackingEnabled(enabled = true) {
+      withTempDir { tempDir =>
+        // Generate 3 files with base Row ID 0, 10 and 20 resp.
+        spark.range(start = 0, end = 10).toDF("id").repartition(1)
+          .write.format("delta").save(tempDir.getAbsolutePath)
+        spark.range(start = 10, end = 20).toDF("id").repartition(1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+        spark.range(start = 20, end = 30).toDF("id").repartition(1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+
+        val df = spark.read.format("delta").load(tempDir.getAbsolutePath)
+          .where(col(QUALIFIED_BASE_ROW_ID_COLUMN_NAME) === 10)
+
+        checkAnswer(df, (10 until 20).map(Row(_)))
+      }
+    }
+  }
+
+  test("Base Row IDs can be read in subquery") {
+    withRowTrackingEnabled(enabled = true) {
+      withTempDir { tempDir =>
+        // Generate 2 files with base Row ID 0 and 20 resp.
+        spark.range(start = 0, end = 20).toDF("id").repartition(1)
+          .write.format("delta").save(tempDir.getAbsolutePath)
+        spark.range(start = 20, end = 30).toDF("id").repartition(1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+
+        val rows = sql(
+          s"""
+             |SELECT * FROM delta.`${tempDir.getAbsolutePath}`
+             |WHERE id IN (
+             |  SELECT $QUALIFIED_BASE_ROW_ID_COLUMN_NAME
+             |  FROM delta.`${tempDir.getAbsolutePath}`)
+           """.stripMargin)
+
+        checkAnswer(rows, Seq(Row(0), Row(20)))
+      }
+    }
+  }
+
+  test("Filter by base Row IDs in subquery") {
+    withRowTrackingEnabled(enabled = true) {
+      withTempDir { tempDir =>
+        // Generate 2 files with base Row ID 0 and 20 resp.
+        spark.range(start = 0, end = 20).toDF("id").repartition(1)
+          .write.format("delta").save(tempDir.getAbsolutePath)
+        spark.range(start = 20, end = 30).toDF("id").repartition(1)
+          .write.format("delta").mode("append").save(tempDir.getAbsolutePath)
+
+        val rows = sql(
+          s"""
+             |SELECT * FROM delta.`${tempDir.getAbsolutePath}`
+             |WHERE id IN (
+             |  SELECT id
+             |  FROM delta.`${tempDir.getAbsolutePath}`
+             |  WHERE $QUALIFIED_BASE_ROW_ID_COLUMN_NAME = 20)
+           """.stripMargin)
+
+        checkAnswer(rows, (20 until 30).map(Row(_)))
+      }
+    }
+  }
 }
```

spark/src/test/scala/org/apache/spark/sql/delta/rowid/RowIdTestUtils.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -21,7 +21,11 @@ import org.apache.spark.sql.delta.actions.AddFile
 import org.apache.spark.sql.delta.rowtracking.RowTrackingTestUtils
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 
+import org.apache.spark.sql.execution.datasources.FileFormat
+
 trait RowIdTestUtils extends RowTrackingTestUtils with DeltaSQLCommandTest {
+  val QUALIFIED_BASE_ROW_ID_COLUMN_NAME = s"${FileFormat.METADATA_NAME}.${RowId.BASE_ROW_ID}"
+
   protected def getRowIdRangeInclusive(f: AddFile): (Long, Long) = {
     val min = f.baseRowId.get
     val max = min + f.numPhysicalRecords.get - 1L
```