|
14 | 14 | # KIND, either express or implied. See the License for the |
15 | 15 | # specific language governing permissions and limitations |
16 | 16 | # under the License. |
17 | | -import math |
18 | 17 |
|
19 | 18 | from pyspark.sql import SparkSession |
20 | 19 | from pyspark.sql.functions import current_date, date_add, expr |
|
23 | 22 | from pyiceberg.schema import Schema |
24 | 23 | from pyiceberg.types import FixedType, NestedField, UUIDType |
25 | 24 |
|
26 | | -# The configuration is important, otherwise we get many small |
27 | | -# parquet files with a single row. When a positional delete |
28 | | -# hits the Parquet file with one row, the parquet file gets |
29 | | -# dropped instead of having a merge-on-read delete file. |
30 | | -spark = ( |
31 | | - SparkSession |
32 | | - .builder |
33 | | - .config("spark.sql.shuffle.partitions", "1") |
34 | | - .config("spark.default.parallelism", "1") |
35 | | - .getOrCreate() |
36 | | -) |
| 25 | +# Create a SparkSession against the remote Spark Connect server
| 26 | +spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate() |
37 | 27 |
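The removed session configs (spark.sql.shuffle.partitions / spark.default.parallelism) existed to keep each INSERT to a single multi-row Parquet file, so that a positional delete lands in a merge-on-read delete file instead of dropping a one-row data file. With Spark Connect, those session-level knobs are replaced by an explicit coalesce(1) on each DataFrame before writing, as the INSERT hunks below show. A minimal sketch of the pattern, assuming a reachable Spark Connect endpoint at sc://localhost:15002 and a hypothetical pre-existing table named rest.default.example:

    # Sketch only: single-file append over Spark Connect.
    # Endpoint and table identifier are assumptions for illustration.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
    df = spark.sql("SELECT * FROM VALUES (1, 'a'), (2, 'b') AS t(number, letter)")
    # coalesce(1) collapses the result to one partition, so the append
    # writes a single Parquet file rather than many single-row files.
    df.coalesce(1).writeTo("rest.default.example").append()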
|
38 | 28 | catalogs = { |
39 | | - 'rest': load_catalog( |
| 29 | + "rest": load_catalog( |
40 | 30 | "rest", |
41 | 31 | **{ |
42 | 32 | "type": "rest", |
43 | | - "uri": "http://rest:8181", |
44 | | - "s3.endpoint": "http://minio:9000", |
| 33 | + "uri": "http://localhost:8181", |
| 34 | + "s3.endpoint": "http://localhost:9000", |
45 | 35 | "s3.access-key-id": "admin", |
46 | 36 | "s3.secret-access-key": "password", |
47 | 37 | }, |
48 | 38 | ), |
49 | | - 'hive': load_catalog( |
| 39 | + "hive": load_catalog( |
50 | 40 | "hive", |
51 | 41 | **{ |
52 | 42 | "type": "hive", |
53 | | - "uri": "thrift://hive:9083", |
54 | | - "s3.endpoint": "http://minio:9000", |
| 43 | + "uri": "thrift://localhost:9083", |
| 44 | + "s3.endpoint": "http://localhost:9000", |
55 | 45 | "s3.access-key-id": "admin", |
56 | 46 | "s3.secret-access-key": "password", |
57 | 47 | }, |
|
119 | 109 | # v3: Using deletion vectors |
120 | 110 |
|
121 | 111 | for format_version in [2, 3]: |
122 | | - identifier = f'{catalog_name}.default.test_positional_mor_deletes_v{format_version}' |
| 112 | + identifier = f"{catalog_name}.default.test_positional_mor_deletes_v{format_version}" |
123 | 113 | spark.sql( |
124 | 114 | f""" |
125 | 115 | CREATE OR REPLACE TABLE {identifier} ( |
|
137 | 127 | """ |
138 | 128 | ) |
139 | 129 |
|
140 | | - spark.sql( |
141 | | - f""" |
142 | | - INSERT INTO {identifier} |
143 | | - VALUES |
| 130 | + spark.sql(""" |
| 131 | + SELECT * FROM VALUES |
144 | 132 | (CAST('2023-03-01' AS date), 1, 'a'), |
145 | 133 | (CAST('2023-03-02' AS date), 2, 'b'), |
146 | 134 | (CAST('2023-03-03' AS date), 3, 'c'), |
|
152 | 140 | (CAST('2023-03-09' AS date), 9, 'i'), |
153 | 141 | (CAST('2023-03-10' AS date), 10, 'j'), |
154 | 142 | (CAST('2023-03-11' AS date), 11, 'k'), |
155 | | - (CAST('2023-03-12' AS date), 12, 'l'); |
156 | | - """ |
157 | | - ) |
| 143 | + (CAST('2023-03-12' AS date), 12, 'l') |
| 144 | + AS t(dt, number, letter) |
| 145 | + """).coalesce(1).writeTo(identifier).append() |
158 | 146 |
|
159 | 147 | spark.sql(f"ALTER TABLE {identifier} CREATE TAG tag_12") |
160 | 148 |
|
|
164 | 152 |
|
165 | 153 | spark.sql(f"DELETE FROM {identifier} WHERE number = 9") |
166 | 154 |
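Each DELETE above should add a merge-on-read delete to the new snapshot: a positional delete file on the v2 table, a deletion vector on v3. One way to check is the all_delete_files metadata table exposed by Iceberg's Spark integration; a sketch, reusing the identifier defined in the loop:

    # Sketch: list the delete files the DELETEs created.
    # content = 1 marks position deletes in the files metadata tables.
    spark.sql(f"SELECT content, file_path FROM {identifier}.all_delete_files").show(truncate=False)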
|
167 | | - identifier = f'{catalog_name}.default.test_positional_mor_double_deletes_v{format_version}' |
| 155 | + identifier = f"{catalog_name}.default.test_positional_mor_double_deletes_v{format_version}" |
168 | 156 |
|
169 | 157 | spark.sql( |
170 | 158 | f""" |
|
178 | 166 | 'write.delete.mode'='merge-on-read', |
179 | 167 | 'write.update.mode'='merge-on-read', |
180 | 168 | 'write.merge.mode'='merge-on-read', |
181 | | - 'format-version'='2' |
| 169 | + 'format-version'='{format_version}' |
182 | 170 | ); |
183 | 171 | """ |
184 | 172 | ) |
185 | 173 |
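With the hardcoded 'format-version'='2', the v3 iteration of the loop still created a v2 table; interpolating {format_version} fixes that. A quick cross-check from the PyIceberg side, sketched under the assumptions that the enclosing loop exposes catalog_name as a key of the catalogs mapping defined at the top of the script and that the installed PyIceberg can read the table's metadata:

    # Sketch: confirm the table metadata carries the loop's format version.
    tbl = catalogs[catalog_name].load_table(f"default.test_positional_mor_double_deletes_v{format_version}")
    assert tbl.metadata.format_version == format_version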
|
186 | | - spark.sql( |
187 | | - f""" |
188 | | - INSERT INTO {identifier} |
189 | | - VALUES |
| 174 | + spark.sql(""" |
| 175 | + SELECT * FROM VALUES |
190 | 176 | (CAST('2023-03-01' AS date), 1, 'a'), |
191 | 177 | (CAST('2023-03-02' AS date), 2, 'b'), |
192 | 178 | (CAST('2023-03-03' AS date), 3, 'c'), |
|
198 | 184 | (CAST('2023-03-09' AS date), 9, 'i'), |
199 | 185 | (CAST('2023-03-10' AS date), 10, 'j'), |
200 | 186 | (CAST('2023-03-11' AS date), 11, 'k'), |
201 | | - (CAST('2023-03-12' AS date), 12, 'l'); |
202 | | - """ |
203 | | - ) |
| 187 | + (CAST('2023-03-12' AS date), 12, 'l') |
| 188 | + AS t(dt, number, letter) |
| 189 | + """).coalesce(1).writeTo(identifier).append() |
204 | 190 |
|
205 | 191 | # Perform two deletes, which should produce:
206 | 192 | # v2: two positional delete files
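After the two deletes that follow this comment, the snapshot summaries should reflect the accumulated deletes. A hedged verification via the snapshots metadata table (summary is a string map, and 'total-delete-files' is one of Iceberg's standard summary keys):

    # Sketch: watch the delete-file count grow across snapshots.
    spark.sql(f"SELECT snapshot_id, summary['total-delete-files'] FROM {identifier}.snapshots ORDER BY committed_at").show(truncate=False)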
|