
Commit 45f66da

Fokko and kevinjqliu authored
Add test for migrated tables (#2290)
Identified two issues that can be worked on in parallel.

Co-authored-by: Kevin Liu <[email protected]>
1 parent a32c3dc commit 45f66da

File tree

2 files changed: 98 additions, 1 deletion


tests/conftest.py

Lines changed: 12 additions & 1 deletion
@@ -2502,9 +2502,13 @@ def spark() -> "SparkSession":
     spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2])
     scala_version = "2.12"
     iceberg_version = "1.9.2"
+    hadoop_version = "3.3.4"
+    aws_sdk_version = "1.12.753"
 
     os.environ["PYSPARK_SUBMIT_ARGS"] = (
         f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version},"
+        f"org.apache.hadoop:hadoop-aws:{hadoop_version},"
+        f"com.amazonaws:aws-java-sdk-bundle:{aws_sdk_version},"
         f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version} pyspark-shell"
     )
     os.environ["AWS_REGION"] = "us-east-1"
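For reference, the four coordinates above collapse into a single `--packages` argument. A minimal sketch of the rendered value, assuming PySpark 3.5 is the installed version (the fixture actually derives `spark_version` from `importlib.metadata`):

```python
spark_version = "3.5"  # assumption for illustration; the fixture reads this from the installed pyspark
scala_version = "2.12"
iceberg_version = "1.9.2"
hadoop_version = "3.3.4"
aws_sdk_version = "1.12.753"

# Join the Maven coordinates the same way the fixture's f-string does.
packages = ",".join([
    f"org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}",
    f"org.apache.hadoop:hadoop-aws:{hadoop_version}",
    f"com.amazonaws:aws-java-sdk-bundle:{aws_sdk_version}",
    f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version}",
])
print(f"--packages {packages} pyspark-shell")
# --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.753,org.apache.iceberg:iceberg-aws-bundle:1.9.2 pyspark-shell
```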
@@ -2526,14 +2530,21 @@ def spark() -> "SparkSession":
         .config("spark.sql.catalog.integration.warehouse", "s3://warehouse/wh/")
         .config("spark.sql.catalog.integration.s3.endpoint", "http://localhost:9000")
         .config("spark.sql.catalog.integration.s3.path-style-access", "true")
-        .config("spark.sql.defaultCatalog", "integration")
         .config("spark.sql.catalog.hive", "org.apache.iceberg.spark.SparkCatalog")
         .config("spark.sql.catalog.hive.type", "hive")
         .config("spark.sql.catalog.hive.uri", "http://localhost:9083")
         .config("spark.sql.catalog.hive.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
         .config("spark.sql.catalog.hive.warehouse", "s3://warehouse/hive/")
         .config("spark.sql.catalog.hive.s3.endpoint", "http://localhost:9000")
         .config("spark.sql.catalog.hive.s3.path-style-access", "true")
+        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
+        .config("spark.sql.catalog.spark_catalog.type", "hive")
+        .config("spark.sql.catalog.spark_catalog.uri", "http://localhost:9083")
+        .config("spark.sql.catalog.spark_catalog.warehouse", "s3://warehouse/hive/")
+        .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
+        .config("spark.hadoop.fs.s3a.path.style.access", "true")
+        .config("spark.sql.catalogImplementation", "hive")
+        .config("spark.sql.defaultCatalog", "integration")
         .config("spark.sql.execution.arrow.pyspark.enabled", "true")
         .getOrCreate()
     )
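The new `spark_catalog` entries register Iceberg's `SparkSessionCatalog` over the built-in session catalog, so the same Hive metastore serves both plain Hive/Parquet tables (created as `spark_catalog.default.*` in the test below) and the Iceberg `hive` catalog that the snapshot procedure writes into; the `fs.s3a.*` settings let those non-Iceberg tables reach the MinIO warehouse. A minimal sketch, not part of the diff, of how one might sanity-check this wiring from the fixture's session (catalog names and endpoints are the fixture's assumptions):

```python
from pyspark.sql import SparkSession

# Reuses the session built by the fixture above (same process, same configs).
spark = SparkSession.builder.getOrCreate()

# spark_catalog (SparkSessionCatalog) resolves both Iceberg and plain Hive/Parquet tables,
# while "hive" is an Iceberg-only SparkCatalog backed by the same metastore.
spark.sql("SHOW TABLES IN spark_catalog.default").show()
spark.sql("SHOW TABLES IN hive.default").show()
```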
Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+
+import pytest
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog import Catalog
+
+
+@pytest.mark.integration
+def test_migrate_table(
+    session_catalog_hive: Catalog,
+    spark: SparkSession,
+) -> None:
+    """
+    Imported tables are an edge case since the partition column is not stored
+    in the Parquet files:
+
+    test_migrate_table_hive_1754486926/dt=2022-01-01/part-00000-30a9798b-7597-4027-86d9-79d7c529bc87.c000.snappy.parquet
+    {
+      "type" : "record",
+      "name" : "spark_schema",
+      "fields" : [ {
+        "name" : "number",
+        "type" : "int"
+      } ]
+    }
+
+    PyIceberg will project this column when the table is being read
+    """
+    # Create new tables to avoid complex cleanup
+    src_table_identifier = f"spark_catalog.default.test_migrate_table_hive_{int(time.time())}"
+    dst_table_identifier = f"default.test_migrate_table_{int(time.time())}"
+
+    spark.sql(f"""
+        CREATE TABLE {src_table_identifier} (
+            number INTEGER
+        )
+        PARTITIONED BY (dt date)
+        STORED AS parquet
+    """)
+
+    spark.sql(f"""
+        INSERT OVERWRITE TABLE {src_table_identifier}
+        PARTITION (dt='2022-01-01')
+        VALUES (1), (2), (3)
+    """)
+
+    spark.sql(f"""
+        INSERT OVERWRITE TABLE {src_table_identifier}
+        PARTITION (dt='2023-01-01')
+        VALUES (4), (5), (6)
+    """)
+
+    # Docs: https://iceberg.apache.org/docs/latest/hive-migration/#snapshot-hive-table-to-iceberg
+    spark.sql(f"""
+        CALL hive.system.snapshot('{src_table_identifier}', 'hive.{dst_table_identifier}')
+    """)
+
+    tbl = session_catalog_hive.load_table(dst_table_identifier)
+    assert tbl.schema().column_names == ["number", "dt"]
+
+    # TODO: Returns the primitive type (int), rather than the logical type
+    # assert set(tbl.scan().to_arrow().column(1).combine_chunks().tolist()) == {'2022-01-01', '2023-01-01'}
+
+    assert tbl.scan(row_filter="number > 3").to_arrow().column(0).combine_chunks().tolist() == [4, 5, 6]
+
+    assert tbl.scan(row_filter="dt == '2023-01-01'").to_arrow().column(0).combine_chunks().tolist() == [4, 5, 6]
+
+    # TODO: Issue with filtering the projected column
+    # assert tbl.scan(row_filter="dt == '2022-01-01'").to_arrow().column(0).combine_chunks().tolist() == [1, 2, 3]
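
The two commented-out assertions are the issues the commit message says can be worked on in parallel: the projected partition column comes back as a primitive int rather than the logical date type, and filtering on that projected column misbehaves. A small, hypothetical sketch for poking at them outside the test; the catalog name and the timestamped table identifier below are placeholders matching the fixtures above, not part of the change:

```python
from pyiceberg.catalog import load_catalog

# Assumes a "hive" catalog is configured the same way the session_catalog_hive
# fixture configures it (Hive metastore on localhost:9083, MinIO warehouse).
catalog = load_catalog("hive")

# Placeholder: whatever timestamped name test_migrate_table generated.
tbl = catalog.load_table("default.test_migrate_table_1754486926")

arrow_table = tbl.scan().to_arrow()
# "dt" is projected from partition metadata rather than read from the Parquet
# data files; the first TODO is about the type this projection reports
# (currently a plain int instead of the logical date type).
print(arrow_table.schema.field("dt").type)

# The second TODO is about filtering on that projected column.
print(tbl.scan(row_filter="dt == '2022-01-01'").to_arrow().num_rows)
```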
