
Nikhil Suthar added 19 commits December 30, 2025 18:57
… support

- Implement PyIceberg upsert for incremental models on partitioned tables
- Support partition transforms: identity, day, month, year, hour (bucket and truncate unsupported by AWS S3 Tables)
- Add automatic PyIceberg path detection when partition_by is configured
- Add manual override with use_pyiceberg_writes=true flag
- Preserve fast DuckDB SQL path for non-partitioned tables
- Update documentation with partition transform examples and limitations
- Add pyiceberg_incremental_write() wrapper method to adapter
- Update incremental.sql with conditional PyIceberg logic
- Use cursor() method to get cursor from DuckDBConnectionWrapper
- Fix 'DuckDBConnectionWrapper' object has no attribute 'execute' error
- Properly access cursor.execute().arrow() for PyIceberg upsert
- Call read_all() on RecordBatchReader to get PyArrow Table
- Fix 'RecordBatchReader' object has no attribute 'num_rows' error
- Properly convert DuckDB arrow result to PyArrow Table for PyIceberg
- Create case-insensitive mapping of PyArrow column names
- Match unique_key columns case-insensitively
- Fix 'CustomerID' not found when the PyArrow column is named 'customerid' (see the sketch after this list)
- Provide better error message with available columns
- Check if table has identifier_field_ids set
- Use update_schema().set_identifier_fields() to set them if missing
- Reload table after schema update
- Fix 'Join columns could not be found' error
- Support upsert on tables created by DuckDB without identifier fields
- Use case-insensitive search to find fields in Iceberg table schema
- Get actual schema column names (e.g., 'customerid' not 'CustomerID')
- Pass correct schema column names to set_identifier_fields()
- Fix 'Could not find field with name CustomerID, case_sensitive=True' error
- Handle mismatch between PyArrow column names and Iceberg schema names
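
To illustrate the cursor and column-matching fixes above, here is a minimal sketch (not the adapter's actual code; function names are illustrative) of materializing the DuckDB result into a PyArrow Table and resolving `unique_key` names case-insensitively:

```python
import pyarrow as pa


def fetch_arrow_table(connection, sql):
    # DuckDBConnectionWrapper has no execute(); get a cursor first.
    cursor = connection.cursor()
    result = cursor.execute(sql).arrow()
    # Depending on the DuckDB version this may be a RecordBatchReader,
    # which has no num_rows; materialize it into a pyarrow.Table.
    if isinstance(result, pa.RecordBatchReader):
        result = result.read_all()
    return result


def resolve_unique_keys(arrow_table, unique_key):
    # Match unique_key columns case-insensitively against the PyArrow schema,
    # e.g. a model configured with 'CustomerID' but a column named 'customerid'.
    keys = [unique_key] if isinstance(unique_key, str) else list(unique_key)
    by_lower = {name.lower(): name for name in arrow_table.column_names}
    resolved = []
    for key in keys:
        if key.lower() not in by_lower:
            raise ValueError(
                f"unique_key column '{key}' not found; "
                f"available columns: {arrow_table.column_names}"
            )
        resolved.append(by_lower[key.lower()])
    return resolved
```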
…them

- PyIceberg requires identifier fields to be marked as required (NOT NULL)
- Added logic to check if unique_key columns are required in Iceberg schema
- Automatically call make_column_required() for non-required identifier fields
- Then set identifier fields using set_identifier_fields()
- Fixes validation error: 'Identifier field invalid: not a required field'
- Added detailed logging for troubleshooting
- Added setup_iceberg_identifier_fields() function to set identifier fields after table creation (sketched after this list)
- Added adapter method setup_s3_tables_identifier_fields() wrapper
- Updated table.sql to set identifier fields when unique_key is provided
- Updated incremental.sql to set identifier fields during full refresh
- Simplified pyiceberg_incremental_write() to just set identifier fields if missing
- Identifier fields are now set up front during table creation, not during upsert
- This matches the recommended pattern from the PyIceberg documentation
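
A sketch of what the identifier-field setup described above could look like. It assumes PyIceberg's `update_schema()` context manager with `update_column(..., required=True)` and `set_identifier_fields()`; the catalog name, properties, and function name are illustrative, and promoting an optional column to required is taken to need `allow_incompatible_changes=True` (an assumption about how the `make_column_required()` step is implemented):

```python
from pyiceberg.catalog import load_catalog


def setup_iceberg_identifier_fields(catalog_props, table_identifier, unique_key):
    catalog = load_catalog("s3tables", **catalog_props)  # hypothetical catalog name/props
    table = catalog.load_table(table_identifier)

    schema = table.schema()
    if schema.identifier_field_ids:
        return  # identifier fields already configured

    # Resolve configured key names against the actual Iceberg field names
    # (case-insensitive), since DuckDB may have lower-cased them.
    keys = [unique_key] if isinstance(unique_key, str) else list(unique_key)
    by_lower = {field.name.lower(): field for field in schema.fields}
    fields = [by_lower[key.lower()] for key in keys]

    # Identifier fields must be required (NOT NULL). Promoting optional -> required
    # is an incompatible change, hence allow_incompatible_changes=True.
    with table.update_schema(allow_incompatible_changes=True) as update:
        for field in fields:
            if not field.required:
                update.update_column(field.name, required=True)
        update.set_identifier_fields(*[field.name for field in fields])

    table.refresh()  # reload so later operations see the updated schema
```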
… tables

- Remove setup_iceberg_identifier_fields() function and all calls to it
- Implement pyiceberg_incremental_write() using DELETE + INSERT instead of upsert (sketched after this list)
- DELETE uses PyIceberg expressions (In for single key, EqualTo/And for composite keys)
- INSERT uses PyIceberg append() method
- No longer requires identifier fields or NOT NULL constraints
- Works with tables created via DuckDB's CREATE TABLE AS SELECT
- Handles both single and composite unique keys
- Returns rows_deleted and rows_inserted counts
- Align PyArrow table schema with Iceberg schema before append
- Reorder columns to match Iceberg schema order
- Rename columns to match Iceberg schema names (case-sensitive)
- Handle missing columns by adding NULL arrays
- Fixes error: 'PyArrow table contains more columns... Update the schema first'
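
A sketch of the DELETE + INSERT path, assuming a PyIceberg version that provides `Table.delete()` and `Table.append()` together with the `In` / `EqualTo` / `And` / `Or` expression classes; helper names are illustrative:

```python
from pyiceberg.expressions import And, EqualTo, In, Or


def build_delete_filter(arrow_table, key_columns):
    """Predicate matching every incoming key, so existing rows can be deleted first."""
    if len(key_columns) == 1:
        col = key_columns[0]
        return In(col, set(arrow_table.column(col).to_pylist()))
    # Composite key: OR together one AND(EqualTo, ...) term per incoming row.
    terms = []
    for row in arrow_table.select(key_columns).to_pylist():
        term = EqualTo(key_columns[0], row[key_columns[0]])
        for col in key_columns[1:]:
            term = And(term, EqualTo(col, row[col]))
        terms.append(term)
    predicate = terms[0]
    for term in terms[1:]:
        predicate = Or(predicate, term)
    return predicate


def pyiceberg_incremental_write(table, arrow_table, key_columns):
    """Delete-then-insert 'upsert' that needs no identifier fields on the table."""
    table.delete(build_delete_filter(arrow_table, key_columns))
    table.append(arrow_table)
    return {"rows_inserted": arrow_table.num_rows}
```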
…output

1. Transaction Safety:
   - Wrap DELETE + INSERT in PyIceberg transaction for atomicity
   - If INSERT fails, DELETE is automatically rolled back
   - Ensures data consistency

2. Row Count Accuracy:
   - Estimate deleted rows based on unique key count
   - PyIceberg delete() doesn't return count, so we estimate
   - Shows 'X deleted, Y inserted' in logs

3. Better Compiled SQL Output:
   - Replace dummy query with meaningful SQL representation
   - Shows what operation was performed (as SQL comments)
   - Includes actual row counts in comments
   - Helps with debugging and documentation

4. Code Organization:
   - Extract schema alignment to _align_arrow_schema() helper function (sketched after this list)
   - Cleaner transaction handling with context manager
   - Better error messages with rollback notification
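
A sketch combining the transaction, row-count estimate, and schema-alignment points above. It assumes PyIceberg's `Transaction` object exposes `delete()` and `append()` (version-dependent) and that `key_columns` are already resolved to the Iceberg schema's names; `_align_arrow_schema` only illustrates the reorder / rename / NULL-fill steps listed in the commit message:

```python
import pyarrow as pa


def _align_arrow_schema(arrow_table, iceberg_schema):
    """Reorder and rename PyArrow columns to the Iceberg schema; NULL-fill missing ones."""
    by_lower = {name.lower(): name for name in arrow_table.column_names}
    columns = {}
    for field in iceberg_schema.fields:
        source = by_lower.get(field.name.lower())
        if source is not None:
            columns[field.name] = arrow_table.column(source)  # Iceberg (case-sensitive) name
        else:
            # Missing from the incoming data: add an all-NULL column
            # (in practice you would cast it to the field's Arrow type).
            columns[field.name] = pa.nulls(arrow_table.num_rows)
    return pa.table(columns)


def upsert_via_delete_insert(table, arrow_table, key_columns, delete_filter):
    aligned = _align_arrow_schema(arrow_table, table.schema())
    # Atomic DELETE + INSERT: if the append fails, the delete is never committed.
    with table.transaction() as txn:
        txn.delete(delete_filter)
        txn.append(aligned)
    # PyIceberg's delete() returns no row count, so estimate it from the number
    # of distinct incoming keys.
    rows = aligned.select(key_columns).to_pylist()
    deleted_estimate = len({tuple(row[col] for col in key_columns) for row in rows})
    return {"rows_deleted": deleted_estimate, "rows_inserted": aligned.num_rows}
```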
- Added comprehensive configuration parameter table for S3 Tables
- Documented parameters for both table and incremental materializations
- Included partition transform reference (identity, day, month, year, hour); see the sketch below
- Added 4 practical examples covering common use cases
- Clarified mandatory vs optional parameters
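
For reference, a small sketch of how the supported `partition_by` transforms could map onto PyIceberg's transform classes, with `bucket` and `truncate` rejected because AWS S3 Tables does not support them (the helper name and error wording are illustrative):

```python
from pyiceberg.transforms import (
    DayTransform,
    HourTransform,
    IdentityTransform,
    MonthTransform,
    YearTransform,
)

# Transforms accepted in partition_by; bucket and truncate are rejected
# because AWS S3 Tables does not support them.
SUPPORTED_TRANSFORMS = {
    "identity": IdentityTransform(),
    "day": DayTransform(),
    "month": MonthTransform(),
    "year": YearTransform(),
    "hour": HourTransform(),
}


def resolve_transform(name):
    try:
        return SUPPORTED_TRANSFORMS[name.lower()]
    except KeyError:
        raise ValueError(
            f"Unsupported partition transform '{name}'. Supported: "
            f"{sorted(SUPPORTED_TRANSFORMS)}; bucket/truncate are not available "
            "on AWS S3 Tables."
        ) from None
```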