data inlining is slow with postgres #615

@djouallah

Description

What happens?

Testing data inlining with a local Postgres catalog: single-row insert throughput into a plain Postgres heap table is roughly 10x that of a DuckLake table with inlining enabled.
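For context, inlining is enabled through the DATA_INLINING_ROW_LIMIT attach option (see the full script under "To Reproduce"), so each single-row INSERT should end up inlined into the Postgres catalog rather than written out as a Parquet file on S3. Below is a minimal sketch of the same pattern, assuming the same local Postgres catalog and bucket as the full script; the throwaway table ducklake.t is only for illustration.

import time
import duckdb

con = duckdb.connect()
# Attach DuckLake with inlining enabled: inserts of up to 10 rows should be
# inlined into the Postgres catalog instead of being written as Parquet files.
con.execute("""
    ATTACH 'ducklake:postgres:dbname=ducklake host=localhost' AS ducklake
    (DATA_PATH 's3://mybucket/ducklake/', DATA_INLINING_ROW_LIMIT 10)
""")
con.execute("CREATE TABLE IF NOT EXISTS ducklake.t (i BIGINT, v DOUBLE)")

# Time 100 single-row inserts, mirroring what one worker in the full script does.
start = time.time()
for i in range(100):
    con.execute("INSERT INTO ducklake.t VALUES (?, ?)", [i, float(i)])
print(f"100 single-row inserts took {time.time() - start:.2f}s")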

To Reproduce

import psycopg2
from psycopg2 import pool
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random
from datetime import datetime
import pandas as pd
import matplotlib.pyplot as plt
import duckdb

# ============================================================================
# Database Connection Configuration
# ============================================================================

# Database connection parameters
DB_CONFIG = {
    'host': 'localhost',
    'database': 'postgres',
    'user': 'me',
    'port': 5432
}

# Test configuration
NUM_WORKERS = [1,2]  # List of worker counts to test
INSERTS_PER_WORKER = 100

# ============================================================================
# Create Connection Pool
# ============================================================================

# Create a PostgreSQL connection pool
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=50,
    **DB_CONFIG
)

# Create DuckDB connection for DuckLake
duckdb_conn = duckdb.connect()
duckdb_conn.execute("""
    ATTACH OR REPLACE 'ducklake:postgres:dbname=ducklake host=localhost' AS ducklake 
    (DATA_PATH 's3://mybucket/ducklake/', DATA_INLINING_ROW_LIMIT 10)
""")
duckdb_conn.execute("USE ducklake")

print("PostgreSQL connection pool created successfully")
print("DuckDB connection with DuckLake attached successfully")

# ============================================================================
# Create Test Schema and Tables
# ============================================================================

def setup_tables():
    """Create test schema and heap, iceberg, and ducklake tables"""
    # PostgreSQL tables
    conn = connection_pool.getconn()
    try:
        cursor = conn.cursor()
        
        # Create schema
        cursor.execute("CREATE SCHEMA IF NOT EXISTS test_concurrent")
        
        # Drop existing tables if they exist
        cursor.execute("DROP TABLE IF EXISTS test_concurrent.lineorder_heap CASCADE")
        cursor.execute("DROP TABLE IF EXISTS test_concurrent.lineorder_iceberg CASCADE")
        
        # Create HEAP table (no primary key for fair comparison)
        cursor.execute("""
            CREATE TABLE test_concurrent.lineorder_heap (
                lo_orderkey BIGINT NOT NULL,
                lo_linenumber INTEGER NOT NULL,
                lo_custkey INTEGER,
                lo_partkey INTEGER,
                lo_suppkey INTEGER,
                lo_orderdate INTEGER,
                lo_commitdate INTEGER,
                lo_shipdate INTEGER,
                lo_orderpriority VARCHAR(15),
                lo_shippriority INTEGER,
                lo_shipmode VARCHAR(10),
                lo_quantity NUMERIC(15,2),
                lo_extendedprice NUMERIC(15,2),
                lo_discount NUMERIC(15,2),
                lo_tax NUMERIC(15,2),
                lo_revenue NUMERIC(15,2),
                lo_supplycost NUMERIC(15,2)
            )
        """)
        print("✓ Created HEAP table: test_concurrent.lineorder_heap")
        
        # Create ICEBERG table (no primary key constraint - not supported on foreign tables)
        cursor.execute("""
            CREATE TABLE test_concurrent.lineorder_iceberg (
                lo_orderkey BIGINT NOT NULL,
                lo_linenumber INTEGER NOT NULL,
                lo_custkey INTEGER,
                lo_partkey INTEGER,
                lo_suppkey INTEGER,
                lo_orderdate INTEGER,
                lo_commitdate INTEGER,
                lo_shipdate INTEGER,
                lo_orderpriority VARCHAR(15),
                lo_shippriority INTEGER,
                lo_shipmode VARCHAR(10),
                lo_quantity NUMERIC(15,2),
                lo_extendedprice NUMERIC(15,2),
                lo_discount NUMERIC(15,2),
                lo_tax NUMERIC(15,2),
                lo_revenue NUMERIC(15,2),
                lo_supplycost NUMERIC(15,2)
            ) USING iceberg
        """)
        print("✓ Created ICEBERG table: test_concurrent.lineorder_iceberg")
        
        conn.commit()
        cursor.close()
        
    except Exception as e:
        print(f"Error setting up PostgreSQL tables: {e}")
        conn.rollback()
    finally:
        connection_pool.putconn(conn)
    
    # DuckDB DuckLake table
    try:
        duckdb_conn.execute("CREATE SCHEMA IF NOT EXISTS test_concurrent")
        duckdb_conn.execute("DROP TABLE IF EXISTS ducklake.test_concurrent.lineorder_ducklake")
        
        duckdb_conn.execute("""
            CREATE TABLE ducklake.test_concurrent.lineorder_ducklake (
                lo_orderkey BIGINT NOT NULL,
                lo_linenumber INTEGER NOT NULL,
                lo_custkey INTEGER,
                lo_partkey INTEGER,
                lo_suppkey INTEGER,
                lo_orderdate INTEGER,
                lo_commitdate INTEGER,
                lo_shipdate INTEGER,
                lo_orderpriority VARCHAR(15),
                lo_shippriority INTEGER,
                lo_shipmode VARCHAR(10),
                lo_quantity DECIMAL(15,2),
                lo_extendedprice DECIMAL(15,2),
                lo_discount DECIMAL(15,2),
                lo_tax DECIMAL(15,2),
                lo_revenue DECIMAL(15,2),
                lo_supplycost DECIMAL(15,2)
            )
        """)
        print("✓ Created DUCKLAKE table: test_concurrent.lineorder_ducklake")
        
    except Exception as e:
        print(f"Error setting up DuckLake table: {e}")

# ============================================================================
# Insert Function
# ============================================================================

# Static lists for random selection
ORDER_PRIORITIES = ['1-URGENT', '2-HIGH', '3-MEDIUM', '4-NOT SPECIFIED', '5-LOW']
SHIP_MODES = ['AIR', 'MAIL', 'RAIL', 'SHIP', 'TRUCK', 'REG AIR', 'FOB']

def insert_batch(worker_id, num_inserts, start_key, table_name):
    """Insert multiple rows from a single worker into specified table"""
    inserted = 0
    errors = 0
    
    # Determine if this is a DuckDB table
    is_ducklake = 'ducklake' in table_name.lower()
    
    if is_ducklake:
        # Use shared DuckDB connection
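        # Note: this single connection is shared by all worker threads; the DuckDB
        # Python docs recommend a per-thread cursor (duckdb_conn.cursor()) for
        # concurrent use, so thread contention here may also affect the numbers.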
        try:
            for i in range(num_inserts):
                try:
                    lo_orderkey = start_key + i
                    lo_quantity = random.randint(1, 50)
                    lo_extendedprice = round(random.uniform(1000, 101000), 2)
                    lo_discount = round(random.uniform(0, 0.10), 2)
                    lo_tax = round(random.uniform(0, 0.08), 2)
                    lo_revenue = round(lo_extendedprice * (1 - lo_discount), 2)
                    
                    duckdb_conn.execute(f"""
                        INSERT INTO {table_name} VALUES (?, 1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """, (
                        lo_orderkey,
                        random.randint(1, 1000),
                        random.randint(1, 1000),
                        random.randint(1, 1000),
                        random.randint(19920101, 19981231),
                        random.randint(19920101, 19981231),
                        random.randint(19920101, 19981231),
                        random.choice(ORDER_PRIORITIES),
                        random.randint(0, 9),
                        random.choice(SHIP_MODES),
                        lo_quantity,
                        lo_extendedprice,
                        lo_discount,
                        lo_tax,
                        lo_revenue,
                        round(random.uniform(0, 50000), 2)
                    ))
                    inserted += 1
                        
                except Exception as e:
                    print(f"Worker {worker_id} insert error: {e}")
                    errors += 1
            
        except Exception as e:
            print(f"Worker {worker_id} DuckDB connection error: {e}")
            import traceback
            traceback.print_exc()
    else:
        # PostgreSQL connection
        conn = connection_pool.getconn()
        try:
            cursor = conn.cursor()
            
            for i in range(num_inserts):
                try:
                    lo_orderkey = start_key + i
                    lo_quantity = random.randint(1, 50)
                    lo_extendedprice = round(random.uniform(1000, 101000), 2)
                    lo_discount = round(random.uniform(0, 0.10), 2)
                    lo_tax = round(random.uniform(0, 0.08), 2)
                    lo_revenue = round(lo_extendedprice * (1 - lo_discount), 2)
                    
                    cursor.execute(f"""
                        INSERT INTO {table_name} (
                            lo_orderkey, lo_linenumber, lo_custkey, lo_partkey, lo_suppkey,
                            lo_orderdate, lo_commitdate, lo_shipdate,
                            lo_orderpriority, lo_shippriority, lo_shipmode,
                            lo_quantity, lo_extendedprice, lo_discount, lo_tax,
                            lo_revenue, lo_supplycost
                        ) VALUES (
                            %s, 1, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
                        )
                    """, (
                        lo_orderkey,
                        random.randint(1, 1000),
                        random.randint(1, 1000),
                        random.randint(1, 1000),
                        random.randint(19920101, 19981231),
                        random.randint(19920101, 19981231),
                        random.randint(19920101, 19981231),
                        random.choice(ORDER_PRIORITIES),
                        random.randint(0, 9),
                        random.choice(SHIP_MODES),
                        lo_quantity,
                        lo_extendedprice,
                        lo_discount,
                        lo_tax,
                        lo_revenue,
                        round(random.uniform(0, 50000), 2)
                    ))
                    
                    # Commit after each insert (1 row at a time)
                    conn.commit()
                    inserted += 1
                        
                except Exception as e:
                    # Print the error so failed inserts are visible, matching the DuckDB branch
                    print(f"Worker {worker_id} insert error: {e}")
                    errors += 1
                    conn.rollback()
            cursor.close()
            
        except Exception as e:
            print(f"Worker {worker_id} PostgreSQL error: {e}")
        finally:
            connection_pool.putconn(conn)
    
    return worker_id, inserted, errors

# ============================================================================
# Run Concurrent Insert Tests
# ============================================================================

def run_concurrent_test(num_workers, inserts_per_worker, table_name, table_type):
    """Run a test with specified number of concurrent workers"""
    print(f"\n{'='*60}")
    print(f"Testing {table_type} with {num_workers} workers...")
    print(f"{'='*60}")
    
    # Generate unique starting keys for each worker
    base_key = int(time.time() * 1000)  # Use timestamp as base to ensure uniqueness
    
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        futures = []
        for worker_id in range(num_workers):
            start_key = base_key + (worker_id * inserts_per_worker)
            future = executor.submit(insert_batch, worker_id, inserts_per_worker, start_key, table_name)
            futures.append(future)
        
        # Wait for all workers to complete
        results = []
        for future in as_completed(futures):
            results.append(future.result())
    
    end_time = time.time()
    elapsed_time = end_time - start_time
    
    total_inserted = sum(r[1] for r in results)
    total_errors = sum(r[2] for r in results)
    
    inserts_per_second = total_inserted / elapsed_time if elapsed_time > 0 else 0
    
    print(f"  ✓ Inserted: {total_inserted} rows")
    print(f"  ✗ Errors: {total_errors}")
    print(f"  ⏱ Time: {elapsed_time:.2f} seconds")
    print(f"  ⚡ Throughput: {inserts_per_second:.2f} inserts/second")
    
    return {
        'table_type': table_type,
        'workers': num_workers,
        'total_inserts': total_inserted,
        'errors': total_errors,
        'elapsed_time': elapsed_time,
        'inserts_per_second': inserts_per_second
    }

# ============================================================================
# Main Execution
# ============================================================================

if __name__ == '__main__':
    # Create tables
    setup_tables()
    
    # Run tests on all tables
    all_results = []
    
    for num_workers in NUM_WORKERS:
        # Test HEAP table
        result_heap = run_concurrent_test(
            num_workers, 
            INSERTS_PER_WORKER, 
            'test_concurrent.lineorder_heap',
            'HEAP'
        )
        all_results.append(result_heap)
        
        time.sleep(1)  # Brief pause between tests
        
        # Test DUCKLAKE table
        result_ducklake = run_concurrent_test(
            num_workers, 
            INSERTS_PER_WORKER, 
            'ducklake.test_concurrent.lineorder_ducklake',
            'DUCKLAKE'
        )
        all_results.append(result_ducklake)
        
        time.sleep(1)  # Brief pause between tests
        
        # Test ICEBERG table
        result_iceberg = run_concurrent_test(
            num_workers, 
            INSERTS_PER_WORKER, 
            'test_concurrent.lineorder_iceberg',
            'ICEBERG'
        )
        all_results.append(result_iceberg)
        
        time.sleep(2)  # Pause before next concurrency level
    
    print("\n" + "="*60)
    print("All tests completed!")
    print("="*60)
    

OS:

windows

DuckDB Version:

1.4.3

DuckLake Version:

0.2

DuckDB Client:

python

Hardware:

No response

Full Name:

mim

Affiliation:

personal

What is the latest build you tested with? If possible, we recommend testing with the latest nightly build.

I have not tested with any build

Did you include all relevant data sets for reproducing the issue?

No - Other reason (please specify in the issue body)

Did you include all code required to reproduce the issue?

  • Yes, I have

Did you include all relevant configuration (e.g., CPU architecture, Python version, Linux distribution) to reproduce the issue?

  • Yes, I have
