|
47 | 47 |     LongType, |
48 | 48 |     NestedField, |
49 | 49 |     StringType, |
| 50 | +    StructType, |
50 | 51 |     TimestampType, |
51 | 52 |     TimestamptzType, |
52 | 53 | ) |
@@ -216,23 +217,150 @@ def test_add_files_to_unpartitioned_table_raises_file_not_found( |
216 | 217 |
|
217 | 218 |
|
218 | 219 | @pytest.mark.integration |
219 | | -def test_add_files_to_unpartitioned_table_raises_has_field_ids( |
| 220 | +def test_add_files_to_unpartitioned_table_with_field_ids( |
220 | 221 |     spark: SparkSession, session_catalog: Catalog, format_version: int |
221 | 222 | ) -> None: |
222 | | -    identifier = f"default.unpartitioned_raises_field_ids_v{format_version}" |
| 223 | +    identifier = f"default.unpartitioned_with_field_ids_v{format_version}" |
223 | 224 |     tbl = _create_table(session_catalog, identifier, format_version) |
224 | 225 |
|
225 | | -    file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)] |
226 | | -    # write parquet files |
| 226 | +    file_paths = [f"s3://warehouse/default/unpartitioned_with_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)] |
| 227 | +    # write parquet files with field IDs matching the table schema |
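| | +    # ARROW_SCHEMA_WITH_IDS and ARROW_TABLE_WITH_IDS are module-level fixtures |
| | +    # (not shown in this hunk); the assumption here is that the schema carries |
| | +    # "PARQUET:field_id" metadata of 1=foo, 2=bar, 3=baz, 4=qux, mirroring the |
| | +    # IDs the catalog assigned to the table |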
227 | 228 |     for file_path in file_paths: |
228 | 229 |         fo = tbl.io.new_output(file_path) |
229 | 230 |         with fo.create(overwrite=True) as fos: |
230 | 231 |             with pq.ParquetWriter(fos, schema=ARROW_SCHEMA_WITH_IDS) as writer: |
231 | 232 |                 writer.write_table(ARROW_TABLE_WITH_IDS) |
232 | 233 |
|
233 | 234 |     # add the parquet files as data files |
234 | | -    with pytest.raises(NotImplementedError): |
235 | | -        tbl.add_files(file_paths=file_paths) |
| 235 | +    tbl.add_files(file_paths=file_paths) |
| 236 | + |
| 237 | +    # NameMapping should still be set even though files have field IDs |
| 238 | +    assert tbl.name_mapping() is not None |
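| | +    # (assumed behavior: add_files populates the "schema.name-mapping.default" |
| | +    # table property, so readers that ignore Parquet field IDs can still |
| | +    # resolve columns by name) |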
| 239 | + |
| 240 | +    # Verify the files were added by checking the all_manifests metadata table |
| 241 | +    rows = spark.sql( |
| 242 | +        f""" |
| 243 | +        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count |
| 244 | +        FROM {identifier}.all_manifests |
| 245 | +        """ |
| 246 | +    ).collect() |
| 247 | + |
| 248 | +    assert [row.added_data_files_count for row in rows] == [5] |
| 249 | +    assert [row.existing_data_files_count for row in rows] == [0] |
| 250 | +    assert [row.deleted_data_files_count for row in rows] == [0] |
| 251 | + |
| 252 | +    # Verify data can be read back correctly |
| 253 | +    df = spark.table(identifier).toPandas() |
| 254 | +    assert len(df) == 5 |
| 255 | +    assert all(df["foo"] == True)  # noqa: E712 |
| 256 | +    assert all(df["bar"] == "bar_string") |
| 257 | +    assert all(df["baz"] == 123) |
| 258 | +    assert all(df["qux"] == date(2024, 3, 7)) |
| 259 | + |
| 260 | + |
| 261 | +@pytest.mark.integration |
| 262 | +def test_add_files_with_mismatched_field_ids(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: |
| 263 | +    identifier = f"default.unpartitioned_mismatched_field_ids_v{format_version}" |
| 264 | +    tbl = _create_table(session_catalog, identifier, format_version) |
| 265 | + |
| 266 | +    # Create a schema whose field IDs don't match the table schema |
| 267 | +    # Table has: 1=foo, 2=bar, 3=baz, 4=qux (assigned by the catalog) |
| 268 | +    # This file has: 1=foo, 2=bar, 5=baz, 6=qux (wrong IDs for baz and qux) |
| 269 | +    mismatched_schema = pa.schema( |
| 270 | +        [ |
| 271 | +            pa.field("foo", pa.bool_(), nullable=False, metadata={"PARQUET:field_id": "1"}), |
| 272 | +            pa.field("bar", pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}), |
| 273 | +            pa.field("baz", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "5"}),  # Wrong: should be 3 |
| 274 | +            pa.field("qux", pa.date32(), nullable=False, metadata={"PARQUET:field_id": "6"}),  # Wrong: should be 4 |
| 275 | +        ] |
| 276 | +    ) |
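| | +    # PyArrow turns the "PARQUET:field_id" metadata key into real field IDs in |
| | +    # the Parquet footer, which is what add_files is expected to validate against |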
| 277 | + |
| 278 | +    file_path = f"s3://warehouse/default/unpartitioned_mismatched_field_ids/v{format_version}/test.parquet" |
| 279 | +    fo = tbl.io.new_output(file_path) |
| 280 | +    with fo.create(overwrite=True) as fos: |
| 281 | +        with pq.ParquetWriter(fos, schema=mismatched_schema) as writer: |
| 282 | +            writer.write_table(ARROW_TABLE_WITH_IDS) |
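| | +    # (note: pyarrow's ParquetWriter compares schemas without metadata, so |
| | +    # ARROW_TABLE_WITH_IDS is accepted here and the file ends up carrying the |
| | +    # writer schema's mismatched field IDs) |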
| 283 | + |
| 284 | +    # Adding files with mismatched field IDs should fail |
| 285 | +    with pytest.raises(ValueError, match="Field IDs in Parquet file do not match table schema"): |
| 286 | +        tbl.add_files(file_paths=[file_path]) |
| 287 | + |
| 288 | + |
| 289 | +@pytest.mark.integration |
| 290 | +def test_add_files_with_mismatched_nested_field_ids(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: |
| 291 | + """Test that files with mismatched nested (struct) field IDs are rejected.""" |
| 292 | + identifier = f"default.nested_mismatched_field_ids_v{format_version}" |
| 293 | + |
| 294 | +    # Create a table with a nested struct field |
| 295 | +    try: |
| 296 | +        session_catalog.drop_table(identifier=identifier) |
| 297 | +    except NoSuchTableError: |
| 298 | +        pass |
| 299 | + |
| 300 | +    nested_schema = Schema( |
| 301 | +        NestedField(1, "id", IntegerType(), required=False), |
| 302 | +        NestedField( |
| 303 | +            2, |
| 304 | +            "user", |
| 305 | +            StructType( |
| 306 | +                NestedField(3, "name", StringType(), required=False), |
| 307 | +                NestedField(4, "age", IntegerType(), required=False), |
| 308 | +            ), |
| 309 | +            required=False, |
| 310 | +        ), |
| 311 | +        schema_id=0, |
| 312 | +    ) |
| 313 | + |
| 314 | +    tbl = session_catalog.create_table( |
| 315 | +        identifier=identifier, |
| 316 | +        schema=nested_schema, |
| 317 | +        properties={"format-version": str(format_version)}, |
| 318 | +    ) |
| 319 | + |
| 320 | +    # Create a PyArrow schema with MISMATCHED nested field IDs |
| 321 | +    # The table expects: id=1, user=2, user.name=3, user.age=4 |
| 322 | +    # This file has: id=1, user=2, user.name=99, user.age=100 (wrong nested IDs) |
| 323 | +    pa_schema_mismatched = pa.schema( |
| 324 | +        [ |
| 325 | +            pa.field("id", pa.int32(), nullable=True, metadata={b"PARQUET:field_id": b"1"}), |
| 326 | +            pa.field( |
| 327 | +                "user", |
| 328 | +                pa.struct( |
| 329 | +                    [ |
| 330 | + pa.field("name", pa.string(), nullable=True, metadata={b"PARQUET:field_id": b"99"}), # Wrong! |
| 331 | + pa.field("age", pa.int32(), nullable=True, metadata={b"PARQUET:field_id": b"100"}), # Wrong! |
| 332 | +                    ] |
| 333 | +                ), |
| 334 | +                nullable=True, |
| 335 | +                metadata={b"PARQUET:field_id": b"2"}, |
| 336 | +            ), |
| 337 | +        ] |
| 338 | +    ) |
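| | +    # The struct itself carries the correct ID (2); only its children are wrong, |
| | +    # so the validation must recurse into nested types to catch this case |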
| 339 | + |
| 340 | +    pa_table = pa.table( |
| 341 | +        { |
| 342 | +            "id": pa.array([1, 2, 3], type=pa.int32()), |
| 343 | +            "user": pa.array( |
| 344 | +                [ |
| 345 | +                    {"name": "Alice", "age": 30}, |
| 346 | +                    {"name": "Bob", "age": 25}, |
| 347 | +                    {"name": "Charlie", "age": 35}, |
| 348 | +                ], |
| 349 | +                type=pa_schema_mismatched.field("user").type, |
| 350 | +            ), |
| 351 | +        }, |
| 352 | +        schema=pa_schema_mismatched, |
| 353 | +    ) |
| 354 | + |
| 355 | +    file_path = f"s3://warehouse/default/nested_mismatched_field_ids/v{format_version}/test.parquet" |
| 356 | +    fo = tbl.io.new_output(file_path) |
| 357 | +    with fo.create(overwrite=True) as fos: |
| 358 | +        with pq.ParquetWriter(fos, schema=pa_schema_mismatched) as writer: |
| 359 | +            writer.write_table(pa_table) |
| 360 | + |
| 361 | +    # Adding files with mismatched nested field IDs should fail |
| 362 | +    with pytest.raises(ValueError, match="Field IDs in Parquet file do not match table schema"): |
| 363 | +        tbl.add_files(file_paths=[file_path]) |
236 | 364 |
|
237 | 365 |
|
238 | 366 | @pytest.mark.integration |
|