From 56df3d23999aa68ff2df236a393bd0d5cdc57f59 Mon Sep 17 00:00:00 2001
From: khustup2
Date: Thu, 12 Feb 2026 22:38:38 +0000
Subject: [PATCH] Sync db catalogs.

---
 cpp/deeplake_pg/dl_catalog.cpp    |   3 +
 cpp/deeplake_pg/pg_deeplake.cpp   |   8 +-
 cpp/deeplake_pg/table_storage.cpp |  35 +--
 cpp/deeplake_pg/utils.hpp         |  15 +-
 postgres/Dockerfile               |  15 ++
 scripts/tpch_deeplake_ingest.py   | 385 ++++++++++++++++++++++++++++++
 6 files changed, 440 insertions(+), 21 deletions(-)
 create mode 100644 scripts/tpch_deeplake_ingest.py

diff --git a/cpp/deeplake_pg/dl_catalog.cpp b/cpp/deeplake_pg/dl_catalog.cpp
index 9dfa94f555..bb10c85724 100644
--- a/cpp/deeplake_pg/dl_catalog.cpp
+++ b/cpp/deeplake_pg/dl_catalog.cpp
@@ -128,6 +128,9 @@ std::vector<int64_t> load_int64_vector(const nd::array& arr)
 
 int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds)
 {
+    if (root_path.empty()) {
+        return 0;
+    }
     const auto tables_path = join_path(root_path, k_tables_name);
     const auto columns_path = join_path(root_path, k_columns_name);
     const auto indexes_path = join_path(root_path, k_indexes_name);
diff --git a/cpp/deeplake_pg/pg_deeplake.cpp b/cpp/deeplake_pg/pg_deeplake.cpp
index a1905af18b..93449a6d2c 100644
--- a/cpp/deeplake_pg/pg_deeplake.cpp
+++ b/cpp/deeplake_pg/pg_deeplake.cpp
@@ -356,7 +356,7 @@ void deeplake_xact_callback(XactEvent event, void *arg)
 void init_deeplake()
 {
     static bool initialized = false;
-    if (initialized) {
+    if (initialized || !IsUnderPostmaster) {
         return;
     }
     initialized = true;
@@ -368,6 +368,12 @@
     constexpr int THREAD_POOL_MULTIPLIER = 8; // Threads per CPU core for async operations
     deeplake_api::initialize(std::make_shared(), THREAD_POOL_MULTIPLIER * base::system_report::cpu_cores());
 
+    const std::string redis_url = base::getenv("REDIS_URL", "");
+    if (!redis_url.empty()) {
+        // 86400 s = 24 h TTL on cached catalog metadata.
+        deeplake_api::initialize_redis_cache(redis_url, 86400,
+                                             deeplake_api::metadata_catalog_cache_pattern);
+    }
+
     pg::table_storage::instance(); /// initialize table storage
 
     RegisterXactCallback(deeplake_xact_callback, nullptr);
diff --git a/cpp/deeplake_pg/table_storage.cpp b/cpp/deeplake_pg/table_storage.cpp
index 139b48dc5f..b4f67ad7b8 100644
--- a/cpp/deeplake_pg/table_storage.cpp
+++ b/cpp/deeplake_pg/table_storage.cpp
@@ -236,6 +236,9 @@ void table_storage::save_table_metadata(const pg::table_data& table_data)
         }
         return root;
     }();
+    if (root_dir.empty()) {
+        return;
+    }
     auto creds = session_credentials::get_credentials();
     pg::dl_catalog::ensure_catalog(root_dir, creds);
 
@@ -292,8 +295,8 @@
     }();
     auto creds = session_credentials::get_credentials();
 
-    // Stateless catalog sync (only when enabled)
-    if (pg::stateless_enabled) {
+    // Stateless catalog sync (only when enabled and root_dir is configured)
+    if (pg::stateless_enabled && !root_dir.empty()) {
         // Fast path: if already loaded, just check version without ensure_catalog
         if (tables_loaded_) {
             const auto current_version = pg::dl_catalog::get_catalog_version(root_dir, creds);
@@ -534,7 +537,7 @@
         }
         try {
             // Seed the DL catalog with legacy metadata (only when stateless is enabled).
-            if (pg::stateless_enabled) {
+            if (pg::stateless_enabled && !root_dir.empty()) {
                 auto [schema_name, simple_table_name] = split_table_name(table_name);
                 pg::dl_catalog::table_meta meta;
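                 // "<schema>.<table>" doubles as the catalog row key used by upsert_table.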
                 meta.table_id = schema_name + "." + simple_table_name;
@@ -580,7 +583,7 @@
                     base::log_channel::generic,
                     "Failed to delete invalid table metadata for table_oid: {}", invalid_oid);
             }
         }
-        if (catalog_seeded && pg::stateless_enabled) {
+        if (catalog_seeded && pg::stateless_enabled && !root_dir.empty()) {
             pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials());
             catalog_version_ = pg::dl_catalog::get_catalog_version(root_dir, session_credentials::get_credentials());
         }
@@ -991,17 +994,19 @@
         }
         return root;
     }();
-    pg::dl_catalog::ensure_catalog(root_dir, creds);
-    auto [schema_name, simple_table_name] = split_table_name(table_name);
-    pg::dl_catalog::table_meta meta;
-    meta.table_id = schema_name + "." + simple_table_name;
-    meta.schema_name = schema_name;
-    meta.table_name = simple_table_name;
-    meta.dataset_path = table_data.get_dataset_path().url();
-    meta.state = "dropping";
-    pg::dl_catalog::upsert_table(root_dir, creds, meta);
-    pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials());
-    catalog_version_ = pg::dl_catalog::get_catalog_version(root_dir, session_credentials::get_credentials());
+    if (!root_dir.empty()) {
+        pg::dl_catalog::ensure_catalog(root_dir, creds);
+        auto [schema_name, simple_table_name] = split_table_name(table_name);
+        pg::dl_catalog::table_meta meta;
+        meta.table_id = schema_name + "." + simple_table_name;
+        meta.schema_name = schema_name;
+        meta.table_name = simple_table_name;
+        meta.dataset_path = table_data.get_dataset_path().url();
+        meta.state = "dropping";
+        pg::dl_catalog::upsert_table(root_dir, creds, meta);
+        pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials());
+        catalog_version_ = pg::dl_catalog::get_catalog_version(root_dir, session_credentials::get_credentials());
+    }
 }
 
 try {
diff --git a/cpp/deeplake_pg/utils.hpp b/cpp/deeplake_pg/utils.hpp
index 3cde8d0a2d..5d2eb0cb0f 100644
--- a/cpp/deeplake_pg/utils.hpp
+++ b/cpp/deeplake_pg/utils.hpp
@@ -270,17 +270,22 @@
 static std::string get_pg_data_directory()
 {
     const char* data_dir = GetConfigOption("data_directory", true, false);
     if (data_dir == nullptr) {
-        ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Unable to retrieve data_directory")));
+        return "";
     }
     return std::string(data_dir);
 }
 
 static std::string get_deeplake_root_directory()
 {
-    static const std::string root_dir_variable_name = "DEEPLAKE_ROOT_PATH";
-    static const std::string pg_data_dir = get_pg_data_directory();
-    static const std::string deeplake_root_dir = base::getenv(root_dir_variable_name, pg_data_dir);
-    return deeplake_root_dir;
+    // Avoid static locals: if get_pg_data_directory() previously failed via
+    // ereport(ERROR) (longjmp through C++ static init), the static guard
+    // variable is permanently poisoned and subsequent calls return "".
+    // Re-evaluate every time so a later call can succeed once GUCs are ready.
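+    // DEEPLAKE_ROOT_PATH, when set, takes precedence over the PG data directory.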
+    auto root = base::getenv("DEEPLAKE_ROOT_PATH", "");
+    if (root.empty()) {
+        root = get_pg_data_directory();
+    }
+    return root;
 }
 
 inline std::pair row_number_to_tid(int64_t row_number)
diff --git a/postgres/Dockerfile b/postgres/Dockerfile
index c6462da2f1..044243e082 100644
--- a/postgres/Dockerfile
+++ b/postgres/Dockerfile
@@ -1,6 +1,7 @@
 FROM BASE_IMAGE
 ARG VERSION=VERSION
 ARG TARGETARCH
+ARG STATELESS=false
 
 LABEL name="pg-deeplake" \
       version="${VERSION}" \
@@ -28,4 +29,18 @@
 COPY ./debs/ /tmp/debs/
 COPY --chmod=444 ./LICENSE /LICENSE
 COPY ./postgres/docker-entrypoint.d/ /docker-entrypoint-initdb.d/
 RUN apt-get install --no-install-recommends -y /tmp/debs/pg-deeplake-${VERSION}_${TARGETARCH}.deb && rm -rf /tmp/debs/
+COPY ./serverless/scripts/init-deeplake-stateless.sh /tmp/init-deeplake-stateless.sh
+COPY ./serverless/config/postgresql-overrides.conf /tmp/postgresql-overrides.conf
+COPY ./serverless/scripts/health-check.sh /tmp/health-check.sh
+RUN if [ "$STATELESS" = "true" ]; then \
+        mv /tmp/init-deeplake-stateless.sh /docker-entrypoint-initdb.d/3-stateless-init.sh && \
+        chmod 755 /docker-entrypoint-initdb.d/3-stateless-init.sh && \
+        mv /tmp/postgresql-overrides.conf /etc/postgresql-overrides.conf && \
+        chmod 644 /etc/postgresql-overrides.conf && \
+        mv /tmp/health-check.sh /usr/local/bin/health-check.sh && \
+        chmod 755 /usr/local/bin/health-check.sh && \
+        mkdir -p /deeplake-data; \
+    else \
+        rm -f /tmp/init-deeplake-stateless.sh /tmp/postgresql-overrides.conf /tmp/health-check.sh; \
+    fi
 USER 999
diff --git a/scripts/tpch_deeplake_ingest.py b/scripts/tpch_deeplake_ingest.py
new file mode 100644
index 0000000000..6a329303a2
--- /dev/null
+++ b/scripts/tpch_deeplake_ingest.py
@@ -0,0 +1,385 @@
+#!/usr/bin/env python3
+"""
+TPC-H Ingestion Script for pg_deeplake
+
+Each table gets its own connection through a load balancer (HAProxy),
+which distributes tables across backend instances via round-robin.
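+
+Tables are created serially on a single connection first (Phase 1); data is
+then loaded per table in chunked COPY batches (Phase 2).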
+
+Usage:
+    # Via HAProxy (parallel, one connection per table):
+    python tpch_deeplake_ingest.py
+
+    # Direct to a single instance:
+    python tpch_deeplake_ingest.py --port 5433 --sequential
+"""
+
+import argparse
+import io
+import sys
+import time
+import psycopg2
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from pathlib import Path
+
+
+def log(msg):
+    print(msg, flush=True)
+
+
+# TPC-H table definitions
+TPCH_TABLES = {
+    'region': {
+        'columns': [
+            ('r_regionkey', 'INTEGER'),
+            ('r_name', 'VARCHAR(25)'),
+            ('r_comment', 'VARCHAR(152)')
+        ],
+    },
+    'nation': {
+        'columns': [
+            ('n_nationkey', 'INTEGER'),
+            ('n_name', 'VARCHAR(25)'),
+            ('n_regionkey', 'INTEGER'),
+            ('n_comment', 'VARCHAR(152)')
+        ],
+    },
+    'supplier': {
+        'columns': [
+            ('s_suppkey', 'INTEGER'),
+            ('s_name', 'VARCHAR(25)'),
+            ('s_address', 'VARCHAR(40)'),
+            ('s_nationkey', 'INTEGER'),
+            ('s_phone', 'VARCHAR(15)'),
+            ('s_acctbal', 'DECIMAL(15,2)'),
+            ('s_comment', 'VARCHAR(101)')
+        ],
+    },
+    'customer': {
+        'columns': [
+            ('c_custkey', 'INTEGER'),
+            ('c_name', 'VARCHAR(25)'),
+            ('c_address', 'VARCHAR(40)'),
+            ('c_nationkey', 'INTEGER'),
+            ('c_phone', 'VARCHAR(15)'),
+            ('c_acctbal', 'DECIMAL(15,2)'),
+            ('c_mktsegment', 'VARCHAR(10)'),
+            ('c_comment', 'VARCHAR(117)')
+        ],
+    },
+    'part': {
+        'columns': [
+            ('p_partkey', 'INTEGER'),
+            ('p_name', 'VARCHAR(55)'),
+            ('p_mfgr', 'VARCHAR(25)'),
+            ('p_brand', 'VARCHAR(10)'),
+            ('p_type', 'VARCHAR(25)'),
+            ('p_size', 'INTEGER'),
+            ('p_container', 'VARCHAR(10)'),
+            ('p_retailprice', 'DECIMAL(15,2)'),
+            ('p_comment', 'VARCHAR(23)')
+        ],
+    },
+    'partsupp': {
+        'columns': [
+            ('ps_partkey', 'INTEGER'),
+            ('ps_suppkey', 'INTEGER'),
+            ('ps_availqty', 'INTEGER'),
+            ('ps_supplycost', 'DECIMAL(15,2)'),
+            ('ps_comment', 'VARCHAR(199)')
+        ],
+    },
+    'orders': {
+        'columns': [
+            ('o_orderkey', 'INTEGER'),
+            ('o_custkey', 'INTEGER'),
+            ('o_orderstatus', 'VARCHAR(1)'),
+            ('o_totalprice', 'DECIMAL(15,2)'),
+            ('o_orderdate', 'DATE'),
+            ('o_orderpriority', 'VARCHAR(15)'),
+            ('o_clerk', 'VARCHAR(15)'),
+            ('o_shippriority', 'INTEGER'),
+            ('o_comment', 'VARCHAR(79)')
+        ],
+    },
+    'lineitem': {
+        'columns': [
+            ('l_orderkey', 'INTEGER'),
+            ('l_partkey', 'INTEGER'),
+            ('l_suppkey', 'INTEGER'),
+            ('l_linenumber', 'INTEGER'),
+            ('l_quantity', 'DECIMAL(15,2)'),
+            ('l_extendedprice', 'DECIMAL(15,2)'),
+            ('l_discount', 'DECIMAL(15,2)'),
+            ('l_tax', 'DECIMAL(15,2)'),
+            ('l_returnflag', 'VARCHAR(1)'),
+            ('l_linestatus', 'VARCHAR(1)'),
+            ('l_shipdate', 'DATE'),
+            ('l_commitdate', 'DATE'),
+            ('l_receiptdate', 'DATE'),
+            ('l_shipinstruct', 'VARCHAR(25)'),
+            ('l_shipmode', 'VARCHAR(10)'),
+            ('l_comment', 'VARCHAR(44)')
+        ],
+    },
+}
+
+TABLE_LOAD_ORDER = ['region', 'nation', 'supplier', 'customer', 'part', 'partsupp', 'orders', 'lineitem']
+
+
+def get_connection(host, port, database, user, password):
+    return psycopg2.connect(host=host, port=port, database=database, user=user, password=password)
+
+
+def disable_autovacuum(conn):
+    # ALTER SYSTEM cannot run inside a transaction block, so force autocommit.
+    old_autocommit = conn.autocommit
+    conn.autocommit = True
+    try:
+        with conn.cursor() as cur:
+            cur.execute("ALTER SYSTEM SET autovacuum = off;")
+            cur.execute("SELECT pg_reload_conf();")
+    finally:
+        conn.autocommit = old_autocommit
+
+
+def enable_autovacuum(conn):
+    old_autocommit = conn.autocommit
+    conn.autocommit = True
+    try:
+        with conn.cursor() as cur:
+            cur.execute("ALTER SYSTEM SET autovacuum = on;")
+            cur.execute("SELECT pg_reload_conf();")
+    finally:
+        conn.autocommit = old_autocommit
+
+
+def drop_table(conn, table_name):
+    with conn.cursor() as cur:
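+        # CASCADE removes dependent objects (e.g. views) that would otherwise
+        # block the DROP; IF EXISTS keeps the call idempotent across reruns.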
+        cur.execute(f"DROP TABLE IF EXISTS {table_name} CASCADE;")
+    conn.commit()
+
+
+def create_table(conn, table_name, table_def):
+    columns = table_def['columns']
+    col_defs = ', '.join([f"{name} {dtype}" for name, dtype, *_ in columns])
+    sql = f"CREATE TABLE {table_name} ({col_defs}) USING deeplake;"
+    with conn.cursor() as cur:
+        cur.execute(sql)
+    conn.commit()
+
+
+def table_exists(conn, table_name):
+    with conn.cursor() as cur:
+        cur.execute(
+            """
+            SELECT 1
+            FROM pg_tables
+            WHERE schemaname = 'public' AND tablename = %s
+            """,
+            (table_name,),
+        )
+        return cur.fetchone() is not None
+
+
+CHUNK_SIZE = 5_000_000  # rows per COPY batch to avoid OOM on large tables
+
+
+def load_data(conn, table_name, data_file):
+    """Load data in chunks to avoid OOM on large tables like lineitem (60M rows).
+
+    Each chunk is a separate COPY + COMMIT so deeplake flushes to S3
+    before accumulating the next chunk.
+    """
+    total_lines = 0
+    chunk_num = 0
+
+    with open(data_file, 'r') as f:
+        while True:
+            chunk = io.StringIO()
+            lines_in_chunk = 0
+
+            for line in f:
+                # .tbl rows carry a trailing '|'; strip it so CSV parsing with
+                # DELIMITER '|' doesn't see an extra empty column.
+                if line.endswith('|\n'):
+                    chunk.write(line[:-2] + '\n')
+                elif line.endswith('|'):
+                    chunk.write(line[:-1])
+                else:
+                    chunk.write(line)
+                lines_in_chunk += 1
+                if lines_in_chunk >= CHUNK_SIZE:
+                    break
+
+            if lines_in_chunk == 0:
+                break
+
+            chunk.seek(0)
+            with conn.cursor() as cur:
+                cur.copy_expert(
+                    f"COPY {table_name} FROM STDIN WITH (FORMAT csv, DELIMITER '|')",
+                    chunk
+                )
+            conn.commit()
+
+            total_lines += lines_in_chunk
+            chunk_num += 1
+            if chunk_num > 1 or lines_in_chunk >= CHUNK_SIZE:
+                log(f"    chunk {chunk_num}: {lines_in_chunk:,} rows committed ({total_lines:,} total)")
+
+    return total_lines
+
+
+def get_row_count(conn, table_name):
+    with conn.cursor() as cur:
+        cur.execute(f"SELECT COUNT(*) FROM {table_name};")
+        return cur.fetchone()[0]
+
+
+def run_vacuum(conn, table_name):
+    # VACUUM also cannot run inside a transaction block; use autocommit.
+    old_autocommit = conn.autocommit
+    conn.autocommit = True
+    try:
+        with conn.cursor() as cur:
+            cur.execute(f"VACUUM ANALYZE {table_name};")
+    finally:
+        conn.autocommit = old_autocommit
+
+
+def ingest_one_table(table_name, args):
+    """Worker: open a new connection (via LB), load one table.
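+
+    Opening a fresh connection per worker is what lets HAProxy spread the
+    tables across different backends.
+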
+    Returns (table, rows, seconds).
+    """
+    conn = get_connection(args.host, args.port, args.database, args.user, args.password)
+    try:
+        conn.autocommit = True
+        with conn.cursor() as cur:
+            cur.execute("SELECT inet_server_addr()")
+            backend_addr = cur.fetchone()[0]
+        conn.autocommit = False
+
+        data_file = Path(args.data_dir) / f"{table_name}.tbl"
+        if not data_file.exists():
+            log(f"[{table_name}] Data file not found: {data_file}, skipping")
+            return (table_name, 0, 0.0)
+
+        log(f"[{table_name}] Loading on backend {backend_addr}...")
+
+        start_time = time.time()
+        load_data(conn, table_name, data_file)
+        elapsed = time.time() - start_time
+
+        row_count = get_row_count(conn, table_name)
+        log(f"[{table_name}] {row_count:,} rows in {elapsed:.1f}s (backend {backend_addr})")
+
+        if args.vacuum_after_each:
+            run_vacuum(conn, table_name)
+
+        return (table_name, row_count, elapsed)
+    except Exception as e:
+        log(f"[{table_name}] ERROR: {e}")
+        raise
+    finally:
+        try:
+            conn.rollback()
+        except Exception:
+            pass
+        conn.close()
+
+
+def main():
+    parser = argparse.ArgumentParser(description='TPC-H pg_deeplake ingestion')
+
+    # Connection
+    parser.add_argument('--host', default='localhost')
+    parser.add_argument('--port', type=int, default=5432,
+                        help='Port (default: 5432 for HAProxy)')
+    parser.add_argument('--database', default='postgres')
+    parser.add_argument('--user', default='postgres')
+    parser.add_argument('--password', default='postgres')
+
+    # Ingestion
+    parser.add_argument('--data-dir', default='/home/admin/sasun/work/tpch/tpch_data')
+    parser.add_argument('--tables', nargs='*', default=None,
+                        help='Specific tables to load (default: all 8)')
+    parser.add_argument('--skip-create', action='store_true')
+    parser.add_argument('--replace-existing', action='store_true',
+                        help='DROP+CREATE existing tables before load')
+    parser.add_argument('--sequential', action='store_true',
+                        help='Load tables one at a time (each table still gets its own connection)')
+    parser.add_argument('--disable-autovacuum', action='store_true')
+    parser.add_argument('--vacuum-after-each', action='store_true')
+
+    args = parser.parse_args()
+
+    tables_to_load = args.tables if args.tables else TABLE_LOAD_ORDER
+
+    for t in tables_to_load:
+        if t not in TPCH_TABLES:
+            parser.error(f"Unknown table: {t}")
+
+    log("TPC-H pg_deeplake Ingestion")
+    log(f"  Endpoint: {args.host}:{args.port}")
+    log(f"  Database: {args.database}")
+    log(f"  Tables: {', '.join(tables_to_load)}")
+    log(f"  Parallel: {not args.sequential} ({len(tables_to_load)} connections)")
+    log("")
+
+    # Phase 1: Create tables serially on one connection
+    if not args.skip_create:
+        log("Phase 1: Creating tables...")
+        conn = get_connection(args.host, args.port, args.database, args.user, args.password)
+        try:
+            if args.disable_autovacuum:
+                disable_autovacuum(conn)
+            for table_name in tables_to_load:
+                table_def = TPCH_TABLES[table_name]
+                if args.replace_existing:
+                    drop_table(conn, table_name)
+                    create_table(conn, table_name, table_def)
+                    log(f"  {table_name}: recreated")
+                elif not table_exists(conn, table_name):
+                    create_table(conn, table_name, table_def)
+                    log(f"  {table_name}: created")
+                else:
+                    log(f"  {table_name}: exists")
+        finally:
+            try:
+                conn.rollback()
+            except Exception:
+                pass
+            conn.close()
+
+        log("")
+
+    # Phase 2: Load data
+    overall_start = time.time()
+
+    if args.sequential:
+        # Sequential: one table at a time, each on its own connection
+        log("Phase 2: Loading data sequentially...")
+        results = []
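+        # TABLE_LOAD_ORDER runs the small tables first (region, nation, ...)
+        # so progress output shows up early in the run.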
+        for table_name in tables_to_load:
+            result = ingest_one_table(table_name, args)
+            results.append(result)
+    else:
+        # Parallel: one connection per table, HAProxy distributes them
+        log(f"Phase 2: Loading data in parallel ({len(tables_to_load)} tables, 1 connection each)...")
+        results = []
+        with ThreadPoolExecutor(max_workers=len(tables_to_load)) as executor:
+            futures = {
+                executor.submit(ingest_one_table, table_name, args): table_name
+                for table_name in tables_to_load
+            }
+            for future in as_completed(futures):
+                table_name = futures[future]
+                try:
+                    results.append(future.result())
+                except Exception as e:
+                    log(f"[{table_name}] FAILED: {e}")
+
+    # Re-enable autovacuum if Phase 1 turned it off for the load.
+    if args.disable_autovacuum and not args.skip_create:
+        conn = get_connection(args.host, args.port, args.database, args.user, args.password)
+        try:
+            enable_autovacuum(conn)
+        finally:
+            conn.close()
+
+    overall_elapsed = time.time() - overall_start
+    total_rows = sum(r[1] for r in results)
+
+    log(f"\nDone. {total_rows:,} total rows in {overall_elapsed:.1f}s (wall time)")
+    for table_name, row_count, elapsed in sorted(results, key=lambda x: x[0]):
+        log(f"  {table_name:<12} {row_count:>12,} rows  {elapsed:>7.1f}s")
+
+
+if __name__ == '__main__':
+    main()