
Commit 842a4bf

[Build] Run the Spark master PYTHON tests using the Spark 4.0 RC4 (delta-io#4513)
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Currently we don't run our Python tests for our Spark master build, since there is no nightly PySpark snapshot. This PR runs those Python tests using the latest RC for the Spark 4.0 release; when future RCs (or the final release) are published, we will switch to those instead. This unblocks Delta Connect Python client development in master, which previously was only merged to the `branch-4.0-preview1` branch because we could not run the Python tests in master. delta-io#4514 is based off of this one and adds the Python Delta Connect client.

## How was this patch tested?

CI tests.
1 parent 8c581cd commit 842a4bf

12 files changed: +162 -38 lines changed
Lines changed: 86 additions & 0 deletions (new workflow file)

@@ -0,0 +1,86 @@
+name: "Delta Spark Master Python"
+on: [push, pull_request]
+jobs:
+  test:
+    name: "DSMP"
+    runs-on: ubuntu-24.04
+    strategy:
+      matrix:
+        # These Scala versions must match those in the build.sbt
+        scala: [2.13.13]
+    env:
+      SCALA_VERSION: ${{ matrix.scala }}
+    steps:
+      - uses: actions/checkout@v3
+      - uses: technote-space/get-diff-action@v4
+        id: git-diff
+        with:
+          PATTERNS: |
+            **
+            .github/workflows/**
+            !kernel/**
+            !connectors/**
+      - name: install java
+        uses: actions/setup-java@v3
+        with:
+          distribution: "zulu"
+          java-version: "17"
+      - name: Cache Scala, SBT
+        uses: actions/cache@v3
+        with:
+          path: |
+            ~/.sbt
+            ~/.ivy2
+            ~/.cache/coursier
+            !~/.cache/coursier/v1/https/repository.apache.org/content/groups/snapshots
+          # Change the key if dependencies are changed. For each key, GitHub Actions will cache
+          # the above directories when we use the key for the first time. After that, each run will
+          # just use the cache. The cache is immutable so we need to use a new key when trying to
+          # cache new stuff.
+          key: delta-sbt-cache-spark-master-scala${{ matrix.scala }}
+      - name: Install Job dependencies
+        # TODO: update pyspark installation once Spark preview is formally released
+        run: |
+          sudo apt-get update
+          sudo apt-get install -y make build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev python3-openssl git
+          sudo apt install libedit-dev
+          curl -LO https://github.com/bufbuild/buf/releases/download/v1.28.1/buf-Linux-x86_64.tar.gz
+          mkdir -p ~/buf
+          tar -xvzf buf-Linux-x86_64.tar.gz -C ~/buf --strip-components 1
+          rm buf-Linux-x86_64.tar.gz
+          sudo apt install python3-pip --fix-missing
+          sudo pip3 install pipenv==2024.4.1
+          curl https://pyenv.run | bash
+          export PATH="~/.pyenv/bin:$PATH"
+          eval "$(pyenv init -)"
+          eval "$(pyenv virtualenv-init -)"
+          pyenv install 3.9
+          pyenv global system 3.9
+          pipenv --python 3.9 install
+          # Update the pip version to 24.0. By default `pyenv.run` installs the latest pip version
+          # available. From version 24.1, `pip` doesn't allow installing python packages
+          # with version string containing `-`. In Delta-Spark case, the pypi package generated has
+          # `-SNAPSHOT` in version (e.g. `3.3.0-SNAPSHOT`) as the version is picked up from
+          # the `version.sbt` file.
+          pipenv run pip install pip==24.0 setuptools==69.5.1 wheel==0.43.0
+          pipenv run pip install flake8==3.9.0
+          pipenv run pip install black==23.12.1
+          pipenv run pip install mypy==1.8.0
+          pipenv run pip install mypy-protobuf==3.3.0
+          pipenv run pip install cryptography==37.0.4
+          pipenv run pip install twine==4.0.1
+          pipenv run pip install wheel==0.33.4
+          pipenv run pip install setuptools==41.1.0
+          pipenv run pip install pydocstyle==3.0.0
+          pipenv run pip install pandas==2.2.0
+          pipenv run pip install pyarrow==11.0.0
+          pipenv run pip install numpy==1.21
+          pipenv run pip install https://dist.apache.org/repos/dist/dev/spark/v4.0.0-rc4-bin/pyspark-4.0.0.tar.gz
+        if: steps.git-diff.outputs.diff
+      - name: Run Python tests
+        # when changing TEST_PARALLELISM_COUNT make sure to also change it in spark_master_test.yaml
+        run: |
+          # We use the SBT version to choose our dependencies in our python packaging in setup.py
+          echo 'ThisBuild / version := "4.0.0-SNAPSHOT"' > version.sbt
+          TEST_PARALLELISM_COUNT=4 USE_SPARK_MASTER=true pipenv run python run-tests.py --group spark-python
+        if: steps.git-diff.outputs.diff
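For reviewers who want to reproduce this environment locally, a quick sanity check that the RC build installed by the `Install Job dependencies` step is the one under test might look like this (a minimal sketch, not part of the workflow):

```python
# Minimal sketch: run inside the pipenv environment created above to confirm
# that the PySpark package installed from dist.apache.org (the 4.0.0 RC build)
# is the version the tests will exercise.
import pyspark

print("PySpark version under test:", pyspark.__version__)
assert pyspark.__version__.startswith("4.0.0"), (
    "Expected a Spark 4.0 build, found %s" % pyspark.__version__
)
```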

.github/workflows/spark_python_test.yaml

Lines changed: 3 additions & 1 deletion

@@ -65,7 +65,9 @@ jobs:
           pipenv run pip install flake8==3.5.0 pypandoc==1.3.3
           pipenv run pip install black==23.9.1
           pipenv run pip install importlib_metadata==3.10.0
-          pipenv run pip install mypy==0.982
+          # The mypy versions 0.982 and 1.8.0 have conflicting rules (cannot get style checks to
+          # pass for both versions on the same file) so we upgrade this to match Spark 4.0
+          pipenv run pip install mypy==1.8.0
           pipenv run pip install mypy-protobuf==3.3.0
           pipenv run pip install cryptography==37.0.4
           pipenv run pip install twine==4.0.1

dev/pyproject.toml

Lines changed: 1 addition & 1 deletion

@@ -17,7 +17,7 @@
 [tool.black]
 # When changing the version, we have to update
 # GitHub workflow version
-required-version = "23.9.1"
+required-version = "23.12.1"
 line-length = 100
 target-version = ['py38']
 include = '\.pyi?$'

dev/requirements.txt

Lines changed: 1 addition & 1 deletion

@@ -1,5 +1,5 @@
 # Linter
-mypy==0.982
+mypy==1.8.0
 flake8==3.9.0
 
 # Code Formatter

examples/python/table_exists.py

Lines changed: 1 addition & 1 deletion

@@ -23,7 +23,7 @@ def exists(spark, filepath):
     try:
         spark.read.load(path=filepath, format="delta")
     except AnalysisException as exception:
-        if "is not a Delta table" in exception.desc or "Path does not exist" in exception.desc:
+        if "is not a Delta table" in exception.getMessage() or "Path does not exist" in exception.getMessage():
             return False
         raise exception
     return True
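The `desc` attribute used previously is the older PySpark exception field, while `getMessage()` is the accessor exposed by the PySpark 4 error classes. If an example ever needed to run against both old and new PySpark, a version-tolerant variant could look like this (a sketch; the `is_delta_table` name and the `hasattr` fallback are illustrative, not part of the commit):

```python
# Sketch of a version-tolerant existence check: prefer getMessage() when the
# exception exposes it (PySpark error classes), otherwise fall back to .desc.
from pyspark.sql.utils import AnalysisException


def is_delta_table(spark, filepath):
    try:
        spark.read.load(path=filepath, format="delta")
    except AnalysisException as exception:
        message = (
            exception.getMessage()
            if hasattr(exception, "getMessage")
            else exception.desc
        )
        if "is not a Delta table" in message or "Path does not exist" in message:
            return False
        raise
    return True
```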

project/Mima.scala

Lines changed: 10 additions & 4 deletions

@@ -44,7 +44,11 @@ object Mima {
   def getPrevSparkName(currentVersion: String): String = {
     val (major, minor, patch) = getMajorMinorPatch(currentVersion)
     // name change in version 3.0.0, so versions > 3.0.0 should have delta-spark as prev version.
-    if (major >= 3 && (minor > 0 || patch > 0)) "delta-spark" else "delta-core"
+    if (major < 3 || (major == 3 && minor == 0 && patch == 0)) {
+      "delta-core"
+    } else {
+      "delta-spark"
+    }
   }
 
   def getPrevSparkVersion(currentVersion: String): String = {
@@ -53,9 +57,10 @@ object Mima {
     val lastVersionInMajorVersion = Map(
       0 -> "0.8.0",
       1 -> "1.2.1",
-      2 -> "2.4.0"
+      2 -> "2.4.0",
+      3 -> "3.3.1"
     )
-    if (minor == 0) { // 1.0.0 or 2.0.0 or 3.0.0
+    if (minor == 0) { // 1.0.0 or 2.0.0 or 3.0.0 or 4.0.0
       lastVersionInMajorVersion.getOrElse(major - 1, {
         throw new Exception(s"Last version of ${major - 1}.x.x not configured.")
       })
@@ -73,7 +78,8 @@ object Mima {
       // We skip from 0.6.0 to 3.0.0 when migrating connectors to the main delta repo
       0 -> "0.6.0",
       1 -> "0.6.0",
-      2 -> "0.6.0"
+      2 -> "0.6.0",
+      3 -> "3.3.1"
     )
     if (minor == 0) { // 1.0.0
       majorToLastMinorVersions.getOrElse(major - 1, {
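The rewritten condition replaces a one-liner whose boolean logic was easy to misread. For illustration only (not part of the build), the same rule expressed as a standalone Python sketch:

```python
# Illustrative sketch of the rule encoded by getPrevSparkName above: versions up
# to and including 3.0.0 compare against the old "delta-core" artifact, anything
# later compares against "delta-spark".
def prev_spark_artifact_name(current_version: str) -> str:
    base = current_version.split("-")[0]  # drop a "-SNAPSHOT" suffix if present
    major, minor, patch = (int(p) for p in base.split("."))
    if major < 3 or (major == 3 and minor == 0 and patch == 0):
        return "delta-core"
    return "delta-spark"


assert prev_spark_artifact_name("3.0.0") == "delta-core"
assert prev_spark_artifact_name("4.0.0-SNAPSHOT") == "delta-spark"
```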

python/delta/pip_utils.py

Lines changed: 5 additions & 1 deletion

@@ -74,7 +74,11 @@ def configure_spark_with_delta_pip(
         '''
         raise Exception(msg) from e
 
-    scala_version = "2.12"
+    if int(delta_version.split(".")[0]) >= 4:
+        # For Delta 4.0+ (thus Spark 4.0+) Scala 2.12 is not supported
+        scala_version = "2.13"
+    else:
+        scala_version = "2.12"
     maven_artifact = f"io.delta:delta-spark_{scala_version}:{delta_version}"
 
     extra_packages = extra_packages if extra_packages is not None else []
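`configure_spark_with_delta_pip` is the documented entry point for pip users, so the Scala version it picks determines which Maven artifact gets pulled at session start. A minimal usage sketch (the app name is arbitrary); with a Delta 4.x version string the helper now resolves `io.delta:delta-spark_2.13:<version>`:

```python
# Typical wiring of a pip-installed Delta session; with delta-spark 4.x the
# builder below adds the Scala 2.13 artifact to spark.jars.packages.
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder.appName("delta-pip-example")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
```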

python/delta/tests/test_deltatable.py

Lines changed: 7 additions & 9 deletions

@@ -14,9 +14,7 @@
 # limitations under the License.
 #
 
-# mypy: disable-error-code="union-attr"
-# mypy: disable-error-code="attr-defined"
-# type: ignore[union-attr]
+# mypy: disable-error-code="union-attr, attr-defined"
 
 import unittest
 import os
@@ -159,7 +157,7 @@ def reset_table() -> None:
             .whenNotMatchedBySourceUpdate(set={"value": "value + 0"}) \
             .execute()
         self.__checkAnswer(merge_output,
-                           ([Row(6,  # affected rows
+                           ([Row(6,  # type: ignore[call-overload]
                                  4,  # updated rows (a and b in WHEN MATCHED
                                      # and c and d in WHEN NOT MATCHED BY SOURCE)
                                  0,  # deleted rows
@@ -502,7 +500,7 @@ def test_merge_with_inconsistent_sessions(self) -> None:
         target_path = os.path.join(self.tempFile, "target")
         spark = self.spark
 
-        def f(spark):
+        def f(spark):  # type: ignore[no-untyped-def]
             spark.range(20) \
                 .withColumn("x", col("id")) \
                 .withColumn("y", col("id")) \
@@ -542,7 +540,7 @@ def test_history(self) -> None:
             [Row("Overwrite")],
             StructType([StructField("operationParameters.mode", StringType(), True)]))
 
-    def test_cdc(self):
+    def test_cdc(self) -> None:
         self.spark.range(0, 5).write.format("delta").save(self.tempFile)
         deltaTable = DeltaTable.forPath(self.spark, self.tempFile)
         # Enable Change Data Feed
@@ -976,7 +974,7 @@ def test_verify_paritionedBy_compatibility(self) -> None:
             from pyspark.sql.column import _to_seq  # type: ignore[attr-defined]
         except ImportError:
             # Spark 4
-            from pyspark.sql.classic.column import _to_seq  # type: ignore[attr-defined]
+            from pyspark.sql.classic.column import _to_seq  # type: ignore
 
         with self.table("testTable"):
             tableBuilder = DeltaTable.create(self.spark).tableName("testTable") \
@@ -1102,8 +1100,8 @@ def test_delta_table_builder_with_bad_args(self) -> None:
             builder.addColumn(
                 "a",
                 "bigint",
-                generatedByDefaultAs=""
-            )  # type: ignore[arg-type]
+                generatedByDefaultAs=""  # type: ignore[arg-type]
+            )
 
         # bad generatedByDefaultAs - identity column data type must be Long
         with self.assertRaises(UnsupportedOperationException):
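The consolidated header comment and the per-line ignores rely on mypy's error-code syntax, which is stricter in 1.8 than in 0.982. A small standalone sketch (hypothetical file, not from this commit) of the two suppression styles used above:

```python
# mypy: disable-error-code="union-attr, attr-defined"
#
# Hypothetical example: a single module-level directive can list several error
# codes (comma-separated), while individual lines can still carry targeted
# `# type: ignore[<code>]` comments for one-off suppressions.
from typing import Optional


def shout(value: Optional[str]) -> str:
    return value.upper()  # union-attr error silenced by the module directive


x: int = "not an int"  # type: ignore[assignment]
```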

python/delta/tests/test_sql.py

Lines changed: 2 additions & 1 deletion

@@ -99,11 +99,12 @@ def test_convert(self) -> None:
     def test_ddls(self) -> None:
         table = "deltaTable"
         table2 = "deltaTable2"
-        with self.table(table, table2):
+        with self.table(table, table + "_part", table2):
             def read_table() -> DataFrame:
                 return self.spark.sql(f"SELECT * FROM {table}")
 
             self.spark.sql(f"DROP TABLE IF EXISTS {table}")
+            self.spark.sql(f"DROP TABLE IF EXISTS {table}_part")
             self.spark.sql(f"DROP TABLE IF EXISTS {table2}")
 
             self.spark.sql(f"CREATE TABLE {table}(a LONG, b String NOT NULL) USING delta")

python/run-tests.py

Lines changed: 28 additions & 13 deletions

@@ -22,16 +22,16 @@
 from os import path
 
 
-def test(root_dir, package):
-    # Run all of the test under test/python directory, each of them
-    # has main entry point to execute, which is python's unittest testing
+def test(root_dir, code_dir, packages):
+    # Test the codes in the code_dir directory using its "tests" subdirectory,
+    # each of them has main entry point to execute, which is python's unittest testing
     # framework.
     python_root_dir = path.join(root_dir, "python")
-    test_dir = path.join(python_root_dir, path.join("delta", "tests"))
+    test_dir = path.join(python_root_dir, path.join(code_dir, "tests"))
     test_files = [os.path.join(test_dir, f) for f in os.listdir(test_dir)
                   if os.path.isfile(os.path.join(test_dir, f)) and
                   f.endswith(".py") and not f.startswith("_")]
-    extra_class_path = path.join(python_root_dir, path.join("delta", "testing"))
+    extra_class_path = path.join(python_root_dir, path.join(code_dir, "testing"))
 
     for test_file in test_files:
         try:
@@ -40,7 +40,7 @@ def test(root_dir, package):
                    "--repositories",
                    ("https://maven-central.storage-download.googleapis.com/maven2/,"
                     "https://repo1.maven.org/maven2/"),
-                   "--packages", package, test_file]
+                   "--packages", ",".join(packages), test_file]
             print("Running tests in %s\n=============" % test_file)
             print("Command: %s" % str(cmd))
             run_cmd(cmd, stream_output=True)
@@ -56,20 +56,30 @@ def delete_if_exists(path):
         print("Deleted %s " % path)
 
 
-def prepare(root_dir):
+def prepare(root_dir, use_spark_master):
     print("##### Preparing python tests & building packages #####")
     # Build package with python files in it
     sbt_path = path.join(root_dir, path.join("build", "sbt"))
     delete_if_exists(os.path.expanduser("~/.ivy2/cache/io.delta"))
     delete_if_exists(os.path.expanduser("~/.m2/repository/io/delta/"))
-    run_cmd([sbt_path, "clean", "sparkGroup/publishM2"], stream_output=True)
+    sbt_command = [sbt_path]
+    packages = ["spark/publishM2", "storage/publishM2"]
+    if use_spark_master:
+        sbt_command = sbt_command + ["-DsparkVersion=master"]
+        packages = packages + ["connectCommon/publishM2", "connectServer/publishM2"]
+    run_cmd(sbt_command + ["clean"] + packages, stream_output=True)
 
+
+def get_local_package(package_name, use_spark_master):
     # Get current release which is required to be loaded
     version = '0.0.0'
     with open(os.path.join(root_dir, "version.sbt")) as fd:
         version = fd.readline().split('"')[1]
-    package = "io.delta:delta-spark_2.12:" + version
-    return package
+
+    if use_spark_master:
+        return f"io.delta:{package_name}_2.13:" + version
+    else:
+        return f"io.delta:{package_name}_2.12:" + version
 
 
 def run_cmd(cmd, throw_on_error=True, env=None, stream_output=False, print_cmd=True, **kwargs):
@@ -179,10 +189,15 @@ def run_delta_connect_codegen_python(root_dir):
 if __name__ == "__main__":
     print("##### Running python tests #####")
     root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-    package = prepare(root_dir)
+    use_spark_master = os.getenv("USE_SPARK_MASTER") or False
+    prepare(root_dir, use_spark_master)
+    delta_spark_package = get_local_package("delta-spark", use_spark_master)
 
     run_python_style_checks(root_dir)
     run_mypy_tests(root_dir)
     run_pypi_packaging_tests(root_dir)
-    run_delta_connect_codegen_python(root_dir)
-    test(root_dir, package)
+    test(root_dir, "delta", [delta_spark_package])
+
+    # For versions 4.0+ run Delta Connect tests as well
+    if use_spark_master:
+        run_delta_connect_codegen_python(root_dir)
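With the new signature, `test()` receives a list of Maven coordinates and collapses them into a single comma-separated `--packages` argument. A rough sketch of the command it ends up issuing per test file (the `spark-submit` launcher and the coordinates shown are assumptions for illustration; the real values come from `get_local_package` and `version.sbt`):

```python
# Sketch only: approximate shape of the per-test command after the change.
packages = [
    "io.delta:delta-spark_2.13:4.0.0-SNAPSHOT",  # e.g. get_local_package("delta-spark", True)
    # Delta Connect packages would be appended here for the Spark master build.
]
cmd = [
    "spark-submit",  # assumed launcher; run-tests.py builds the real command itself
    "--repositories",
    ("https://maven-central.storage-download.googleapis.com/maven2/,"
     "https://repo1.maven.org/maven2/"),
    "--packages", ",".join(packages),
    "python/delta/tests/test_deltatable.py",
]
print(" ".join(cmd))
```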
