
Commit a8ec23d

Merge remote-tracking branch 'delta-io/master' into spark-4.0-upgrade-merge
2 parents: e14d2cf + d4ffc42

168 files changed, +7371 -1288 lines changed


.github/workflows/new_pull_request.yaml

Lines changed: 0 additions & 16 deletions
This file was deleted.

.github/workflows/new_updated_issue.yaml

Lines changed: 0 additions & 32 deletions
This file was deleted.

.github/workflows/spark_test.yaml

Lines changed: 3 additions & 2 deletions
@@ -16,6 +16,7 @@ jobs:
         with:
           PATTERNS: |
             **
+            .github/workflows/**
             !kernel/**
             !connectors/**
       - name: install java
@@ -56,9 +57,9 @@ jobs:
           pipenv run pip install cryptography==37.0.4
           pipenv run pip install twine==4.0.1
           pipenv run pip install wheel==0.33.4
-          pipenv run pip install setuptools==41.0.1
+          pipenv run pip install setuptools==41.1.0
           pipenv run pip install pydocstyle==3.0.0
-          pipenv run pip install pandas==1.0.5
+          pipenv run pip install pandas==1.1.3
           pipenv run pip install pyarrow==8.0.0
           pipenv run pip install numpy==1.20.3
         if: steps.git-diff.outputs.diff

.github/workflows/unidoc.yaml

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
+name: "Unidoc generation"
+on: [push, pull_request]
+jobs:
+  build:
+    name: "Generate unidoc"
+    runs-on: ubuntu-20.04
+    strategy:
+      matrix:
+        # These Scala versions must match those in the build.sbt
+        scala: [2.13.8, 2.12.17]
+    steps:
+      - name: install java
+        uses: actions/setup-java@v3
+        with:
+          distribution: "zulu"
+          java-version: "8"
+      - uses: actions/checkout@v3
+      - name: generate unidoc
+        run: build/sbt "++ ${{ matrix.scala }}" unidoc

.github/workflows/updated_pull_request.yaml

Lines changed: 0 additions & 22 deletions
This file was deleted.

PROTOCOL.md

Lines changed: 1 addition & 1 deletion
@@ -1660,7 +1660,7 @@ The concrete format is as follows, with all numerical values written in big endi
 
 Bytes | Name | Description
 -|-|-
-0 — 1 | version | The format version of this file: `1` for the format described here.
+0 | version | The format version of this file: `1` for the format described here.
 `repeat for each DV i` | | For each DV
 `<start of i>` — `<start of i> + 3` | dataSize | Size of this DV’s data (without the checksum)
 `<start of i> + 4` — `<start of i> + 4 + dataSize - 1` | bitmapData | One 64-bit RoaringBitmap serialised as described above.
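For orientation, here is a small hedged Scala sketch of how a reader might walk the header fields named in this table. The function name is invented, checksum verification and multi-DV framing are omitted, and the details beyond what the table states are assumptions for illustration only.

```scala
import java.io.{DataInputStream, FileInputStream}

// Illustrative sketch only: reads the version byte and one DV entry as laid out
// in the table above; all numerical values are big endian, as the spec requires.
def readDeletionVectorHeader(path: String): Array[Byte] = {
  val in = new DataInputStream(new FileInputStream(path))
  try {
    val version = in.readByte()                // byte 0: format version, expected to be 1
    require(version == 1, s"Unsupported DV file format version: $version")
    val dataSize = in.readInt()                // big-endian 32-bit size of this DV's data
    val bitmapData = new Array[Byte](dataSize) // one 64-bit RoaringBitmap, serialised as described
    in.readFully(bitmapData)
    bitmapData                                 // the checksum (not shown here) would follow
  } finally in.close()
}
```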

build.sbt

Lines changed: 5 additions & 0 deletions
@@ -300,6 +300,11 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
     "commons-io" % "commons-io" % "2.8.0" % "test",
     "com.novocode" % "junit-interface" % "0.11" % "test",
     "org.slf4j" % "slf4j-log4j12" % "1.7.36" % "test",
+    // JMH dependencies allow writing micro-benchmarks for testing performance of components.
+    // JMH has framework to define benchmarks and takes care of many common functionalities
+    // such as warm runs, cold runs, defining benchmark parameter variables etc.
+    "org.openjdk.jmh" % "jmh-core" % "1.37" % "test",
+    "org.openjdk.jmh" % "jmh-generator-annprocess" % "1.37" % "test",
 
     "org.apache.spark" %% "spark-hive" % sparkVersion % "test" classifier "tests",
     "org.apache.spark" %% "spark-sql" % sparkVersion % "test" classifier "tests",

connectors/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java

Lines changed: 6 additions & 0 deletions
@@ -192,6 +192,9 @@ public void testDeltaSink(boolean isPartitioned, boolean triggerFailover) throws
      *
      * @param exceptionMode whether to throw an exception before or after Delta log commit.
      */
+    @Disabled(
+        "This test is flaky, for some runs it fails with unexpected numbers of files. "
+            + "Investigation of if this is a connector or test issue is ongoing")
     @ResourceLock("StreamingFailoverDeltaGlobalCommitter")
     @ParameterizedTest(name = "isPartitioned = {0}, exceptionMode = {1}")
     @CsvSource({
@@ -335,6 +338,9 @@ public void shouldResumeSink_savepointNoDrainState() throws Exception {
             .hasNoDuplicateAddFiles();
     }
 
+    @Disabled(
+        "This test is flaky, for some runs it fails with 'Seems there was a duplicated AddFile in"
+            + " Delta log. Investigation of if this is a connector or test issue is ongoing")
     @ParameterizedTest(
         name = "init parallelism level = {0}, parallelism level after resuming job = {1}")
     @CsvSource({"3, 3", "3, 6", "6, 3"})

connectors/golden-tables/src/test/scala/io/delta/golden/GoldenTables.scala

Lines changed: 1 addition & 0 deletions
@@ -378,6 +378,7 @@ class GoldenTables extends QueryTest with SharedSparkSession {
 
     val commitInfoFile = CommitInfo(
       version = Some(0L),
+      inCommitTimestamp = None,
       timestamp = new Timestamp(1540415658000L),
       userId = Some("user_0"),
       userName = Some("username_0"),

iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergFileManifest.scala

Lines changed: 59 additions & 42 deletions
@@ -22,7 +22,7 @@ import org.apache.spark.sql.delta.{DeltaColumnMapping, SerializableFileStatus}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.{DateFormatter, TimestampFormatter}
 import org.apache.hadoop.fs.Path
-import org.apache.iceberg.{PartitionData, RowLevelOperationMode, Table, TableProperties}
+import org.apache.iceberg.{PartitionData, RowLevelOperationMode, StructLike, Table, TableProperties}
 import org.apache.iceberg.transforms.IcebergPartitionUtil
 
 import org.apache.spark.internal.Logging
@@ -48,6 +48,31 @@ class IcebergFileManifest(
 
   val basePath = table.location()
 
+  val icebergSchema = table.schema()
+
+  // we must use field id to look up the partition value; consider scenario with iceberg
+  // behavior chance since 1.4.0:
+  // 1) create table with partition schema (a[col_name]: 1[field_id]), add file1;
+  //    The partition data for file1 is (a:1:some_part_value)
+  // 2) add new partition col b and the partition schema becomes (a: 1, b: 2), add file2;
+  //    the partition data for file2 is (a:1:some_part_value, b:2:some_part_value)
+  // 3) remove partition col a, then add file3;
+  //    for iceberg < 1.4.0: the partFields is (a:1(void), b:2); the partition data for
+  //    file3 is (a:1(void):null, b:2:some_part_value);
+  //    for iceberg 1.4.0: the partFields is (b:2); When it reads file1 (a:1:some_part_value),
+  //                       it must use the field_id instead of index to look up the partition
+  //                       value, as the partField and partitionData from file1 have different
+  //                       ordering and thus same index indicates different column.
+  val physicalNameToField = table.spec().fields().asScala.collect {
+    case field if field.transform().toString != VOID_TRANSFORM =>
+      DeltaColumnMapping.getPhysicalName(partitionSchema(field.name)) -> field
+  }.toMap
+
+  val dateFormatter = DateFormatter()
+
+  val timestampFormatter =
+    TimestampFormatter(ConvertUtils.timestampPartitionPattern, java.util.TimeZone.getDefault)
+
   override def numFiles: Long = {
     if (_numFiles.isEmpty) getFileSparkResults()
     _numFiles.get
@@ -77,30 +102,6 @@
     val schemaBatchSize =
       spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_IMPORT_BATCH_SIZE_SCHEMA_INFERENCE)
 
-    val partFields = table.spec().fields().asScala
-    val icebergSchema = table.schema()
-    // we must use field id to look up the partition value; consider scenario with iceberg
-    // behavior chance since 1.4.0:
-    // 1) create table with partition schema (a[col_name]: 1[field_id]), add file1;
-    //    The partition data for file1 is (a:1:some_part_value)
-    // 2) add new partition col b and the partition schema becomes (a: 1, b: 2), add file2;
-    //    the partition data for file2 is (a:1:some_part_value, b:2:some_part_value)
-    // 3) remove partition col a, then add file3;
-    //    for iceberg < 1.4.0: the partFields is (a:1(void), b:2); the partition data for
-    //    file3 is (a:1(void):null, b:2:some_part_value);
-    //    for iceberg 1.4.0: the partFields is (b:2); When it reads file1 (a:1:some_part_value),
-    //                       it must use the field_id instead of index to look up the partition
-    //                       value, as the partField and partitionData from file1 have different
-    //                       ordering and thus same index indicates different column.
-    val physicalNameToField = partFields.collect {
-      case field if field.transform().toString != VOID_TRANSFORM =>
-        DeltaColumnMapping.getPhysicalName(partitionSchema(field.name)) -> field
-    }.toMap
-
-    val dateFormatter = DateFormatter()
-    val timestampFormatter = TimestampFormatter(ConvertUtils.timestampPartitionPattern,
-      java.util.TimeZone.getDefault)
-
     // This flag is strongly not recommended to turn on, but we still provide a flag for regression
     // purpose.
     val unsafeConvertMorTable =
@@ -131,23 +132,9 @@
           s"Please trigger an Iceberg compaction and retry the command.")
       }
       val partitionValues = if (spark.sessionState.conf.getConf(
-        DeltaSQLConf.DELTA_CONVERT_ICEBERG_USE_NATIVE_PARTITION_VALUES)) {
-
-        val icebergPartition = fileScanTask.file().partition()
-        val icebergPartitionData = icebergPartition.asInstanceOf[PartitionData]
-        val fieldIdToIdx = icebergPartitionData.getPartitionType.fields().asScala.zipWithIndex
-          .map(kv => kv._1.fieldId() -> kv._2).toMap
-        val physicalNameToPartValueMap = physicalNameToField
-          .map { case (physicalName, field) =>
-            val fieldIndex = fieldIdToIdx.get(field.fieldId())
-            val partValueAsString = fieldIndex.map {idx =>
-              val partValue = icebergPartitionData.get(idx)
-              IcebergPartitionUtil.partitionValueToString(
-                field, partValue, icebergSchema, dateFormatter, timestampFormatter)
-            }.getOrElse(null)
-            physicalName -> partValueAsString
-          }
-        Some(physicalNameToPartValueMap)
+          DeltaSQLConf.DELTA_CONVERT_ICEBERG_USE_NATIVE_PARTITION_VALUES)) {
+        Some(convertIcebergPartitionToPartitionValues(
+          fileScanTask.file().partition()))
       } else None
       (filePath, partitionValues)
     }
@@ -171,4 +158,34 @@
   }
 
   override def close(): Unit = fileSparkResults.map(_.unpersist())
+
+  def convertIcebergPartitionToPartitionValues(partition: StructLike):
+    Map[String, String] = {
+    val icebergPartitionData = partition.asInstanceOf[PartitionData]
+    val fieldIdToIdx = icebergPartitionData.getPartitionType
+      .fields()
+      .asScala
+      .zipWithIndex
+      .map(kv => kv._1.fieldId() -> kv._2)
+      .toMap
+    val physicalNameToPartValueMap = physicalNameToField
+      .map {
+        case (physicalName, field) =>
+          val fieldIndex = fieldIdToIdx.get(field.fieldId())
+          val partValueAsString = fieldIndex
+            .map { idx =>
+              val partValue = icebergPartitionData.get(idx)
+              IcebergPartitionUtil.partitionValueToString(
+                field,
+                partValue,
+                icebergSchema,
+                dateFormatter,
+                timestampFormatter
+              )
+            }
+            .getOrElse(null)
+          physicalName -> partValueAsString
+      }
+    physicalNameToPartValueMap
+  }
 }
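As a companion to the comment in the diff above, here is a small self-contained Scala sketch, with invented field ids and values, of why the lookup goes through field ids rather than positional indexes once partition columns have been dropped and re-added. Nothing here is taken from IcebergFileManifest beyond the idea it already documents.

```scala
// Hypothetical data: file1 was written under spec (a -> field id 1); the current
// spec, after dropping column a and adding b, is (b -> field id 2).
val file1PartitionData: Seq[(Int, String)] = Seq(1 -> "2021-01-01") // (fieldId, value)

// Build the same kind of fieldId -> index map the manifest builds per file.
val fieldIdToIdx: Map[Int, Int] =
  file1PartitionData.zipWithIndex.map { case ((fieldId, _), idx) => fieldId -> idx }.toMap

// Looking up the current spec's field id 2 correctly yields no value for file1,
// whereas a positional lookup at index 0 would wrongly return column a's value.
val valueForB: Option[String] = fieldIdToIdx.get(2).map(idx => file1PartitionData(idx)._2)
assert(valueForB.isEmpty)
```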
