From 3fc139529f09f0b0fc4bb2ac134f5b16f08f1ce5 Mon Sep 17 00:00:00 2001 From: khustup2 Date: Sat, 14 Feb 2026 19:17:57 +0000 Subject: [PATCH 1/2] Removed 16 pg support. --- .github/workflows/pg-extension-build.yaml | 7 +++---- cpp/CMakeLists.pg.cmake | 5 ----- cpp/cmake/modules/FindPostgres.cmake | 3 --- cpp/deeplake_pg/pg_version_compat.h | 1 - cpp/deeplake_pg/table_am.cpp | 4 ---- postgres/scripts/build_deb.sh | 6 +++--- postgres/scripts/install.sh | 4 ++-- scripts/build_pg_ext.py | 14 +++++++------- 8 files changed, 15 insertions(+), 29 deletions(-) diff --git a/.github/workflows/pg-extension-build.yaml b/.github/workflows/pg-extension-build.yaml index 5091bfc035..68ad7992ec 100644 --- a/.github/workflows/pg-extension-build.yaml +++ b/.github/workflows/pg-extension-build.yaml @@ -8,12 +8,11 @@ on: workflow_dispatch: inputs: pg_version: - description: "PostgreSQL version to build (16, 17, 18, or all)" + description: "PostgreSQL version to build (17, 18, or all)" required: false default: "18" type: choice options: - - "16" - "17" - "18" - all @@ -50,8 +49,8 @@ jobs: id: set-versions run: |- if [ "${PG_VERSION}" == "all" ]; then - echo "versions=[\"16\",\"17\",\"18\"]" >> "${GITHUB_OUTPUT}" - echo "versions-list=16,17,18" >> "${GITHUB_OUTPUT}" + echo "versions=[\"17\",\"18\"]" >> "${GITHUB_OUTPUT}" + echo "versions-list=17,18" >> "${GITHUB_OUTPUT}" else echo "versions=[\"${PG_VERSION}\"]" >> "${GITHUB_OUTPUT}" echo "versions-list=${PG_VERSION}" >> "${GITHUB_OUTPUT}" diff --git a/cpp/CMakeLists.pg.cmake b/cpp/CMakeLists.pg.cmake index a0dcc19c13..65e198d775 100644 --- a/cpp/CMakeLists.pg.cmake +++ b/cpp/CMakeLists.pg.cmake @@ -1,4 +1,3 @@ -option(BUILD_PG_16 "Build PostgreSQL 16 extension" OFF) option(BUILD_PG_17 "Build PostgreSQL 17 extension" OFF) option(BUILD_PG_18 "Build PostgreSQL 18 extension" ON) option(USE_DEEPLAKE_SHARED "Use shared library for deeplake_api (default: auto-detect)" OFF) @@ -6,10 +5,6 @@ option(USE_DEEPLAKE_SHARED "Use shared library for deeplake_api (default: auto-d set(PG_MODULE deeplake_pg) set(PG_VERSIONS) -if(BUILD_PG_16) - list(APPEND PG_VERSIONS 16) -endif() - if(BUILD_PG_17) list(APPEND PG_VERSIONS 17) endif() diff --git a/cpp/cmake/modules/FindPostgres.cmake b/cpp/cmake/modules/FindPostgres.cmake index 62531f6250..88e87b0c17 100644 --- a/cpp/cmake/modules/FindPostgres.cmake +++ b/cpp/cmake/modules/FindPostgres.cmake @@ -3,14 +3,12 @@ include(ExternalProject) # Define PostgreSQL versions set(postgres_versions - "REL_16_0" "REL_17_0" "REL_18_0" ) # Define corresponding SHA256 checksums for each version set(postgres_SHA256_CHECKSUMS - "37851d1fdae1f2cdd1d23bf9a4598b6c2f3f6792e18bc974d78ed780a28933bf" "16912fe4aef3c8f297b5da1b591741f132377c8b5e1b8e896e07fdd680d6bf34" "b155bd4a467b401ebe61b504643492aae2d0836981aa4a5a60f8668b94eadebc" ) @@ -47,6 +45,5 @@ foreach(postgres_version IN LISTS postgres_versions) ) endforeach() -set(postgres_INSTALL_DIR_REL_16_0 ${DEFAULT_PARENT_DIR}/.ext/postgres-REL_16_0/install) set(postgres_INSTALL_DIR_REL_17_0 ${DEFAULT_PARENT_DIR}/.ext/postgres-REL_17_0/install) set(postgres_INSTALL_DIR_REL_18_0 ${DEFAULT_PARENT_DIR}/.ext/postgres-REL_18_0/install) diff --git a/cpp/deeplake_pg/pg_version_compat.h b/cpp/deeplake_pg/pg_version_compat.h index efb09e90f4..3b65c53fab 100644 --- a/cpp/deeplake_pg/pg_version_compat.h +++ b/cpp/deeplake_pg/pg_version_compat.h @@ -1,6 +1,5 @@ #pragma once -#define PG_VERSION_NUM_16 160000 #define PG_VERSION_NUM_17 170000 #define PG_VERSION_NUM_18 180000 diff --git a/cpp/deeplake_pg/table_am.cpp b/cpp/deeplake_pg/table_am.cpp index 5625826e2d..cc0242a19e 100644 --- a/cpp/deeplake_pg/table_am.cpp +++ b/cpp/deeplake_pg/table_am.cpp @@ -313,7 +313,6 @@ bool deeplake_scan_analyze_next_tuple( return true; } -#if PG_VERSION_NUM >= PG_VERSION_NUM_17 bool deeplake_scan_analyze_next_block(TableScanDesc scan, ReadStream* stream) { DeeplakeScanData* scan_data = get_scan_data(scan); @@ -349,7 +348,6 @@ bool deeplake_scan_analyze_next_block(TableScanDesc scan, ReadStream* stream) return true; // Indicate we have data to process } -#endif double deeplake_index_build_range_scan(Relation heap_rel, Relation index_rel, @@ -657,9 +655,7 @@ void deeplake_table_am_routine::initialize() routine.relation_size = deeplake_relation_size; routine.relation_estimate_size = deeplake_estimate_rel_size; routine.scan_analyze_next_tuple = deeplake_scan_analyze_next_tuple; -#if PG_VERSION_NUM >= PG_VERSION_NUM_17 routine.scan_analyze_next_block = deeplake_scan_analyze_next_block; -#endif routine.parallelscan_initialize = parallelscan_initialize; routine.parallelscan_estimate = parallelscan_estimate; diff --git a/postgres/scripts/build_deb.sh b/postgres/scripts/build_deb.sh index 24d3d1c073..0e50671a47 100644 --- a/postgres/scripts/build_deb.sh +++ b/postgres/scripts/build_deb.sh @@ -42,10 +42,10 @@ usage() { echo -e " repository: Repository directory path" echo -e " architecture: amd64 or arm64" echo -e " gpg-keyid: GPG key ID for signing" - echo -e " supported-versions: Comma-separated PostgreSQL versions (e.g., 16,17,18)" + echo -e " supported-versions: Comma-separated PostgreSQL versions (e.g., 17,18)" echo -e "\nExamples:" - echo -e " bash $0 4.4.4-1 /tmp/repo arm64 1F8B584DBEA11E9D 16,17,18" - echo -e " bash $0 /tmp/repo arm64 1F8B584DBEA11E9D 16,17,18 # Auto-detect version" + echo -e " bash $0 4.4.4-1 /tmp/repo arm64 1F8B584DBEA11E9D 17,18" + echo -e " bash $0 /tmp/repo arm64 1F8B584DBEA11E9D 17,18 # Auto-detect version" } # Error handling function diff --git a/postgres/scripts/install.sh b/postgres/scripts/install.sh index 7e92983922..d405c9860d 100644 --- a/postgres/scripts/install.sh +++ b/postgres/scripts/install.sh @@ -63,11 +63,11 @@ check_postgres() { if command_exists psql; then psql_version=$(psql -V | awk '{print $3}' | cut -d'.' -f1) case "$psql_version" in - 14 | 15 | 16 | 17) + 17 | 18) log "PostgreSQL version $psql_version detected." ;; *) - handle_error 1 "Unsupported PostgreSQL version: $psql_version. Only versions 14, 15, 16, and 17 are supported." + handle_error 1 "Unsupported PostgreSQL version: $psql_version. Only versions 17 and 18 are supported." ;; esac else diff --git a/scripts/build_pg_ext.py b/scripts/build_pg_ext.py index 01e2e62198..ba0ee156a9 100644 --- a/scripts/build_pg_ext.py +++ b/scripts/build_pg_ext.py @@ -11,8 +11,8 @@ Usage: python3 scripts/build_pg_ext.py prod #Release build Usage: python3 scripts/build_pg_ext.py debug --deeplake-shared #Debug build with shared deeplake_api linking Usage: python3 scripts/build_pg_ext.py debug --deeplake-static #Debug build with static deeplake_api linking (force) -Usage: python3 scripts/build_pg_ext.py dev --pg-versions 16,17,18 #Build for PostgreSQL 16, 17, and 18 -Usage: python3 scripts/build_pg_ext.py dev --pg-versions 16 #Build for PostgreSQL 16 only +Usage: python3 scripts/build_pg_ext.py dev --pg-versions 17,18 #Build for PostgreSQL 17 and 18 +Usage: python3 scripts/build_pg_ext.py dev --pg-versions 17 #Build for PostgreSQL 17 only Usage: python3 scripts/build_pg_ext.py prod --pg-versions all #Build for all supported PostgreSQL versions """ @@ -141,7 +141,7 @@ def run(mode: str, incremental: bool, deeplake_link_type: str = None, pg_version # Add PostgreSQL version options if specified if pg_versions is not None: - supported_versions = {16, 17, 18} + supported_versions = {17, 18} # Set all versions to OFF by default for ver in supported_versions: if ver in pg_versions: @@ -234,20 +234,20 @@ def write_mode(mode: str): i += 1 elif arg == "--pg-versions": if i + 1 >= len(sys.argv): - raise Exception("--pg-versions requires a value (e.g., '16,17,18' or 'all')") + raise Exception("--pg-versions requires a value (e.g., '17,18' or 'all')") versions_str = sys.argv[i + 1] if versions_str == "all": - pg_versions = [16, 17, 18] + pg_versions = [17, 18] else: try: pg_versions = [int(v.strip()) for v in versions_str.split(',')] # Validate versions - supported = {16, 17, 18} + supported = {17, 18} invalid = set(pg_versions) - supported if invalid: raise Exception(f"Invalid PostgreSQL versions: {invalid}. Supported: {supported}") except ValueError: - raise Exception(f"Invalid --pg-versions format: '{versions_str}'. Use comma-separated numbers (e.g., '16,17,18') or 'all'") + raise Exception(f"Invalid --pg-versions format: '{versions_str}'. Use comma-separated numbers (e.g., '17,18') or 'all'") i += 2 else: raise Exception(f"Invalid option '{arg}'. Use --deeplake-shared, --deeplake-static, or --pg-versions") From 2cf9e84dc3434c80543cea9905cfd7043abc3c1f Mon Sep 17 00:00:00 2001 From: khustup2 Date: Sat, 14 Feb 2026 19:23:42 +0000 Subject: [PATCH 2/2] Added wal based stateless logging. --- cpp/deeplake_pg/dl_catalog.cpp | 630 ------------------ cpp/deeplake_pg/dl_catalog.hpp | 70 -- cpp/deeplake_pg/dl_wal.cpp | 416 ++++++++++++ cpp/deeplake_pg/dl_wal.hpp | 66 ++ cpp/deeplake_pg/extension_init.cpp | 153 ++++- cpp/deeplake_pg/logger.hpp | 2 +- cpp/deeplake_pg/pg_deeplake.cpp | 25 + cpp/deeplake_pg/sync_worker.cpp | 334 ++++++---- cpp/deeplake_pg/table_data.hpp | 12 + cpp/deeplake_pg/table_data_impl.hpp | 76 ++- cpp/deeplake_pg/table_storage.cpp | 351 ++++------ cpp/deeplake_pg/table_storage.hpp | 23 +- postgres/pg_deeplake--1.0.sql | 4 + postgres/tests/py_tests/conftest.py | 6 +- .../py_tests/test_concurrent_insert_index.py | 2 - .../tests/py_tests/test_create_database.py | 8 +- .../tests/py_tests/test_drop_table_column.py | 10 +- postgres/tests/py_tests/test_root_path.py | 5 +- .../tests/py_tests/test_startup_latency.py | 24 +- .../test_stateless_reserved_schema.py | 9 + .../test_transaction_abort_handling.py | 251 ++++++- 21 files changed, 1368 insertions(+), 1109 deletions(-) delete mode 100644 cpp/deeplake_pg/dl_catalog.cpp delete mode 100644 cpp/deeplake_pg/dl_catalog.hpp create mode 100644 cpp/deeplake_pg/dl_wal.cpp create mode 100644 cpp/deeplake_pg/dl_wal.hpp diff --git a/cpp/deeplake_pg/dl_catalog.cpp b/cpp/deeplake_pg/dl_catalog.cpp deleted file mode 100644 index bb10c85724..0000000000 --- a/cpp/deeplake_pg/dl_catalog.cpp +++ /dev/null @@ -1,630 +0,0 @@ -#include "dl_catalog.hpp" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -extern "C" { -#include -#include -} - -namespace pg::dl_catalog { - -namespace { - -constexpr const char* k_catalog_dir = "__deeplake_catalog"; -constexpr const char* k_tables_name = "tables"; -constexpr const char* k_columns_name = "columns"; -constexpr const char* k_indexes_name = "indexes"; -constexpr const char* k_meta_name = "meta"; -constexpr const char* k_databases_name = "databases"; - -std::string join_path(const std::string& root, const std::string& name) -{ - if (!root.empty() && root.back() == '/') { - return root + k_catalog_dir + "/" + name; - } - return root + "/" + k_catalog_dir + "/" + name; -} - -// Cache for catalog table handles to avoid repeated S3 opens -struct catalog_table_cache -{ - std::string root_path; - std::shared_ptr meta_table; - - static catalog_table_cache& instance() - { - static thread_local catalog_table_cache cache; - return cache; - } - - std::shared_ptr get_meta_table(const std::string& path, icm::string_map<> creds) - { - if (path != root_path || !meta_table) { - // Cache miss or path changed - open and cache - root_path = path; - const auto meta_path = join_path(path, k_meta_name); - meta_table = deeplake_api::open_catalog_table(meta_path, std::move(creds)).get_future().get(); - } - return meta_table; - } - - void invalidate() - { - root_path.clear(); - meta_table.reset(); - } -}; - -int64_t now_ms() -{ - using namespace std::chrono; - return duration_cast(system_clock::now().time_since_epoch()).count(); -} - -std::shared_ptr -open_catalog_table(const std::string& root_path, const std::string& name, icm::string_map<> creds) -{ - const auto path = join_path(root_path, name); - return deeplake_api::open_catalog_table(path, std::move(creds)).get_future().get(); -} - -template -std::vector load_vector(const nd::array& arr) -{ - std::vector out; - out.reserve(static_cast(arr.volume())); - for (int64_t i = 0; i < arr.volume(); ++i) { - out.push_back(arr.value(i)); - } - return out; -} - -std::vector load_int64_vector(const nd::array& arr) -{ - std::vector out; - out.reserve(static_cast(arr.volume())); - bool is_numeric = false; - try { - is_numeric = nd::dtype_is_numeric(arr.dtype()); - } catch (...) { - is_numeric = false; - } - if (is_numeric) { - try { - for (int64_t i = 0; i < arr.volume(); ++i) { - out.push_back(arr.value(i)); - } - return out; - } catch (...) { - out.clear(); - } - } - for (int64_t i = 0; i < arr.volume(); ++i) { - auto v = arr.value(i); - try { - out.push_back(std::stoll(std::string(v))); - } catch (...) { - out.push_back(0); - } - } - return out; -} - -} // namespace - -int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds) -{ - if (root_path.empty()) { - return 0; - } - const auto tables_path = join_path(root_path, k_tables_name); - const auto columns_path = join_path(root_path, k_columns_name); - const auto indexes_path = join_path(root_path, k_indexes_name); - const auto meta_path = join_path(root_path, k_meta_name); - const auto databases_path = join_path(root_path, k_databases_name); - - try { - // Build schemas for all catalog tables - deeplake_api::catalog_table_schema tables_schema; - tables_schema.add("table_id", deeplake_core::type::text(codecs::compression::null)) - .add("schema_name", deeplake_core::type::text(codecs::compression::null)) - .add("table_name", deeplake_core::type::text(codecs::compression::null)) - .add("dataset_path", deeplake_core::type::text(codecs::compression::null)) - .add("state", deeplake_core::type::text(codecs::compression::null)) - .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) - .set_primary_key("table_id"); - - deeplake_api::catalog_table_schema columns_schema; - columns_schema.add("column_id", deeplake_core::type::text(codecs::compression::null)) - .add("table_id", deeplake_core::type::text(codecs::compression::null)) - .add("column_name", deeplake_core::type::text(codecs::compression::null)) - .add("pg_type", deeplake_core::type::text(codecs::compression::null)) - .add("dl_type_json", deeplake_core::type::text(codecs::compression::null)) - .add("nullable", deeplake_core::type::generic(nd::type::scalar(nd::dtype::boolean))) - .add("position", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int32))) - .set_primary_key("column_id"); - - deeplake_api::catalog_table_schema indexes_schema; - indexes_schema.add("table_id", deeplake_core::type::text(codecs::compression::null)) - .add("column_names", deeplake_core::type::text(codecs::compression::null)) - .add("index_type", deeplake_core::type::text(codecs::compression::null)) - .add("order_type", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int32))) - .set_primary_key("table_id"); - - deeplake_api::catalog_table_schema meta_schema; - meta_schema.add("catalog_version", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) - .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) - .set_primary_key("catalog_version"); - - deeplake_api::catalog_table_schema databases_schema; - databases_schema.add("db_name", deeplake_core::type::text(codecs::compression::null)) - .add("owner", deeplake_core::type::text(codecs::compression::null)) - .add("encoding", deeplake_core::type::text(codecs::compression::null)) - .add("lc_collate", deeplake_core::type::text(codecs::compression::null)) - .add("lc_ctype", deeplake_core::type::text(codecs::compression::null)) - .add("template_db", deeplake_core::type::text(codecs::compression::null)) - .add("state", deeplake_core::type::text(codecs::compression::null)) - .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) - .set_primary_key("db_name"); - - // Launch all 5 open_or_create operations in parallel - icm::vector>> promises; - promises.reserve(5); - promises.push_back( - deeplake_api::open_or_create_catalog_table(tables_path, std::move(tables_schema), icm::string_map<>(creds))); - promises.push_back( - deeplake_api::open_or_create_catalog_table(columns_path, std::move(columns_schema), icm::string_map<>(creds))); - promises.push_back( - deeplake_api::open_or_create_catalog_table(indexes_path, std::move(indexes_schema), icm::string_map<>(creds))); - promises.push_back( - deeplake_api::open_or_create_catalog_table(meta_path, std::move(meta_schema), icm::string_map<>(creds))); - promises.push_back( - deeplake_api::open_or_create_catalog_table(databases_path, std::move(databases_schema), icm::string_map<>(creds))); - - // Wait for all to complete - auto results = async::combine(std::move(promises)).get_future().get(); - if (results.size() != 5) { - elog(ERROR, - "Failed to initialize catalog at %s: expected 5 catalog tables, got %zu", - root_path.c_str(), - static_cast(results.size())); - } - - // Initialize meta table if empty (index 3 is meta) - auto& meta_table = results[3]; - if (meta_table) { - auto snapshot = meta_table->read().get_future().get(); - if (snapshot.row_count() == 0) { - icm::string_map row; - row["catalog_version"] = nd::adapt(static_cast(1)); - row["updated_at"] = nd::adapt(now_ms()); - meta_table->insert(std::move(row)).get_future().get(); - } - } - // Get version from the meta table we already have open (avoids a second open_catalog_table) - if (meta_table) { - return static_cast(meta_table->version().get_future().get()); - } - return 0; - } catch (const std::exception& e) { - catalog_table_cache::instance().invalidate(); - elog(ERROR, "Failed to ensure catalog at %s: %s", root_path.c_str(), e.what()); - return 0; - } catch (...) { - catalog_table_cache::instance().invalidate(); - elog(ERROR, "Failed to ensure catalog at %s: unknown error", root_path.c_str()); - return 0; - } -} - -std::vector load_tables(const std::string& root_path, icm::string_map<> creds) -{ - std::vector out; - try { - auto table = open_catalog_table(root_path, k_tables_name, std::move(creds)); - if (!table) { - return out; - } - auto snapshot = table->read().get_future().get(); - if (snapshot.row_count() == 0) { - return out; - } - - std::unordered_map latest; - for (const auto& row : snapshot.rows()) { - auto table_id_it = row.find("table_id"); - auto schema_it = row.find("schema_name"); - auto table_it = row.find("table_name"); - auto path_it = row.find("dataset_path"); - auto state_it = row.find("state"); - auto updated_it = row.find("updated_at"); - if (table_id_it == row.end() || schema_it == row.end() || table_it == row.end() || path_it == row.end() || - state_it == row.end() || updated_it == row.end()) { - continue; - } - - table_meta meta; - meta.table_id = deeplake_api::array_to_string(table_id_it->second); - meta.schema_name = deeplake_api::array_to_string(schema_it->second); - meta.table_name = deeplake_api::array_to_string(table_it->second); - meta.dataset_path = deeplake_api::array_to_string(path_it->second); - meta.state = deeplake_api::array_to_string(state_it->second); - auto updated_vec = load_int64_vector(updated_it->second); - meta.updated_at = updated_vec.empty() ? 0 : updated_vec.front(); - - auto it = latest.find(meta.table_id); - if (it == latest.end() || it->second.updated_at <= meta.updated_at) { - latest[meta.table_id] = std::move(meta); - } - } - - out.reserve(latest.size()); - for (auto& [_, meta] : latest) { - if (meta.state == "ready") { - out.push_back(std::move(meta)); - } - } - return out; - } catch (const std::exception& e) { - elog(WARNING, "Failed to load catalog tables: %s", e.what()); - return out; - } catch (...) { - elog(WARNING, "Failed to load catalog tables: unknown error"); - return out; - } -} - -std::vector load_columns(const std::string& root_path, icm::string_map<> creds) -{ - std::vector out; - try { - auto table = open_catalog_table(root_path, k_columns_name, std::move(creds)); - if (!table) { - return out; - } - auto snapshot = table->read().get_future().get(); - if (snapshot.row_count() == 0) { - return out; - } - - for (const auto& row : snapshot.rows()) { - auto table_id_it = row.find("table_id"); - auto column_name_it = row.find("column_name"); - auto pg_type_it = row.find("pg_type"); - auto dl_type_it = row.find("dl_type_json"); - auto nullable_it = row.find("nullable"); - auto position_it = row.find("position"); - - if (table_id_it == row.end() || column_name_it == row.end() || pg_type_it == row.end()) { - continue; - } - - column_meta meta; - meta.table_id = deeplake_api::array_to_string(table_id_it->second); - meta.column_name = deeplake_api::array_to_string(column_name_it->second); - meta.pg_type = deeplake_api::array_to_string(pg_type_it->second); - if (dl_type_it != row.end()) { - meta.dl_type_json = deeplake_api::array_to_string(dl_type_it->second); - } - if (nullable_it != row.end()) { - try { - meta.nullable = nullable_it->second.value(0); - } catch (...) { - meta.nullable = true; - } - } - if (position_it != row.end()) { - try { - meta.position = position_it->second.value(0); - } catch (...) { - auto pos_vec = load_int64_vector(position_it->second); - meta.position = pos_vec.empty() ? 0 : static_cast(pos_vec.front()); - } - } - - out.push_back(std::move(meta)); - } - return out; - } catch (const std::exception& e) { - elog(WARNING, "Failed to load catalog columns: %s", e.what()); - return out; - } catch (...) { - elog(WARNING, "Failed to load catalog columns: unknown error"); - return out; - } -} - -std::vector load_indexes(const std::string&, icm::string_map<>) -{ - return {}; -} - -std::pair, std::vector> -load_tables_and_columns(const std::string& root_path, icm::string_map<> creds) -{ - std::vector tables_out; - std::vector columns_out; - - try { - // Open both catalog tables in parallel - auto tables_promise = deeplake_api::open_catalog_table(join_path(root_path, k_tables_name), icm::string_map<>(creds)); - auto columns_promise = deeplake_api::open_catalog_table(join_path(root_path, k_columns_name), icm::string_map<>(creds)); - - icm::vector>> open_promises; - open_promises.push_back(std::move(tables_promise)); - open_promises.push_back(std::move(columns_promise)); - - auto catalog_tables = async::combine(std::move(open_promises)).get_future().get(); - - auto& tables_table = catalog_tables[0]; - auto& columns_table = catalog_tables[1]; - - if (!tables_table || !columns_table) { - return {tables_out, columns_out}; - } - - // Read both snapshots in parallel - auto tables_read_promise = tables_table->read(); - auto columns_read_promise = columns_table->read(); - - icm::vector> read_promises; - read_promises.push_back(std::move(tables_read_promise)); - read_promises.push_back(std::move(columns_read_promise)); - - auto snapshots = async::combine(std::move(read_promises)).get_future().get(); - - auto& tables_snapshot = snapshots[0]; - auto& columns_snapshot = snapshots[1]; - - // Process tables - if (tables_snapshot.row_count() > 0) { - std::unordered_map latest; - for (const auto& row : tables_snapshot.rows()) { - auto table_id_it = row.find("table_id"); - auto schema_it = row.find("schema_name"); - auto table_it = row.find("table_name"); - auto path_it = row.find("dataset_path"); - auto state_it = row.find("state"); - auto updated_it = row.find("updated_at"); - if (table_id_it == row.end() || schema_it == row.end() || table_it == row.end() || path_it == row.end() || - state_it == row.end() || updated_it == row.end()) { - continue; - } - - table_meta meta; - meta.table_id = deeplake_api::array_to_string(table_id_it->second); - meta.schema_name = deeplake_api::array_to_string(schema_it->second); - meta.table_name = deeplake_api::array_to_string(table_it->second); - meta.dataset_path = deeplake_api::array_to_string(path_it->second); - meta.state = deeplake_api::array_to_string(state_it->second); - auto updated_vec = load_int64_vector(updated_it->second); - meta.updated_at = updated_vec.empty() ? 0 : updated_vec.front(); - - auto it = latest.find(meta.table_id); - if (it == latest.end() || it->second.updated_at <= meta.updated_at) { - latest[meta.table_id] = std::move(meta); - } - } - - tables_out.reserve(latest.size()); - for (auto& [_, meta] : latest) { - if (meta.state == "ready") { - tables_out.push_back(std::move(meta)); - } - } - } - - // Process columns - if (columns_snapshot.row_count() > 0) { - for (const auto& row : columns_snapshot.rows()) { - auto table_id_it = row.find("table_id"); - auto column_name_it = row.find("column_name"); - auto pg_type_it = row.find("pg_type"); - auto dl_type_it = row.find("dl_type_json"); - auto nullable_it = row.find("nullable"); - auto position_it = row.find("position"); - - if (table_id_it == row.end() || column_name_it == row.end() || pg_type_it == row.end()) { - continue; - } - - column_meta meta; - meta.table_id = deeplake_api::array_to_string(table_id_it->second); - meta.column_name = deeplake_api::array_to_string(column_name_it->second); - meta.pg_type = deeplake_api::array_to_string(pg_type_it->second); - if (dl_type_it != row.end()) { - meta.dl_type_json = deeplake_api::array_to_string(dl_type_it->second); - } - if (nullable_it != row.end()) { - try { - auto nullable_vec = load_vector(nullable_it->second); - meta.nullable = !nullable_vec.empty() && nullable_vec.front() != 0; - } catch (...) { - meta.nullable = true; - } - } - if (position_it != row.end()) { - try { - auto pos_vec = load_vector(position_it->second); - meta.position = pos_vec.empty() ? 0 : pos_vec.front(); - } catch (...) { - auto pos_vec = load_int64_vector(position_it->second); - meta.position = pos_vec.empty() ? 0 : static_cast(pos_vec.front()); - } - } - - columns_out.push_back(std::move(meta)); - } - } - - return {tables_out, columns_out}; - } catch (const std::exception& e) { - elog(WARNING, "Failed to load catalog tables and columns: %s", e.what()); - return {tables_out, columns_out}; - } catch (...) { - elog(WARNING, "Failed to load catalog tables and columns: unknown error"); - return {tables_out, columns_out}; - } -} - -void upsert_table(const std::string& root_path, icm::string_map<> creds, const table_meta& meta) -{ - auto table = open_catalog_table(root_path, k_tables_name, std::move(creds)); - icm::string_map row; - row["table_id"] = nd::adapt(meta.table_id); - row["schema_name"] = nd::adapt(meta.schema_name); - row["table_name"] = nd::adapt(meta.table_name); - row["dataset_path"] = nd::adapt(meta.dataset_path); - row["state"] = nd::adapt(meta.state); - row["updated_at"] = nd::adapt(meta.updated_at == 0 ? now_ms() : meta.updated_at); - table->upsert(std::move(row)).get_future().get(); -} - -void upsert_columns(const std::string& root_path, icm::string_map<> creds, const std::vector& columns) -{ - if (columns.empty()) { - return; - } - auto table = open_catalog_table(root_path, k_columns_name, std::move(creds)); - icm::vector> rows; - rows.reserve(columns.size()); - for (const auto& col : columns) { - icm::string_map row; - // column_id is the composite key: table_id:column_name - row["column_id"] = nd::adapt(col.table_id + ":" + col.column_name); - row["table_id"] = nd::adapt(col.table_id); - row["column_name"] = nd::adapt(col.column_name); - row["pg_type"] = nd::adapt(col.pg_type); - row["dl_type_json"] = nd::adapt(col.dl_type_json); - row["nullable"] = nd::adapt(col.nullable); - row["position"] = nd::adapt(col.position); - rows.push_back(std::move(row)); - } - table->upsert_many(std::move(rows)).get_future().get(); -} - -std::vector load_databases(const std::string& root_path, icm::string_map<> creds) -{ - std::vector out; - try { - auto table = open_catalog_table(root_path, k_databases_name, std::move(creds)); - if (!table) { - return out; - } - auto snapshot = table->read().get_future().get(); - if (snapshot.row_count() == 0) { - return out; - } - - std::unordered_map latest; - for (const auto& row : snapshot.rows()) { - auto db_name_it = row.find("db_name"); - auto owner_it = row.find("owner"); - auto encoding_it = row.find("encoding"); - auto lc_collate_it = row.find("lc_collate"); - auto lc_ctype_it = row.find("lc_ctype"); - auto template_it = row.find("template_db"); - auto state_it = row.find("state"); - auto updated_it = row.find("updated_at"); - if (db_name_it == row.end() || state_it == row.end()) { - continue; - } - - database_meta meta; - meta.db_name = deeplake_api::array_to_string(db_name_it->second); - if (owner_it != row.end()) meta.owner = deeplake_api::array_to_string(owner_it->second); - if (encoding_it != row.end()) meta.encoding = deeplake_api::array_to_string(encoding_it->second); - if (lc_collate_it != row.end()) meta.lc_collate = deeplake_api::array_to_string(lc_collate_it->second); - if (lc_ctype_it != row.end()) meta.lc_ctype = deeplake_api::array_to_string(lc_ctype_it->second); - if (template_it != row.end()) meta.template_db = deeplake_api::array_to_string(template_it->second); - meta.state = deeplake_api::array_to_string(state_it->second); - if (updated_it != row.end()) { - auto updated_vec = load_int64_vector(updated_it->second); - meta.updated_at = updated_vec.empty() ? 0 : updated_vec.front(); - } - - auto it = latest.find(meta.db_name); - if (it == latest.end() || it->second.updated_at <= meta.updated_at) { - latest[meta.db_name] = std::move(meta); - } - } - - out.reserve(latest.size()); - for (auto& [_, meta] : latest) { - if (meta.state == "ready") { - out.push_back(std::move(meta)); - } - } - return out; - } catch (const std::exception& e) { - elog(WARNING, "Failed to load catalog databases: %s", e.what()); - return out; - } catch (...) { - elog(WARNING, "Failed to load catalog databases: unknown error"); - return out; - } -} - -void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta) -{ - auto table = open_catalog_table(root_path, k_databases_name, std::move(creds)); - icm::string_map row; - row["db_name"] = nd::adapt(meta.db_name); - row["owner"] = nd::adapt(meta.owner); - row["encoding"] = nd::adapt(meta.encoding); - row["lc_collate"] = nd::adapt(meta.lc_collate); - row["lc_ctype"] = nd::adapt(meta.lc_ctype); - row["template_db"] = nd::adapt(meta.template_db); - row["state"] = nd::adapt(meta.state); - row["updated_at"] = nd::adapt(meta.updated_at == 0 ? now_ms() : meta.updated_at); - table->upsert(std::move(row)).get_future().get(); -} - -int64_t get_catalog_version(const std::string& root_path, icm::string_map<> creds) -{ - try { - // Use cached meta table handle to avoid repeated S3 opens - auto table = catalog_table_cache::instance().get_meta_table(root_path, std::move(creds)); - if (!table) { - return 0; - } - // Use version() for fast HEAD request instead of reading the whole table. - // Returns a hash of the ETag which changes whenever the table is modified. - return static_cast(table->version().get_future().get()); - } catch (const std::exception& e) { - elog(WARNING, "Failed to read catalog version: %s", e.what()); - catalog_table_cache::instance().invalidate(); - return 0; - } catch (...) { - elog(WARNING, "Failed to read catalog version: unknown error"); - catalog_table_cache::instance().invalidate(); - return 0; - } -} - -void bump_catalog_version(const std::string& root_path, icm::string_map<> creds) -{ - auto table = open_catalog_table(root_path, k_meta_name, std::move(creds)); - icm::string_map row; - // Use a fixed key and upsert - the updated_at timestamp change will trigger - // a new ETag, which is what get_catalog_version() now detects via version(). - row["catalog_version"] = nd::adapt(static_cast(1)); - row["updated_at"] = nd::adapt(now_ms()); - table->upsert(std::move(row)).get_future().get(); -} - -} // namespace pg::dl_catalog diff --git a/cpp/deeplake_pg/dl_catalog.hpp b/cpp/deeplake_pg/dl_catalog.hpp deleted file mode 100644 index 52d680242e..0000000000 --- a/cpp/deeplake_pg/dl_catalog.hpp +++ /dev/null @@ -1,70 +0,0 @@ -#pragma once - -#include - -#include -#include -#include - -namespace pg::dl_catalog { - -struct table_meta -{ - std::string table_id; - std::string schema_name; - std::string table_name; - std::string dataset_path; - std::string state; - int64_t updated_at = 0; -}; - -struct column_meta -{ - std::string table_id; - std::string column_name; - std::string pg_type; - std::string dl_type_json; - bool nullable = true; - int32_t position = 0; -}; - -struct index_meta -{ - std::string table_id; - std::string column_names; - std::string index_type; - int32_t order_type = 0; -}; - -struct database_meta -{ - std::string db_name; // PK - std::string owner; - std::string encoding; - std::string lc_collate; - std::string lc_ctype; - std::string template_db; - std::string state; // "ready" or "dropping" - int64_t updated_at = 0; -}; - -int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds); - -std::vector load_tables(const std::string& root_path, icm::string_map<> creds); -std::vector load_columns(const std::string& root_path, icm::string_map<> creds); -std::vector load_indexes(const std::string& root_path, icm::string_map<> creds); - -// Load tables and columns in parallel for better performance -std::pair, std::vector> -load_tables_and_columns(const std::string& root_path, icm::string_map<> creds); - -void upsert_table(const std::string& root_path, icm::string_map<> creds, const table_meta& meta); -void upsert_columns(const std::string& root_path, icm::string_map<> creds, const std::vector& columns); - -std::vector load_databases(const std::string& root_path, icm::string_map<> creds); -void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta); - -int64_t get_catalog_version(const std::string& root_path, icm::string_map<> creds); -void bump_catalog_version(const std::string& root_path, icm::string_map<> creds); - -} // namespace pg::dl_catalog diff --git a/cpp/deeplake_pg/dl_wal.cpp b/cpp/deeplake_pg/dl_wal.cpp new file mode 100644 index 0000000000..aab73be0af --- /dev/null +++ b/cpp/deeplake_pg/dl_wal.cpp @@ -0,0 +1,416 @@ +#include "dl_wal.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +extern "C" { +#include +#include +#include +#include +} + +namespace pg::dl_wal { + +namespace { + +constexpr const char* k_catalog_dir = "__deeplake_catalog"; +constexpr const char* k_databases_name = "databases"; +constexpr const char* k_ddl_log_name = "__wal_table"; + +// Shared (cluster-wide) path: {root}/__deeplake_catalog/{name} +std::string join_path(const std::string& root, const std::string& name) +{ + if (!root.empty() && root.back() == '/') { + return root + k_catalog_dir + "/" + name; + } + return root + "/" + k_catalog_dir + "/" + name; +} + +// Per-database path: {root}/{db_name}/__deeplake_catalog/{name} +std::string join_db_path(const std::string& root, const std::string& db_name, const std::string& name) +{ + std::string base = root; + if (!base.empty() && base.back() == '/') { + base.pop_back(); + } + return base + "/" + db_name + "/" + k_catalog_dir + "/" + name; +} + +int64_t now_ms() +{ + using namespace std::chrono; + return duration_cast(system_clock::now().time_since_epoch()).count(); +} + +// Open a shared (cluster-wide) catalog table +std::shared_ptr +open_catalog_table(const std::string& root_path, const std::string& name, icm::string_map<> creds) +{ + const auto path = join_path(root_path, name); + return deeplake_api::open_catalog_table(path, std::move(creds)).get_future().get(); +} + +// Create the WAL dataset with schema. Called once from ensure_db_catalog. +void create_ddl_dataset(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) +{ + const auto path = join_db_path(root_path, db_name, k_ddl_log_name); + bool exists = false; + try { + exists = deeplake_api::exists(path, icm::string_map<>(creds)).get_future().get(); + } catch (...) { + exists = false; + } + if (exists) { + return; + } + + auto ds = deeplake_api::create(path, std::move(creds)).get_future().get(); + ds->add_column("seq", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))); + ds->add_column("origin_instance_id", deeplake_core::type::text(codecs::compression::null)); + ds->add_column("search_path", deeplake_core::type::text(codecs::compression::null)); + ds->add_column("command_tag", deeplake_core::type::text(codecs::compression::null)); + ds->add_column("object_identity", deeplake_core::type::text(codecs::compression::null)); + ds->add_column("ddl_sql", deeplake_core::type::text(codecs::compression::null)); + ds->add_column("timestamp", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))); + ds->commit().get_future().get(); +} + +// Thread-local cached WAL dataset handle for append (hot path). +struct ddl_dataset_cache +{ + std::string key; // root_path + "\t" + db_name + std::shared_ptr ds; + + static ddl_dataset_cache& instance() + { + static thread_local ddl_dataset_cache cache; + return cache; + } + + std::shared_ptr get(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) + { + const auto k = root_path + "\t" + db_name; + if (k == key && ds) { + return ds; + } + const auto path = join_db_path(root_path, db_name, k_ddl_log_name); + ds = deeplake_api::open(path, std::move(creds)).get_future().get(); + key = k; + return ds; + } + + void invalidate() + { + key.clear(); + ds.reset(); + } +}; + +std::vector load_int64_vector(const nd::array& arr) +{ + std::vector out; + out.reserve(static_cast(arr.volume())); + bool is_numeric = false; + try { + is_numeric = nd::dtype_is_numeric(arr.dtype()); + } catch (...) { + is_numeric = false; + } + if (is_numeric) { + try { + for (int64_t i = 0; i < arr.volume(); ++i) { + out.push_back(arr.value(i)); + } + return out; + } catch (...) { + out.clear(); + } + } + for (int64_t i = 0; i < arr.volume(); ++i) { + auto v = arr.value(i); + try { + out.push_back(std::stoll(std::string(v))); + } catch (...) { + out.push_back(0); + } + } + return out; +} + +deeplake_api::catalog_table_schema make_databases_schema() +{ + deeplake_api::catalog_table_schema schema; + schema.add("db_name", deeplake_core::type::text(codecs::compression::null)) + .add("owner", deeplake_core::type::text(codecs::compression::null)) + .add("encoding", deeplake_core::type::text(codecs::compression::null)) + .add("lc_collate", deeplake_core::type::text(codecs::compression::null)) + .add("lc_ctype", deeplake_core::type::text(codecs::compression::null)) + .add("template_db", deeplake_core::type::text(codecs::compression::null)) + .add("state", deeplake_core::type::text(codecs::compression::null)) + .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) + .set_primary_key("db_name"); + return schema; +} + +} // namespace + +int64_t next_ddl_seq() +{ + static thread_local int64_t counter = 0; + static thread_local int64_t last_ms = 0; + const int64_t ms = now_ms(); + if (ms == last_ms) { + counter++; + } else { + counter = 0; + last_ms = ms; + } + return ms * 1000 + counter; +} + +std::string local_instance_id() +{ + char hostname[256] = {0}; + if (gethostname(hostname, sizeof(hostname) - 1) != 0) { + strlcpy(hostname, "unknown_host", sizeof(hostname)); + } + const char* port = GetConfigOption("port", true, false); + const std::string data_dir = DataDir != nullptr ? std::string(DataDir) : std::string("unknown_data_dir"); + const std::string host_str(hostname); + const std::string port_str = port != nullptr ? std::string(port) : std::string("unknown_port"); + return host_str + ":" + port_str + ":" + data_dir; +} + +void ensure_catalog(const std::string& root_path, icm::string_map<> creds) +{ + if (root_path.empty()) { + return; + } + const auto databases_path = join_path(root_path, k_databases_name); + + try { + deeplake_api::open_or_create_catalog_table(databases_path, make_databases_schema(), std::move(creds)) + .get_future() + .get(); + } catch (const std::exception& e) { + elog(ERROR, "Failed to ensure shared catalog at %s: %s", root_path.c_str(), e.what()); + } catch (...) { + elog(ERROR, "Failed to ensure shared catalog at %s: unknown error", root_path.c_str()); + } +} + +void ensure_db_catalog(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) +{ + if (root_path.empty() || db_name.empty()) { + return; + } + + try { + create_ddl_dataset(root_path, db_name, std::move(creds)); + } catch (const std::exception& e) { + elog(ERROR, "Failed to ensure per-db catalog at %s/%s: %s", root_path.c_str(), db_name.c_str(), e.what()); + } catch (...) { + elog(ERROR, "Failed to ensure per-db catalog at %s/%s: unknown error", root_path.c_str(), db_name.c_str()); + } +} + +std::vector load_databases(const std::string& root_path, icm::string_map<> creds) +{ + std::vector out; + try { + auto table = open_catalog_table(root_path, k_databases_name, std::move(creds)); + if (!table) { + return out; + } + auto snapshot = table->read().get_future().get(); + if (snapshot.row_count() == 0) { + return out; + } + + std::unordered_map latest; + for (const auto& row : snapshot.rows()) { + auto db_name_it = row.find("db_name"); + auto owner_it = row.find("owner"); + auto encoding_it = row.find("encoding"); + auto lc_collate_it = row.find("lc_collate"); + auto lc_ctype_it = row.find("lc_ctype"); + auto template_it = row.find("template_db"); + auto state_it = row.find("state"); + auto updated_it = row.find("updated_at"); + if (db_name_it == row.end() || state_it == row.end()) { + continue; + } + + database_meta meta; + meta.db_name = deeplake_api::array_to_string(db_name_it->second); + if (owner_it != row.end()) meta.owner = deeplake_api::array_to_string(owner_it->second); + if (encoding_it != row.end()) meta.encoding = deeplake_api::array_to_string(encoding_it->second); + if (lc_collate_it != row.end()) meta.lc_collate = deeplake_api::array_to_string(lc_collate_it->second); + if (lc_ctype_it != row.end()) meta.lc_ctype = deeplake_api::array_to_string(lc_ctype_it->second); + if (template_it != row.end()) meta.template_db = deeplake_api::array_to_string(template_it->second); + meta.state = deeplake_api::array_to_string(state_it->second); + if (updated_it != row.end()) { + auto updated_vec = load_int64_vector(updated_it->second); + meta.updated_at = updated_vec.empty() ? 0 : updated_vec.front(); + } + + auto it = latest.find(meta.db_name); + if (it == latest.end() || it->second.updated_at <= meta.updated_at) { + latest[meta.db_name] = std::move(meta); + } + } + + out.reserve(latest.size()); + for (auto& [_, meta] : latest) { + if (meta.state == "ready") { + out.push_back(std::move(meta)); + } + } + return out; + } catch (const std::exception& e) { + elog(WARNING, "Failed to load catalog databases: %s", e.what()); + return out; + } catch (...) { + elog(WARNING, "Failed to load catalog databases: unknown error"); + return out; + } +} + +void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta) +{ + auto table = open_catalog_table(root_path, k_databases_name, std::move(creds)); + icm::string_map row; + row["db_name"] = nd::adapt(meta.db_name); + row["owner"] = nd::adapt(meta.owner); + row["encoding"] = nd::adapt(meta.encoding); + row["lc_collate"] = nd::adapt(meta.lc_collate); + row["lc_ctype"] = nd::adapt(meta.lc_ctype); + row["template_db"] = nd::adapt(meta.template_db); + row["state"] = nd::adapt(meta.state); + row["updated_at"] = nd::adapt(meta.updated_at == 0 ? now_ms() : meta.updated_at); + table->upsert(std::move(row)).get_future().get(); +} + +int64_t get_databases_version(const std::string& root_path, icm::string_map<> creds) +{ + try { + auto table = open_catalog_table(root_path, k_databases_name, std::move(creds)); + if (!table) { + return 0; + } + return static_cast(table->version().get_future().get()); + } catch (const std::exception& e) { + elog(WARNING, "Failed to read databases catalog version: %s", e.what()); + return 0; + } catch (...) { + elog(WARNING, "Failed to read databases catalog version: unknown error"); + return 0; + } +} + +std::shared_ptr +open_ddl_log_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) +{ + return ddl_dataset_cache::instance().get(root_path, db_name, std::move(creds)); +} + +void append_ddl_log(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, + const ddl_log_entry& entry) +{ + try { + auto ds = ddl_dataset_cache::instance().get(root_path, db_name, std::move(creds)); + ds->set_auto_commit_enabled(false).get_future().get(); + icm::string_map row; + row["seq"] = nd::adapt(entry.seq); + row["origin_instance_id"] = nd::adapt(entry.origin_instance_id); + row["search_path"] = nd::adapt(entry.search_path); + row["command_tag"] = nd::adapt(entry.command_tag); + row["object_identity"] = nd::adapt(entry.object_identity); + row["ddl_sql"] = nd::adapt(entry.ddl_sql); + row["timestamp"] = nd::adapt(entry.timestamp == 0 ? now_ms() : entry.timestamp); + ds->append_row(row).get_future().get(); + ds->commit().get_future().get(); + } catch (...) { + ddl_dataset_cache::instance().invalidate(); + throw; + } +} + +std::vector load_ddl_log(const std::string& root_path, const std::string& db_name, + icm::string_map<> creds, int64_t after_seq) +{ + std::vector out; + try { + auto ds = ddl_dataset_cache::instance().get(root_path, db_name, std::move(creds)); + if (!ds) { + return out; + } + ds->refresh().get_future().get(); + const int64_t row_count = ds->num_rows(); + if (row_count == 0) { + return out; + } + + auto seq_arr = ds->get_column("seq").request_range(0, row_count, {}).get_future().get(); + auto origin_arr = ds->get_column("origin_instance_id").request_range(0, row_count, {}).get_future().get(); + auto search_path_arr = ds->get_column("search_path").request_range(0, row_count, {}).get_future().get(); + auto tag_arr = ds->get_column("command_tag").request_range(0, row_count, {}).get_future().get(); + auto object_arr = ds->get_column("object_identity").request_range(0, row_count, {}).get_future().get(); + auto sql_arr = ds->get_column("ddl_sql").request_range(0, row_count, {}).get_future().get(); + auto ts_arr = ds->get_column("timestamp").request_range(0, row_count, {}).get_future().get(); + + auto seq_vec = load_int64_vector(seq_arr); + auto ts_vec = load_int64_vector(ts_arr); + for (int64_t i = 0; i < row_count; ++i) { + ddl_log_entry entry; + entry.seq = i < static_cast(seq_vec.size()) ? seq_vec[static_cast(i)] : 0; + if (entry.seq <= after_seq) { + continue; + } + auto read_string = [](const nd::array& arr, int64_t idx) -> std::string { + try { + auto sub = arr[idx]; + auto bytes = sub.data(); + return std::string(reinterpret_cast(bytes.data()), bytes.size()); + } catch (...) { + try { + return std::string(arr.value(idx)); + } catch (...) { + return {}; + } + } + }; + entry.origin_instance_id = read_string(origin_arr, i); + entry.search_path = read_string(search_path_arr, i); + entry.command_tag = read_string(tag_arr, i); + entry.object_identity = read_string(object_arr, i); + entry.ddl_sql = read_string(sql_arr, i); + entry.timestamp = i < static_cast(ts_vec.size()) ? ts_vec[static_cast(i)] : 0; + out.push_back(std::move(entry)); + } + std::sort(out.begin(), out.end(), [](const ddl_log_entry& a, const ddl_log_entry& b) { + return a.seq < b.seq; + }); + return out; + } catch (const std::exception& e) { + elog(WARNING, "Failed to load DDL log for db '%s': %s", db_name.c_str(), e.what()); + return out; + } catch (...) { + elog(WARNING, "Failed to load DDL log for db '%s': unknown error", db_name.c_str()); + return out; + } +} + +} // namespace pg::dl_wal diff --git a/cpp/deeplake_pg/dl_wal.hpp b/cpp/deeplake_pg/dl_wal.hpp new file mode 100644 index 0000000000..c0d386cc69 --- /dev/null +++ b/cpp/deeplake_pg/dl_wal.hpp @@ -0,0 +1,66 @@ +#pragma once + +#include + +#include +#include +#include +#include + +namespace deeplake_api { +class dataset; +} + +namespace pg::dl_wal { + +struct database_meta +{ + std::string db_name; // PK + std::string owner; + std::string encoding; + std::string lc_collate; + std::string lc_ctype; + std::string template_db; + std::string state; // "ready" or "dropping" + int64_t updated_at = 0; +}; + +struct ddl_log_entry +{ + int64_t seq = 0; // Primary key + std::string origin_instance_id; + std::string search_path; + std::string command_tag; + std::string object_identity; + std::string ddl_sql; + int64_t timestamp = 0; +}; + +// Shared (cluster-wide) catalog: databases catalog_table +void ensure_catalog(const std::string& root_path, icm::string_map<> creds); + +// Per-database catalog: __wal_table dataset +void ensure_db_catalog(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); + +// Shared (cluster-wide) database catalog +std::vector load_databases(const std::string& root_path, icm::string_map<> creds); +void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta); + +// Global version check via databases catalog_table +int64_t get_databases_version(const std::string& root_path, icm::string_map<> creds); + +std::shared_ptr +open_ddl_log_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); + +void append_ddl_log(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, + const ddl_log_entry& entry); + +std::vector load_ddl_log(const std::string& root_path, const std::string& db_name, + icm::string_map<> creds, int64_t after_seq = 0); + +int64_t next_ddl_seq(); + +// Unique identifier for this PostgreSQL instance: "hostname:port:datadir" +std::string local_instance_id(); + +} // namespace pg::dl_wal diff --git a/cpp/deeplake_pg/extension_init.cpp b/cpp/deeplake_pg/extension_init.cpp index b293f7e819..6d5646a062 100644 --- a/cpp/deeplake_pg/extension_init.cpp +++ b/cpp/deeplake_pg/extension_init.cpp @@ -9,7 +9,10 @@ extern "C" { #include #include +#include #include +#include +#include #include #include #include @@ -25,7 +28,7 @@ extern "C" { #include "column_statistics.hpp" #include "deeplake_executor.hpp" -#include "dl_catalog.hpp" +#include "dl_wal.hpp" #include "pg_deeplake.hpp" #include "pg_version_compat.h" #include "sync_worker.hpp" @@ -42,6 +45,7 @@ extern "C" { #include #include #include +#include #include #include #include @@ -70,6 +74,58 @@ bool stateless_enabled = false; // Enable stateless catalog sync across ins namespace { +bool is_ddl_log_suppressed() +{ + if (pg::table_storage::is_catalog_only_create()) { + return true; + } + if (creating_extension) { + return true; + } + const char* app_name = GetConfigOption("application_name", true, false); + return app_name && strcmp(app_name, "pg_deeplake_sync") == 0; +} + +void append_to_ddl_log_if_needed(const char* command_tag, const char* object_identity, const char* query_string) +{ + if (!pg::stateless_enabled || is_ddl_log_suppressed()) { + return; + } + if (query_string == nullptr || query_string[0] == '\0') { + return; + } + + auto root_path = pg::session_credentials::get_root_path(); + if (root_path.empty()) { + root_path = pg::utils::get_deeplake_root_directory(); + } + if (root_path.empty()) { + return; + } + + try { + auto creds = pg::session_credentials::get_credentials(); + const char* dbname = get_database_name(MyDatabaseId); + std::string db_name = dbname ? dbname : "postgres"; + if (dbname) { + pfree(const_cast(dbname)); + } + + pg::dl_wal::ddl_log_entry entry; + entry.seq = pg::dl_wal::next_ddl_seq(); + entry.origin_instance_id = pg::dl_wal::local_instance_id(); + const char* current_search_path = GetConfigOption("search_path", true, false); + entry.search_path = current_search_path != nullptr ? current_search_path : ""; + entry.command_tag = command_tag != nullptr ? command_tag : ""; + entry.object_identity = object_identity != nullptr ? object_identity : ""; + entry.ddl_sql = query_string; + + pg::dl_wal::append_ddl_log(root_path, db_name, creds, entry); + } catch (const std::exception& e) { + elog(WARNING, "pg_deeplake: failed to append DDL to WAL log: %s", e.what()); + } +} + bool is_count_star(TargetEntry* node) { if (node == nullptr || node->expr == nullptr || !IsA(node->expr, Aggref)) { @@ -586,6 +642,8 @@ static void process_utility(PlannedStmt* pstmt, } } } + + // DROP SCHEMA propagation is handled by DDL WAL logging post-hook. } } else if (stmt->removeType == OBJECT_DATABASE) { const char* query = "SELECT nspname, relname " @@ -691,11 +749,11 @@ static void process_utility(PlannedStmt* pstmt, } if (!root_path.empty()) { auto creds = pg::session_credentials::get_credentials(); - pg::dl_catalog::database_meta db_meta; + pg::dl_wal::ensure_catalog(root_path, creds); + pg::dl_wal::database_meta db_meta; db_meta.db_name = dbstmt->dbname; db_meta.state = "dropping"; - pg::dl_catalog::upsert_database(root_path, creds, db_meta); - pg::dl_catalog::bump_catalog_version(root_path, creds); + pg::dl_wal::upsert_database(root_path, creds, db_meta); elog(LOG, "pg_deeplake: marked database '%s' as dropping in catalog", dbstmt->dbname); } } catch (const std::exception& e) { @@ -727,7 +785,8 @@ static void process_utility(PlannedStmt* pstmt, } if (!root_path.empty()) { auto creds = pg::session_credentials::get_credentials(); - pg::dl_catalog::database_meta db_meta; + pg::dl_wal::ensure_catalog(root_path, creds); + pg::dl_wal::database_meta db_meta; db_meta.db_name = dbstmt->dbname; db_meta.state = "ready"; @@ -748,8 +807,7 @@ static void process_utility(PlannedStmt* pstmt, } } - pg::dl_catalog::upsert_database(root_path, creds, db_meta); - pg::dl_catalog::bump_catalog_version(root_path, creds); + pg::dl_wal::upsert_database(root_path, creds, db_meta); elog(DEBUG1, "pg_deeplake: recorded CREATE DATABASE '%s' in catalog", dbstmt->dbname); } } catch (const std::exception& e) { @@ -758,6 +816,14 @@ static void process_utility(PlannedStmt* pstmt, } } + // Post-hook: record CREATE SCHEMA in DDL WAL log + if (IsA(pstmt->utilityStmt, CreateSchemaStmt)) { + CreateSchemaStmt* schemastmt = (CreateSchemaStmt*)pstmt->utilityStmt; + if (schemastmt->schemaname != nullptr) { + append_to_ddl_log_if_needed("CREATE SCHEMA", schemastmt->schemaname, queryString); + } + } + // Post-process ALTER TABLE ADD COLUMN to add column to deeplake dataset if (IsA(pstmt->utilityStmt, AlterTableStmt)) { AlterTableStmt* stmt = (AlterTableStmt*)pstmt->utilityStmt; @@ -1256,6 +1322,59 @@ static void process_utility(PlannedStmt* pstmt, } } } + + if (nodeTag(pstmt->utilityStmt) == T_DropStmt) { + DropStmt* stmt = (DropStmt*)pstmt->utilityStmt; + if (stmt->removeType == OBJECT_SCHEMA) { + append_to_ddl_log_if_needed("DROP SCHEMA", nullptr, queryString); + } else if (stmt->removeType == OBJECT_TABLE) { + append_to_ddl_log_if_needed("DROP TABLE", nullptr, queryString); + } else if (stmt->removeType == OBJECT_INDEX) { + append_to_ddl_log_if_needed("DROP INDEX", nullptr, queryString); + } else if (stmt->removeType == OBJECT_VIEW) { + append_to_ddl_log_if_needed("DROP VIEW", nullptr, queryString); + } + } + + if (IsA(pstmt->utilityStmt, CreateStmt)) { + CreateStmt* stmt = (CreateStmt*)pstmt->utilityStmt; + if (stmt->accessMethod != nullptr && std::strcmp(stmt->accessMethod, "deeplake") == 0) { + const char* schema_name = stmt->relation->schemaname ? stmt->relation->schemaname : "public"; + std::string object_id = std::string(schema_name) + "." + stmt->relation->relname; + append_to_ddl_log_if_needed("CREATE TABLE", object_id.c_str(), queryString); + } + } + + if (IsA(pstmt->utilityStmt, AlterTableStmt)) { + if (queryString != nullptr && strncasecmp(queryString, "ALTER TABLE", 11) == 0) { + AlterTableStmt* stmt = (AlterTableStmt*)pstmt->utilityStmt; + const char* schema_name = stmt->relation->schemaname ? stmt->relation->schemaname : "public"; + std::string object_id = std::string(schema_name) + "." + stmt->relation->relname; + append_to_ddl_log_if_needed("ALTER TABLE", object_id.c_str(), queryString); + } + } + + if (IsA(pstmt->utilityStmt, IndexStmt)) { + if (queryString && strncasecmp(queryString, "CREATE", 6) == 0 && + strncasecmp(queryString, "CREATE TABLE", 12) != 0) { + IndexStmt* stmt = (IndexStmt*)pstmt->utilityStmt; + const char* schema_name = stmt->relation->schemaname ? stmt->relation->schemaname : "public"; + std::string object_id = std::string(schema_name) + "." + stmt->relation->relname; + append_to_ddl_log_if_needed("CREATE INDEX", object_id.c_str(), queryString); + } + } + + if (IsA(pstmt->utilityStmt, ViewStmt)) { + ViewStmt* stmt = (ViewStmt*)pstmt->utilityStmt; + const char* schema_name = stmt->view->schemaname ? stmt->view->schemaname : "public"; + std::string object_id = std::string(schema_name) + "." + stmt->view->relname; + append_to_ddl_log_if_needed("CREATE VIEW", object_id.c_str(), queryString); + } + + if (IsA(pstmt->utilityStmt, RenameStmt)) { + append_to_ddl_log_if_needed("RENAME", nullptr, queryString); + } + if (IsA(pstmt->utilityStmt, VariableSetStmt)) { VariableSetStmt* vstmt = (VariableSetStmt*)pstmt->utilityStmt; if (vstmt->name != nullptr && pg_strcasecmp(vstmt->name, "search_path") == 0) { @@ -1533,26 +1652,6 @@ static void executor_end(QueryDesc* query_desc) } if (pg::query_info::is_in_executor_context(query_desc)) { - if (query_desc->operation == CMD_INSERT || query_desc->operation == CMD_UPDATE || - query_desc->operation == CMD_DELETE || query_desc->operation == CMD_UTILITY) { - // Use PG_TRY/CATCH to handle errors during flush without cascading aborts - PG_TRY(); - { - if (!pg::table_storage::instance().flush_all()) { - pg::table_storage::instance().rollback_all(); - ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to flush table storage"))); - } - } - PG_CATCH(); - { - // Error occurred during flush - rollback and suppress to prevent cascade - // This prevents "Deeplake does not support transaction aborts" cascade - pg::table_storage::instance().rollback_all(); - // Don't re-throw - let the transaction abort naturally - FlushErrorState(); - } - PG_END_TRY(); - } pg::query_info::pop_context(query_desc); pg::table_storage::instance().reset_requested_columns(); } diff --git a/cpp/deeplake_pg/logger.hpp b/cpp/deeplake_pg/logger.hpp index a47da65094..74a1a14dc7 100644 --- a/cpp/deeplake_pg/logger.hpp +++ b/cpp/deeplake_pg/logger.hpp @@ -37,7 +37,7 @@ class logger_adapter : public base::logger_adapter elog(DEBUG1, "%s", message.c_str()); break; case base::log_level::info: - elog(INFO, "%s", message.c_str()); + elog(LOG, "%s", message.c_str()); break; case base::log_level::warning: elog(WARNING, "%s", message.c_str()); diff --git a/cpp/deeplake_pg/pg_deeplake.cpp b/cpp/deeplake_pg/pg_deeplake.cpp index 93449a6d2c..99e62bc7f0 100644 --- a/cpp/deeplake_pg/pg_deeplake.cpp +++ b/cpp/deeplake_pg/pg_deeplake.cpp @@ -1,6 +1,8 @@ #include "pg_deeplake.hpp" +#include "dl_wal.hpp" #include "logger.hpp" #include "table_storage.hpp" +#include "utils.hpp" #include #include @@ -9,6 +11,9 @@ extern "C" { #endif +#include +#include +#include #include #ifdef __cplusplus @@ -260,6 +265,8 @@ void save_index_metadata(Oid oid) if (SPI_execute(buf.data, false, 0) != SPI_OK_INSERT) { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to save metadata"))); } + + // Cross-instance propagation is driven by DDL WAL logging in ProcessUtility. } void load_index_metadata() @@ -353,6 +360,23 @@ void deeplake_xact_callback(XactEvent event, void *arg) } } +void deeplake_subxact_callback(SubXactEvent event, + SubTransactionId my_subid, + SubTransactionId parent_subid, + void* arg) +{ + switch (event) { + case SUBXACT_EVENT_ABORT_SUB: + pg::table_storage::instance().rollback_subxact(my_subid); + break; + case SUBXACT_EVENT_COMMIT_SUB: + pg::table_storage::instance().commit_subxact(my_subid, parent_subid); + break; + default: + break; + } +} + void init_deeplake() { static bool initialized = false; @@ -377,6 +401,7 @@ void init_deeplake() pg::table_storage::instance(); /// initialize table storage RegisterXactCallback(deeplake_xact_callback, nullptr); + RegisterSubXactCallback(deeplake_subxact_callback, nullptr); } } // namespace pg diff --git a/cpp/deeplake_pg/sync_worker.cpp b/cpp/deeplake_pg/sync_worker.cpp index 051d52fc5f..d3cb31bbde 100644 --- a/cpp/deeplake_pg/sync_worker.cpp +++ b/cpp/deeplake_pg/sync_worker.cpp @@ -28,16 +28,21 @@ extern "C" { #include "sync_worker.hpp" -#include "dl_catalog.hpp" +#include "dl_wal.hpp" #include "table_storage.hpp" #include "utils.hpp" #include +#include #include +#include +#include +#include +#include #include // GUC variables -int deeplake_sync_interval_ms = 2000; // Default 2 seconds +int deeplake_sync_interval_ms = 1000; // Default 1 second // Forward declaration (defined in the anonymous namespace below) namespace { bool execute_via_libpq(const char* dbname, const char* sql); } @@ -125,6 +130,75 @@ void pending_install_queue::drain_and_install() namespace { +std::string wal_checkpoint_file_path() +{ + if (DataDir == nullptr) { + return "/tmp/pg_deeplake_wal_checkpoints.tsv"; + } + return std::string(DataDir) + "/pg_deeplake_wal_checkpoints.tsv"; +} + +std::string checkpoint_key(const std::string& root_path, const std::string& db_name) +{ + return root_path + "\t" + db_name; +} + +void load_wal_checkpoints(std::unordered_map& checkpoints) +{ + checkpoints.clear(); + std::ifstream in(wal_checkpoint_file_path()); + if (!in.is_open()) { + return; + } + + std::string line; + while (std::getline(in, line)) { + if (line.empty()) { + continue; + } + std::istringstream ss(line); + std::string root_path; + std::string db_name; + std::string seq_str; + if (!std::getline(ss, root_path, '\t') || + !std::getline(ss, db_name, '\t') || + !std::getline(ss, seq_str)) { + continue; + } + try { + checkpoints[checkpoint_key(root_path, db_name)] = std::stoll(seq_str); + } catch (...) { + } + } +} + +void persist_wal_checkpoints(const std::unordered_map& checkpoints) +{ + const std::string path = wal_checkpoint_file_path(); + const std::string tmp_path = path + ".tmp"; + + std::ofstream out(tmp_path, std::ios::trunc); + if (!out.is_open()) { + elog(WARNING, "pg_deeplake sync: failed to open WAL checkpoint tmp file: %s", tmp_path.c_str()); + return; + } + + for (const auto& kv : checkpoints) { + const size_t sep = kv.first.find('\t'); + if (sep == std::string::npos) { + continue; + } + out << kv.first.substr(0, sep) << '\t' + << kv.first.substr(sep + 1) << '\t' + << kv.second << '\n'; + } + out.close(); + + if (std::rename(tmp_path.c_str(), path.c_str()) != 0) { + elog(WARNING, "pg_deeplake sync: failed to persist WAL checkpoints to %s", path.c_str()); + } +} + // Worker state - use sig_atomic_t for signal safety volatile sig_atomic_t got_sigterm = false; volatile sig_atomic_t got_sighup = false; @@ -212,7 +286,7 @@ bool execute_via_libpq(const char* dbname, const char* sql) */ void deeplake_sync_databases_from_catalog(const std::string& root_path, icm::string_map<> creds) { - auto catalog_databases = pg::dl_catalog::load_databases(root_path, creds); + auto catalog_databases = pg::dl_wal::load_databases(root_path, creds); for (const auto& db : catalog_databases) { // Skip system databases @@ -304,101 +378,153 @@ void deeplake_sync_databases_from_catalog(const std::string& root_path, icm::str } } -/** - * Sync tables from the deeplake catalog to PostgreSQL. - * - * This function checks the catalog for tables that exist in the deeplake - * catalog but not in PostgreSQL, and creates them. - */ -void deeplake_sync_tables_from_catalog(const std::string& root_path, icm::string_map<> creds) +bool is_harmless_replay_error(const char* sqlstate) { - // Load tables and columns in parallel for better performance - auto [catalog_tables, catalog_columns] = pg::dl_catalog::load_tables_and_columns(root_path, creds); + if (sqlstate == nullptr) { + return false; + } + return strcmp(sqlstate, "42P07") == 0 || // duplicate_table + strcmp(sqlstate, "42P06") == 0 || // duplicate_schema + strcmp(sqlstate, "42701") == 0 || // duplicate_column + strcmp(sqlstate, "42710") == 0 || // duplicate_object + strcmp(sqlstate, "42704") == 0 || // undefined_object + strcmp(sqlstate, "42P01") == 0 || // undefined_table + strcmp(sqlstate, "3F000") == 0; // invalid_schema_name +} - for (const auto& meta : catalog_tables) { - // Skip tables marked as dropping - if (meta.state == "dropping") { +void deeplake_replay_ddl_log_for_db(const std::string& db_name, const std::string& root_path, + icm::string_map<> creds, int64_t& last_seq) +{ + auto entries = pg::dl_wal::load_ddl_log(root_path, db_name, creds, last_seq); + for (const auto& entry : entries) { + if (entry.seq > last_seq) { + last_seq = entry.seq; + } + if (entry.origin_instance_id == pg::dl_wal::local_instance_id()) { continue; } - const std::string qualified_name = meta.schema_name + "." + meta.table_name; + StringInfoData sql; + initStringInfo(&sql); + appendStringInfo(&sql, "SET application_name = 'pg_deeplake_sync'; "); + if (!entry.search_path.empty()) { + appendStringInfo(&sql, + "SELECT pg_catalog.set_config('search_path', %s, false); ", + quote_literal_cstr(entry.search_path.c_str())); + } + appendStringInfoString(&sql, entry.ddl_sql.c_str()); - // Check if table exists in PostgreSQL - auto* rel = makeRangeVar(pstrdup(meta.schema_name.c_str()), pstrdup(meta.table_name.c_str()), -1); - Oid relid = RangeVarGetRelid(rel, NoLock, true); + const char* port = GetConfigOption("port", true, false); + const char* socket_dir = GetConfigOption("unix_socket_directories", true, false); - if (!OidIsValid(relid)) { - // Gather columns for this table, sorted by position - std::vector table_columns; - for (const auto& col : catalog_columns) { - if (col.table_id == meta.table_id) { - table_columns.push_back(col); - } + StringInfoData conninfo; + initStringInfo(&conninfo); + appendStringInfo(&conninfo, "dbname=%s", db_name.c_str()); + if (port != nullptr) { + appendStringInfo(&conninfo, " port=%s", port); + } + if (socket_dir != nullptr) { + char* dir_copy = pstrdup(socket_dir); + char* comma = strchr(dir_copy, ','); + if (comma != nullptr) { + *comma = '\0'; + } + char* dir = dir_copy; + while (*dir == ' ') { + dir++; } - std::sort(table_columns.begin(), table_columns.end(), - [](const auto& a, const auto& b) { return a.position < b.position; }); + appendStringInfo(&conninfo, " host=%s", dir); + pfree(dir_copy); + } + + PGconn* conn = PQconnectdb(conninfo.data); + pfree(conninfo.data); + if (PQstatus(conn) != CONNECTION_OK) { + elog(WARNING, "pg_deeplake sync: libpq connect failed for '%s': %s", db_name.c_str(), PQerrorMessage(conn)); + PQfinish(conn); + pfree(sql.data); + continue; + } - if (table_columns.empty()) { - elog(DEBUG1, "pg_deeplake sync: no columns found for table %s, skipping", qualified_name.c_str()); - continue; + PGresult* res = PQexec(conn, sql.data); + const ExecStatusType status = PQresultStatus(res); + bool ok = status == PGRES_COMMAND_OK || status == PGRES_TUPLES_OK; + if (!ok) { + const char* sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); + if (!is_harmless_replay_error(sqlstate)) { + elog(WARNING, + "pg_deeplake sync: DDL WAL replay failed in '%s' [%s]: %s (SQL: %.200s)", + db_name.c_str(), + entry.command_tag.c_str(), + PQerrorMessage(conn), + entry.ddl_sql.c_str()); + } else { + ok = true; } + } + if (ok) { + elog(LOG, "pg_deeplake sync: replayed %s in '%s'", entry.command_tag.c_str(), db_name.c_str()); + } + PQclear(res); + PQfinish(conn); + pfree(sql.data); + } +} - const char* qschema = quote_identifier(meta.schema_name.c_str()); - const char* qtable = quote_identifier(meta.table_name.c_str()); +/** + * Sync all databases: check per-db versions in parallel, replay new DDL WAL entries. + * + * Called OUTSIDE transaction context. + */ +void sync_all_databases( + const std::string& root_path, + icm::string_map<> creds, + std::unordered_map& last_db_seqs) +{ + // Step 1: Sync databases (create missing ones, install extension) + deeplake_sync_databases_from_catalog(root_path, creds); - // Build CREATE TABLE IF NOT EXISTS statement - StringInfoData buf; - initStringInfo(&buf); - appendStringInfo(&buf, "CREATE TABLE IF NOT EXISTS %s.%s (", qschema, qtable); + // Step 2: Get list of all databases from the shared catalog + auto databases = pg::dl_wal::load_databases(root_path, creds); - bool first = true; - for (const auto& col : table_columns) { - if (!first) { - appendStringInfoString(&buf, ", "); - } - first = false; - appendStringInfo(&buf, "%s %s", quote_identifier(col.column_name.c_str()), col.pg_type.c_str()); - } - appendStringInfo(&buf, ") USING deeplake"); - - // Wrap in subtransaction so that if another backend concurrently - // creates the same table (race on composite type), the error is - // caught and we continue instead of aborting the sync cycle. - MemoryContext saved_context = CurrentMemoryContext; - ResourceOwner saved_owner = CurrentResourceOwner; - - BeginInternalSubTransaction(NULL); - PG_TRY(); - { - pg::utils::spi_connector connector; - - // Create schema if needed - StringInfoData schema_buf; - initStringInfo(&schema_buf); - appendStringInfo(&schema_buf, "CREATE SCHEMA IF NOT EXISTS %s", qschema); - SPI_execute(schema_buf.data, false, 0); - pfree(schema_buf.data); - - if (SPI_execute(buf.data, false, 0) == SPI_OK_UTILITY) { - elog(LOG, "pg_deeplake sync: successfully created table %s", qualified_name.c_str()); - } + // Always include "postgres" which may not be in the databases catalog + bool has_postgres = false; + for (const auto& db : databases) { + if (db.db_name == "postgres") { has_postgres = true; break; } + } + if (!has_postgres) { + pg::dl_wal::database_meta pg_meta; + pg_meta.db_name = "postgres"; + pg_meta.state = "ready"; + databases.push_back(std::move(pg_meta)); + } - ReleaseCurrentSubTransaction(); - } - PG_CATCH(); - { - // Another backend created this table concurrently — not an error. - MemoryContextSwitchTo(saved_context); - CurrentResourceOwner = saved_owner; - RollbackAndReleaseCurrentSubTransaction(); - FlushErrorState(); - elog(DEBUG1, "pg_deeplake sync: concurrent creation of %s, skipping", qualified_name.c_str()); + // Step 3: For each database, replay DDL WAL entries + // (cheap if no new entries due to after_seq filtering) + bool checkpoints_updated = false; + for (const auto& db : databases) { + if (db.db_name == "template0" || db.db_name == "template1") { + continue; + } + try { + const std::string key = checkpoint_key(root_path, db.db_name); + int64_t& last_seq = last_db_seqs[key]; + const int64_t prev_seq = last_seq; + deeplake_replay_ddl_log_for_db(db.db_name, root_path, creds, last_seq); + if (last_seq != prev_seq) { + checkpoints_updated = true; } - PG_END_TRY(); - - pfree(buf.data); + elog(LOG, "pg_deeplake sync: replayed DDL WAL for '%s' (last_seq=%ld)", db.db_name.c_str(), last_seq); + } catch (const std::exception& e) { + elog(WARNING, "pg_deeplake sync: failed to sync database '%s': %s", db.db_name.c_str(), e.what()); + } catch (...) { + elog(WARNING, "pg_deeplake sync: failed to sync database '%s': unknown error", db.db_name.c_str()); } } + + if (checkpoints_updated) { + persist_wal_checkpoints(last_db_seqs); + } } } // anonymous namespace @@ -421,6 +547,8 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg) int64_t last_catalog_version = 0; std::string last_root_path; // Track root_path to detect changes + std::unordered_map last_db_seqs; + load_wal_checkpoints(last_db_seqs); while (!got_sigterm) { @@ -433,6 +561,10 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg) ProcessConfigFile(PGC_SIGHUP); } + // Always drain pending extension installs first so CREATE DATABASE + // async installs are not starved behind expensive sync work. + pg::pending_install_queue::drain_and_install(); + // Variables to carry state across transaction boundaries // (declared before goto target to avoid crossing initialization) std::string sync_root_path; @@ -467,15 +599,16 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg) if (!last_root_path.empty()) { pg::table_storage::instance().reset_and_load_table_metadata(); last_catalog_version = 0; + last_db_seqs.clear(); } last_root_path = root_path; } - // Use existing catalog version API to check for changes (now fast with cache) - int64_t current_version = pg::dl_catalog::get_catalog_version(root_path, creds); + // Fast global version check via databases catalog_table + int64_t current_version = pg::dl_wal::get_databases_version(root_path, creds); if (current_version != last_catalog_version) { - // Save state for database sync (which happens outside transaction) + // Save state for sync (which happens outside transaction) sync_root_path = root_path; sync_creds = creds; need_sync = true; @@ -494,43 +627,20 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg) PopActiveSnapshot(); CommitTransactionCommand(); - // Sync databases via libpq OUTSIDE transaction context - // (CREATE DATABASE cannot run inside a transaction block) + // All sync happens OUTSIDE transaction context via libpq if (need_sync && !sync_root_path.empty()) { try { - deeplake_sync_databases_from_catalog(sync_root_path, sync_creds); + sync_all_databases(sync_root_path, sync_creds, last_db_seqs); + elog(DEBUG1, "pg_deeplake sync: completed (global version %ld)", last_catalog_version); } catch (const std::exception& e) { - elog(WARNING, "pg_deeplake sync: database sync failed: %s", e.what()); + elog(WARNING, "pg_deeplake sync: sync failed: %s", e.what()); } catch (...) { - elog(WARNING, "pg_deeplake sync: database sync failed: unknown error"); - } - - // Re-enter transaction for table sync - SetCurrentStatementStartTimestamp(); - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); - - PG_TRY(); - { - deeplake_sync_tables_from_catalog(sync_root_path, sync_creds); - elog(DEBUG1, "pg_deeplake sync: synced (catalog version %ld)", last_catalog_version); - } - PG_CATCH(); - { - EmitErrorReport(); - FlushErrorState(); + elog(WARNING, "pg_deeplake sync: sync failed: unknown error"); } - PG_END_TRY(); - - PopActiveSnapshot(); - CommitTransactionCommand(); } pgstat_report_stat(true); - // Drain any databases queued for async extension install - pg::pending_install_queue::drain_and_install(); - wait_for_latch: // Wait for latch or timeout (void)WaitLatch(MyLatch, diff --git a/cpp/deeplake_pg/table_data.hpp b/cpp/deeplake_pg/table_data.hpp index 2bd45b63f3..a3bbd9115e 100644 --- a/cpp/deeplake_pg/table_data.hpp +++ b/cpp/deeplake_pg/table_data.hpp @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -35,6 +36,14 @@ using string_stream_array_holder = nd::string_stream_array_holder; struct table_data { + struct tx_snapshot + { + icm::string_map insert_rows_sizes; + size_t delete_rows_size = 0; + size_t update_rows_size = 0; + int64_t num_total_rows = 0; + }; + inline table_data(Oid table_oid, const std::string& table_name, TupleDesc tupdesc, @@ -79,6 +88,9 @@ struct table_data inline void clear_delete_rows() noexcept; inline void add_update_row(int64_t row_id, icm::string_map update_row); inline void clear_update_rows() noexcept; + inline tx_snapshot capture_tx_snapshot() const; + inline void restore_tx_snapshot(const tx_snapshot& snapshot); + inline void rollback(); inline Oid get_table_oid() const noexcept; inline bool flush(); diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp index a88bb6e0e8..4575ef2c0f 100644 --- a/cpp/deeplake_pg/table_data_impl.hpp +++ b/cpp/deeplake_pg/table_data_impl.hpp @@ -84,12 +84,19 @@ inline table_data::table_data( inline void table_data::commit(bool show_progress) { - if (dataset_ == nullptr || !dataset_->has_uncommitted_changes()) { + if (dataset_ == nullptr) { + return; + } + const bool has_pending_ops = !insert_rows_.empty() || !delete_rows_.empty() || + !update_rows_.empty() || !insert_promises_.empty(); + if (!has_pending_ops && !dataset_->has_uncommitted_changes()) { return; } try { flush(); - impl::commit_dataset(get_dataset(), show_progress); + if (dataset_->has_uncommitted_changes()) { + impl::commit_dataset(get_dataset(), show_progress); + } } catch (const std::exception& e) { reset_insert_rows(); clear_delete_rows(); @@ -105,6 +112,66 @@ inline void table_data::commit(bool show_progress) force_refresh(); } +inline void table_data::rollback() +{ + // Clear any local staged operations first. + reset_insert_rows(); + clear_delete_rows(); + clear_update_rows(); + streamers_.reset(); + + // Reopen dataset handles to drop any in-memory uncommitted state associated + // with the current object instance. + if (dataset_) { + dataset_.reset(); + refreshing_dataset_.reset(); + open_dataset(false); + } + + // Align cached row count with the current committed dataset state. + if (dataset_) { + num_total_rows_ = dataset_->num_rows(); + } +} + +inline table_data::tx_snapshot table_data::capture_tx_snapshot() const +{ + tx_snapshot snapshot; + snapshot.delete_rows_size = delete_rows_.size(); + snapshot.update_rows_size = update_rows_.size(); + snapshot.num_total_rows = num_total_rows_; + + for (const auto& [column_name, values] : insert_rows_) { + snapshot.insert_rows_sizes[column_name] = values.size(); + } + return snapshot; +} + +inline void table_data::restore_tx_snapshot(const tx_snapshot& snapshot) +{ + // Remove or truncate staged inserts to the savepoint snapshot. + for (auto it = insert_rows_.begin(); it != insert_rows_.end();) { + auto snapshot_it = snapshot.insert_rows_sizes.find(it->first); + if (snapshot_it == snapshot.insert_rows_sizes.end()) { + it = insert_rows_.erase(it); + continue; + } + if (it->second.size() > snapshot_it->second) { + it->second.resize(snapshot_it->second); + } + ++it; + } + + if (delete_rows_.size() > snapshot.delete_rows_size) { + delete_rows_.resize(snapshot.delete_rows_size); + } + if (update_rows_.size() > snapshot.update_rows_size) { + update_rows_.resize(snapshot.update_rows_size); + } + num_total_rows_ = snapshot.num_total_rows; + streamers_.reset(); +} + inline void table_data::open_dataset(bool create) { elog(DEBUG1, "Opening dataset at path: %s (create=%s)", dataset_path_.url().c_str(), create ? "true" : "false"); @@ -116,6 +183,9 @@ inline void table_data::open_dataset(bool create) dataset_ = deeplake_api::open(dataset_path_, std::move(creds)).get_future().get(); } ASSERT(dataset_ != nullptr); + // PostgreSQL transaction boundaries must control durability. Disable + // DeepLake auto-commit so writes are only persisted on explicit commit(). + dataset_->set_auto_commit_enabled(false).get_future().get(); num_total_rows_ = dataset_->num_rows(); // Enable logging if GUC parameter is set @@ -347,7 +417,7 @@ inline void table_data::add_insert_slots(int32_t nslots, TupleTableSlot** slots) } num_total_rows_ += nslots; const auto num_inserts = insert_rows_.begin()->second.size(); - if (num_inserts >= 512) { + if (num_inserts >= 512 && GetCurrentTransactionNestLevel() <= 1) { flush_inserts(); } } diff --git a/cpp/deeplake_pg/table_storage.cpp b/cpp/deeplake_pg/table_storage.cpp index b4f67ad7b8..7aeaf49450 100644 --- a/cpp/deeplake_pg/table_storage.cpp +++ b/cpp/deeplake_pg/table_storage.cpp @@ -12,6 +12,7 @@ extern "C" { #include #include #include +#include #include #include #include @@ -31,11 +32,12 @@ extern "C" { #include "table_storage.hpp" -#include "dl_catalog.hpp" +#include "dl_wal.hpp" #include "exceptions.hpp" #include "logger.hpp" #include "memory_tracker.hpp" #include "nd_utils.hpp" +#include "pg_version_compat.h" #include "table_ddl_lock.hpp" #include "table_scan.hpp" #include "utils.hpp" @@ -130,6 +132,15 @@ void convert_pg_to_nd(const pg::table_data& table_data, } } +std::string get_current_database_name() +{ + const char* dbname = get_database_name(MyDatabaseId); + if (!dbname) return "postgres"; + std::string result(dbname); + pfree(const_cast(dbname)); + return result; +} + } // unnamed namespace namespace pg { @@ -163,6 +174,11 @@ icm::string_map<> session_credentials::get_credentials() std::string session_credentials::get_root_path() { + // Environment variable takes priority over per-session GUC + auto root = base::getenv("DEEPLAKE_ROOT_PATH", ""); + if (!root.empty()) { + return root; + } if (root_path_guc_string != nullptr && std::strlen(root_path_guc_string) > 0) { return std::string(root_path_guc_string); } @@ -225,55 +241,7 @@ void table_storage::save_table_metadata(const pg::table_data& table_data) return true; }); - // Also write into Deep Lake catalog for stateless multi-instance support. - // Skip when in catalog-only mode — the data was synced FROM the S3 catalog, - // so writing back would be redundant and add unnecessary S3 latency. - if (pg::stateless_enabled && !is_catalog_only_create()) { - const auto root_dir = []() { - auto root = session_credentials::get_root_path(); - if (root.empty()) { - root = pg::utils::get_deeplake_root_directory(); - } - return root; - }(); - if (root_dir.empty()) { - return; - } - auto creds = session_credentials::get_credentials(); - pg::dl_catalog::ensure_catalog(root_dir, creds); - - auto [schema_name, simple_table_name] = split_table_name(table_name); - const std::string table_id = schema_name + "." + simple_table_name; - - pg::dl_catalog::table_meta meta; - meta.table_id = table_id; - meta.schema_name = schema_name; - meta.table_name = simple_table_name; - meta.dataset_path = ds_path; - meta.state = "ready"; - pg::dl_catalog::upsert_table(root_dir, creds, meta); - - // Save column metadata to catalog - TupleDesc tupdesc = table_data.get_tuple_descriptor(); - std::vector columns; - for (int i = 0; i < tupdesc->natts; i++) { - Form_pg_attribute attr = TupleDescAttr(tupdesc, i); - if (attr->attisdropped) { - continue; - } - pg::dl_catalog::column_meta col; - col.table_id = table_id; - col.column_name = NameStr(attr->attname); - col.pg_type = format_type_with_typemod(attr->atttypid, attr->atttypmod); - col.nullable = !attr->attnotnull; - col.position = i; - columns.push_back(std::move(col)); - } - pg::dl_catalog::upsert_columns(root_dir, creds, columns); - - pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials()); - catalog_version_ = pg::dl_catalog::get_catalog_version(root_dir, session_credentials::get_credentials()); - } + // Stateless sync uses DDL WAL replay from __wal_table. } void table_storage::load_table_metadata() @@ -295,156 +263,74 @@ void table_storage::load_table_metadata() }(); auto creds = session_credentials::get_credentials(); - // Stateless catalog sync (only when enabled and root_dir is configured) + // Stateless sync via DDL WAL replay (only when enabled and root_dir is configured) if (pg::stateless_enabled && !root_dir.empty()) { - // Fast path: if already loaded, just check version without ensure_catalog - if (tables_loaded_) { - const auto current_version = pg::dl_catalog::get_catalog_version(root_dir, creds); - if (current_version == catalog_version_) { - return; + const auto db_name = get_current_database_name(); + + // Ensure both shared and per-database catalogs exist + pg::dl_wal::ensure_catalog(root_dir, creds); + pg::dl_wal::ensure_db_catalog(root_dir, db_name, creds); + + auto is_sync_replay_backend = []() { + const char* app_name = GetConfigOption("application_name", true, false); + return app_name != nullptr && strcmp(app_name, "pg_deeplake_sync") == 0; + }; + if (!in_ddl_context() && !AmBackgroundWorkerProcess() && !is_sync_replay_backend()) { + auto entries = pg::dl_wal::load_ddl_log(root_dir, db_name, creds, ddl_log_last_seq_); + if (!entries.empty()) { + elog(LOG, "pg_deeplake: DDL WAL replay: %zu entries to process for db '%s' (after seq %ld)", + entries.size(), db_name.c_str(), ddl_log_last_seq_); } - // Version changed, need to reload - tables_.clear(); - views_.clear(); - tables_loaded_ = false; - catalog_version_ = current_version; - } - - // Ensure catalog exists and get version in one call - const auto version = pg::dl_catalog::ensure_catalog(root_dir, creds); - if (catalog_version_ == 0) { - catalog_version_ = version; - } - tables_loaded_ = true; - - // Load tables and columns in parallel - auto [catalog_tables, catalog_columns] = pg::dl_catalog::load_tables_and_columns(root_dir, creds); - - if (!catalog_tables.empty()) { - for (const auto& meta : catalog_tables) { - const std::string qualified_name = meta.schema_name + "." + meta.table_name; - auto* rel = makeRangeVar(pstrdup(meta.schema_name.c_str()), pstrdup(meta.table_name.c_str()), -1); - Oid relid = RangeVarGetRelid(rel, NoLock, true); - if (!OidIsValid(relid)) { - // Table exists in catalog but not in PostgreSQL. - if (in_ddl_context()) { - // During DDL (CREATE TABLE), skip auto-creation to avoid races. - // The table might be in the middle of being created by another backend. - continue; - } - - // Gather columns for this table, sorted by position - std::vector table_columns; - for (const auto& col : catalog_columns) { - if (col.table_id == meta.table_id) { - table_columns.push_back(col); - } - } - std::sort(table_columns.begin(), table_columns.end(), - [](const auto& a, const auto& b) { return a.position < b.position; }); - - if (table_columns.empty()) { - elog(WARNING, "No columns found for catalog table %s, skipping", qualified_name.c_str()); - continue; - } - - // Build CREATE TABLE IF NOT EXISTS from catalog metadata. - // Wrap in a subtransaction so that if another backend concurrently - // creates the same table (race on composite type), the error is - // caught and we continue instead of aborting the session. - const char* qschema = quote_identifier(meta.schema_name.c_str()); - const char* qtable = quote_identifier(meta.table_name.c_str()); - - StringInfoData buf; - initStringInfo(&buf); - appendStringInfo(&buf, "CREATE TABLE IF NOT EXISTS %s.%s (", qschema, qtable); - - bool first = true; - for (const auto& col : table_columns) { - if (!first) { - appendStringInfoString(&buf, ", "); - } - first = false; - appendStringInfo(&buf, "%s %s", quote_identifier(col.column_name.c_str()), col.pg_type.c_str()); - } - appendStringInfo(&buf, ") USING deeplake"); - - MemoryContext saved_context = CurrentMemoryContext; - ResourceOwner saved_owner = CurrentResourceOwner; - - BeginInternalSubTransaction(NULL); - PG_TRY(); - { - catalog_only_guard co_guard; - pg::utils::spi_connector connector; - bool pushed_snapshot = false; - if (!ActiveSnapshotSet()) { - PushActiveSnapshot(GetTransactionSnapshot()); - pushed_snapshot = true; - } - - // Create schema if needed - StringInfoData schema_buf; - initStringInfo(&schema_buf); - appendStringInfo(&schema_buf, "CREATE SCHEMA IF NOT EXISTS %s", qschema); - SPI_execute(schema_buf.data, false, 0); - pfree(schema_buf.data); - - SPI_execute(buf.data, false, 0); - - if (pushed_snapshot) { - PopActiveSnapshot(); - } - - ReleaseCurrentSubTransaction(); - } - PG_CATCH(); - { - // Another backend created this table concurrently — not an error. - MemoryContextSwitchTo(saved_context); - CurrentResourceOwner = saved_owner; - RollbackAndReleaseCurrentSubTransaction(); - FlushErrorState(); - elog(DEBUG1, "Concurrent table creation for %s, skipping", qualified_name.c_str()); - } - PG_END_TRY(); - - pfree(buf.data); - - relid = RangeVarGetRelid(rel, NoLock, true); + for (const auto& entry : entries) { + if (entry.seq > ddl_log_last_seq_) { + ddl_log_last_seq_ = entry.seq; } - if (!OidIsValid(relid)) { - elog(WARNING, "Catalog table %s does not exist in PG instance", qualified_name.c_str()); + if (entry.origin_instance_id == pg::dl_wal::local_instance_id()) { continue; } - Relation relation = try_relation_open(relid, NoLock); - if (relation == nullptr) { - elog(WARNING, "Could not open relation for table %s", qualified_name.c_str()); - continue; + + MemoryContext saved_context = CurrentMemoryContext; + ResourceOwner saved_owner = CurrentResourceOwner; + BeginInternalSubTransaction(nullptr); + PG_TRY(); + { + set_catalog_only_create(true); + pg::utils::spi_connector connector; + bool pushed_snapshot = false; + if (!ActiveSnapshotSet()) { + PushActiveSnapshot(GetTransactionSnapshot()); + pushed_snapshot = true; + } + SPI_execute(entry.ddl_sql.c_str(), false, 0); + if (pushed_snapshot) { + PopActiveSnapshot(); + } + set_catalog_only_create(false); + ReleaseCurrentSubTransaction(); } + PG_CATCH(); { - pg::utils::memory_context_switcher context_switcher(TopMemoryContext); - table_data td( - relid, qualified_name, CreateTupleDescCopy(RelationGetDescr(relation)), meta.dataset_path, creds); - auto it2status = tables_.emplace(relid, std::move(td)); - up_to_date_ = false; - ASSERT(it2status.second); + set_catalog_only_create(false); + MemoryContextSwitchTo(saved_context); + CurrentResourceOwner = saved_owner; + RollbackAndReleaseCurrentSubTransaction(); + FlushErrorState(); + elog(WARNING, "pg_deeplake: DDL WAL replay failed (seq=%ld, tag=%s): %.200s", + entry.seq, entry.command_tag.c_str(), entry.ddl_sql.c_str()); } - relation_close(relation, NoLock); + PG_END_TRY(); } - - load_schema_name(); - return; } } - // Non-stateless path: load from local pg_deeplake_tables + // Load from local pg_deeplake_tables if (tables_loaded_) { return; } tables_loaded_ = true; if (!pg::utils::check_table_exists("pg_deeplake_tables")) { + elog(LOG, "pg_deeplake: pg_deeplake_tables does not exist, skipping local scan"); return; } @@ -519,8 +405,6 @@ void table_storage::load_table_metadata() creds = session_credentials::get_credentials(); std::vector invalid_table_oids; - bool catalog_seeded = false; - for (auto i = 0; i < proc; ++i) { HeapTuple tuple = tuptable->vals[i]; bool is_null = false; @@ -536,19 +420,6 @@ void table_storage::load_table_metadata() continue; } try { - // Seed the DL catalog with legacy metadata (only when stateless is enabled). - if (pg::stateless_enabled && !root_dir.empty()) { - auto [schema_name, simple_table_name] = split_table_name(table_name); - pg::dl_catalog::table_meta meta; - meta.table_id = schema_name + "." + simple_table_name; - meta.schema_name = schema_name; - meta.table_name = simple_table_name; - meta.dataset_path = ds_path; - meta.state = "ready"; - pg::dl_catalog::upsert_table(root_dir, creds, meta); - catalog_seeded = true; - } - // Get the relation and its tuple descriptor Relation rel = try_relation_open(relid, NoLock); if (rel == nullptr) { @@ -583,10 +454,6 @@ void table_storage::load_table_metadata() base::log_channel::generic, "Failed to delete invalid table metadata for table_oid: {}", invalid_oid); } } - if (catalog_seeded && pg::stateless_enabled && !root_dir.empty()) { - pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials()); - catalog_version_ = pg::dl_catalog::get_catalog_version(root_dir, session_credentials::get_credentials()); - } load_views(); load_schema_name(); } @@ -702,8 +569,11 @@ void table_storage::create_table(const std::string& table_name, Oid table_id, Tu if (session_root.empty()) { session_root = pg::utils::get_deeplake_root_directory(); } - // Construct path: root_dir/schema_name/table_name - dataset_path = session_root + "/" + schema_name + "/" + simple_table_name; + // Construct path: root_dir/db_name/schema_name/table_name + // root_path is the global root; include the database name so each + // database's datasets are stored under their own prefix. + const auto db_name = get_current_database_name(); + dataset_path = session_root + "/" + db_name + "/" + schema_name + "/" + simple_table_name; } // Get credentials from current session @@ -985,29 +855,7 @@ void table_storage::drop_table(const std::string& table_name) auto& table_data = get_table_data(table_name); auto creds = session_credentials::get_credentials(); - // Update stateless catalog if enabled - if (pg::stateless_enabled) { - const auto root_dir = []() { - auto root = session_credentials::get_root_path(); - if (root.empty()) { - root = pg::utils::get_deeplake_root_directory(); - } - return root; - }(); - if (!root_dir.empty()) { - pg::dl_catalog::ensure_catalog(root_dir, creds); - auto [schema_name, simple_table_name] = split_table_name(table_name); - pg::dl_catalog::table_meta meta; - meta.table_id = schema_name + "." + simple_table_name; - meta.schema_name = schema_name; - meta.table_name = simple_table_name; - meta.dataset_path = table_data.get_dataset_path().url(); - meta.state = "dropping"; - pg::dl_catalog::upsert_table(root_dir, creds, meta); - pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials()); - catalog_version_ = pg::dl_catalog::get_catalog_version(root_dir, session_credentials::get_credentials()); - } - } + // Stateless propagation is handled by DDL WAL logging in ProcessUtility hook. try { table_data.commit(); // Ensure all changes are committed before deletion @@ -1032,14 +880,64 @@ void table_storage::insert_slot(Oid table_id, TupleTableSlot* slot) insert_slots(table_id, 1, &slot); } +void table_storage::mark_subxact_change(Oid table_id) +{ + if (GetCurrentTransactionNestLevel() <= 1) { + return; + } + + auto& table_data = get_table_data(table_id); + const SubTransactionId sub_id = GetCurrentSubTransactionId(); + auto& sub_snapshots = subxact_snapshots_[sub_id]; + if (!sub_snapshots.contains(table_id)) { + sub_snapshots.emplace(table_id, table_data.capture_tx_snapshot()); + } +} + +void table_storage::rollback_subxact(SubTransactionId sub_id) +{ + auto it = subxact_snapshots_.find(sub_id); + if (it == subxact_snapshots_.end()) { + return; + } + + for (const auto& [table_id, snapshot] : it->second) { + auto table_it = tables_.find(table_id); + if (table_it != tables_.end()) { + table_it->second.restore_tx_snapshot(snapshot); + } + } + subxact_snapshots_.erase(it); +} + +void table_storage::commit_subxact(SubTransactionId sub_id, SubTransactionId parent_sub_id) +{ + auto it = subxact_snapshots_.find(sub_id); + if (it == subxact_snapshots_.end()) { + return; + } + + if (parent_sub_id != InvalidSubTransactionId) { + auto& parent_snapshots = subxact_snapshots_[parent_sub_id]; + for (const auto& [table_id, snapshot] : it->second) { + if (!parent_snapshots.contains(table_id)) { + parent_snapshots.emplace(table_id, snapshot); + } + } + } + subxact_snapshots_.erase(it); +} + void table_storage::insert_slots(Oid table_id, int32_t nslots, TupleTableSlot** slots) { + mark_subxact_change(table_id); auto& table_data = get_table_data(table_id); table_data.add_insert_slots(nslots, slots); } bool table_storage::delete_tuple(Oid table_id, ItemPointer tid) { + mark_subxact_change(table_id); auto& table_data = get_table_data(table_id); try { const auto row_number = utils::tid_to_row_number(tid); @@ -1062,6 +960,7 @@ bool table_storage::delete_tuple(Oid table_id, ItemPointer tid) bool table_storage::update_tuple(Oid table_id, ItemPointer tid, HeapTuple new_tuple) { + mark_subxact_change(table_id); auto& table_data = get_table_data(table_id); TupleDesc tupdesc = table_data.get_tuple_descriptor(); diff --git a/cpp/deeplake_pg/table_storage.hpp b/cpp/deeplake_pg/table_storage.hpp index c15588cfe7..3e6383d4dc 100644 --- a/cpp/deeplake_pg/table_storage.hpp +++ b/cpp/deeplake_pg/table_storage.hpp @@ -129,7 +129,10 @@ class table_storage inline void refresh_table(Oid table_id) { - tables_.at(table_id).refresh(); + auto it = tables_.find(table_id); + if (it != tables_.end()) { + it->second.refresh(); + } } // Data operations @@ -138,6 +141,9 @@ class table_storage bool delete_tuple(Oid table_id, ItemPointer tid); bool update_tuple(Oid table_id, ItemPointer tid, HeapTuple new_tuple); bool fetch_tuple(Oid table_id, ItemPointer tid, TupleTableSlot* slot); + void mark_subxact_change(Oid table_id); + void rollback_subxact(SubTransactionId sub_id); + void commit_subxact(SubTransactionId sub_id, SubTransactionId parent_sub_id); bool flush_all() { @@ -156,15 +162,16 @@ class table_storage for (auto& [_, table_data] : tables_) { table_data.commit(); } + subxact_snapshots_.clear(); } void rollback_all() { // Rollback all changes in all tables for (auto& [_, table_data] : tables_) { - table_data.reset_insert_rows(); - table_data.clear_delete_rows(); + table_data.rollback(); } + subxact_snapshots_.clear(); } inline auto& get_tables() noexcept @@ -195,7 +202,7 @@ class table_storage tables_.clear(); views_.clear(); tables_loaded_ = false; - catalog_version_ = 0; + ddl_log_last_seq_ = 0; load_table_metadata(); } void mark_metadata_stale() noexcept @@ -298,6 +305,11 @@ class table_storage return catalog_only_create_; } + static void set_catalog_only_create(bool value) noexcept + { + catalog_only_create_ = value; + } + private: static inline thread_local bool in_ddl_context_ = false; static inline thread_local bool catalog_only_create_ = false; @@ -307,11 +319,12 @@ class table_storage void erase_table_metadata(const std::string& table_name); std::unordered_map tables_; + std::unordered_map> subxact_snapshots_; std::unordered_map> views_; std::string schema_name_ = "public"; bool tables_loaded_ = false; bool up_to_date_ = true; - int64_t catalog_version_ = 0; + int64_t ddl_log_last_seq_ = 0; }; } // namespace pg diff --git a/postgres/pg_deeplake--1.0.sql b/postgres/pg_deeplake--1.0.sql index 907e9bd9bd..e77414a27b 100644 --- a/postgres/pg_deeplake--1.0.sql +++ b/postgres/pg_deeplake--1.0.sql @@ -13,6 +13,10 @@ COMMENT ON DOMAIN VIDEO IS 'Binary video data stored as BYTEA'; CREATE DOMAIN FILE AS BYTEA; COMMENT ON DOMAIN FILE IS 'Binary file data stored as BYTEA'; +-- FILE_ID domain: UUID alias for file identifiers +CREATE DOMAIN FILE_ID AS UUID; +COMMENT ON DOMAIN FILE_ID IS 'UUID identifier for files'; + CREATE FUNCTION handle_index_creation() RETURNS event_trigger AS 'pg_deeplake' LANGUAGE C VOLATILE; -- Create the event trigger to listen for CREATE INDEX events diff --git a/postgres/tests/py_tests/conftest.py b/postgres/tests/py_tests/conftest.py index c1037e0934..aa0061c7bf 100644 --- a/postgres/tests/py_tests/conftest.py +++ b/postgres/tests/py_tests/conftest.py @@ -236,9 +236,9 @@ async def db_conn(pg_server) -> AsyncGenerator[asyncpg.Connection, None]: ) try: - # Setup: Clean extension state - await conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await conn.execute("CREATE EXTENSION pg_deeplake") + # Setup: ensure extension is available without potentially blocking DROP. + # DROP EXTENSION can wait on concurrent backend locks and stall tests. + await conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") # Load utility functions from utils.psql script_path = Path(__file__).parent # py_tests/ diff --git a/postgres/tests/py_tests/test_concurrent_insert_index.py b/postgres/tests/py_tests/test_concurrent_insert_index.py index 9252766658..4c55b716a6 100644 --- a/postgres/tests/py_tests/test_concurrent_insert_index.py +++ b/postgres/tests/py_tests/test_concurrent_insert_index.py @@ -217,7 +217,6 @@ async def test_concurrent_insert_and_index_creation(db_conn: asyncpg.Connection) try: # Initialization: Create domains and table print("Setting up domains and table...") - await db_conn.execute("CREATE DOMAIN file_id AS UUID") await db_conn.execute("CREATE DOMAIN chunk_id AS TEXT") await db_conn.execute(""" @@ -381,7 +380,6 @@ async def do_cleanup(): try: await db_conn.execute("DROP TABLE IF EXISTS chunk_text_image CASCADE") await db_conn.execute("DROP DOMAIN IF EXISTS chunk_id CASCADE") - await db_conn.execute("DROP DOMAIN IF EXISTS file_id CASCADE") print("✓ Cleanup complete") except Exception as e: print(f"⚠ Cleanup failed: {e}") diff --git a/postgres/tests/py_tests/test_create_database.py b/postgres/tests/py_tests/test_create_database.py index 9c0c20f6a6..78f767a74e 100644 --- a/postgres/tests/py_tests/test_create_database.py +++ b/postgres/tests/py_tests/test_create_database.py @@ -231,7 +231,7 @@ async def test_create_multiple_databases(pg_server): # --------------------------------------------------------------------------- -async def poll_for_extension(conn, timeout=10.0): +async def poll_for_extension(conn, timeout=20.0): """Poll for pg_deeplake extension. Returns True if found, False on timeout.""" deadline = asyncio.get_event_loop().time() + timeout while asyncio.get_event_loop().time() < deadline: @@ -265,7 +265,7 @@ async def test_async_extension_auto_install(pg_server): installed = await poll_for_extension(target_conn) assert installed, ( - "Sync worker should have auto-installed pg_deeplake within 10s" + "Sync worker should have auto-installed pg_deeplake within 20s" ) finally: if target_conn is not None: @@ -295,7 +295,7 @@ async def test_async_extension_auto_install_multiple(pg_server): # Poll all databases for extension remaining = set(db_names) - deadline = asyncio.get_event_loop().time() + 10.0 + deadline = asyncio.get_event_loop().time() + 20.0 while remaining and asyncio.get_event_loop().time() < deadline: for db in list(remaining): ext = await conns[db].fetchval( @@ -308,7 +308,7 @@ async def test_async_extension_auto_install_multiple(pg_server): assert not remaining, ( f"Sync worker should have installed pg_deeplake in all databases " - f"within 10s, still missing: {remaining}" + f"within 20s, still missing: {remaining}" ) finally: for c in conns.values(): diff --git a/postgres/tests/py_tests/test_drop_table_column.py b/postgres/tests/py_tests/test_drop_table_column.py index 93f6b858a8..505f9282d5 100644 --- a/postgres/tests/py_tests/test_drop_table_column.py +++ b/postgres/tests/py_tests/test_drop_table_column.py @@ -25,7 +25,7 @@ async def test_drop_table_column(db_conn: asyncpg.Connection): try: # Create table with multiple columns await db_conn.execute(""" - CREATE TABLE vectors ( + CREATE TABLE drop_col_vectors ( id SERIAL PRIMARY KEY, v1 float4[], v2 float4[] @@ -34,7 +34,7 @@ async def test_drop_table_column(db_conn: asyncpg.Connection): # Create index on v2 (not v1) await db_conn.execute(""" - CREATE INDEX index_for_v2 ON vectors USING deeplake_index (v2 DESC) + CREATE INDEX index_for_v2 ON drop_col_vectors USING deeplake_index (v2 DESC) """) # Verify index exists in pg_class @@ -60,7 +60,7 @@ async def test_drop_table_column(db_conn: asyncpg.Connection): f"Dataset directory '{dataset_path}' should exist before DROP COLUMN" # DROP non-indexed column (v1) - index should remain - await db_conn.execute("ALTER TABLE vectors DROP COLUMN v1") + await db_conn.execute("ALTER TABLE drop_col_vectors DROP COLUMN v1") # Verify index still exists after dropping non-indexed column await assertions.assert_query_row_count( @@ -83,7 +83,7 @@ async def test_drop_table_column(db_conn: asyncpg.Connection): f"Dataset directory '{dataset_path_after_v1}' should exist after dropping non-indexed column" # DROP indexed column (v2) - index should be removed - await db_conn.execute("ALTER TABLE vectors DROP COLUMN v2") + await db_conn.execute("ALTER TABLE drop_col_vectors DROP COLUMN v2") # Verify index removed from pg_class await assertions.assert_query_row_count( @@ -108,4 +108,4 @@ async def test_drop_table_column(db_conn: asyncpg.Connection): finally: # Cleanup (in case test fails) await db_conn.execute("DROP INDEX IF EXISTS index_for_v2 CASCADE") - await db_conn.execute("DROP TABLE IF EXISTS vectors CASCADE") + await db_conn.execute("DROP TABLE IF EXISTS drop_col_vectors CASCADE") diff --git a/postgres/tests/py_tests/test_root_path.py b/postgres/tests/py_tests/test_root_path.py index f8e0c3d5ff..e482dcd340 100644 --- a/postgres/tests/py_tests/test_root_path.py +++ b/postgres/tests/py_tests/test_root_path.py @@ -55,7 +55,10 @@ async def test_root_path_basic(db_conn: asyncpg.Connection, temp_dir_for_postgre print(f"✓ Dataset path from metadata: {ds_path}") # Verify the path starts with our root_path - expected_path = os.path.join(root_path, "public", "test_root_path") + # The extension now auto-includes the database name in the path: + # root_path / db_name / schema / table + db_name = await db_conn.fetchval("SELECT current_database()") + expected_path = os.path.join(root_path, db_name, "public", "test_root_path") # Strip trailing slash for comparison ds_path_normalized = ds_path.rstrip('/') expected_path_normalized = expected_path.rstrip('/') diff --git a/postgres/tests/py_tests/test_startup_latency.py b/postgres/tests/py_tests/test_startup_latency.py index 6f01809043..d263629571 100644 --- a/postgres/tests/py_tests/test_startup_latency.py +++ b/postgres/tests/py_tests/test_startup_latency.py @@ -138,8 +138,7 @@ async def measure_connection_latency( # 2. Measure extension load time if with_extension: ext_start = time.perf_counter() - await conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await conn.execute("CREATE EXTENSION pg_deeplake") + await conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") metrics.extension_load_time_ms = (time.perf_counter() - ext_start) * 1000 # 3. Measure root_path set time (triggers catalog loading in stateless mode) @@ -205,8 +204,7 @@ async def measure_catalog_discovery_latency( try: # Load extension ext_start = time.perf_counter() - await conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await conn.execute("CREATE EXTENSION pg_deeplake") + await conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") metrics.extension_load_time_ms = (time.perf_counter() - ext_start) * 1000 # Set root_path - this triggers catalog discovery @@ -338,8 +336,7 @@ async def test_stateless_catalog_loading_latency(pg_server, temp_root_path): ) try: - await setup_conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await setup_conn.execute("CREATE EXTENSION pg_deeplake") + await setup_conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") await setup_conn.execute(f"SET deeplake.root_path = '{temp_root_path}'") # Create multiple tables to populate the catalog @@ -389,8 +386,7 @@ async def test_stateless_catalog_loading_latency(pg_server, temp_root_path): statement_cache_size=0 ) try: - await cleanup_conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await cleanup_conn.execute("CREATE EXTENSION pg_deeplake") + await cleanup_conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") await cleanup_conn.execute(f"SET deeplake.root_path = '{temp_root_path}'") for i in range(num_tables): await cleanup_conn.execute(f"DROP TABLE IF EXISTS catalog_test_{i} CASCADE") @@ -477,8 +473,7 @@ async def test_multi_table_catalog_scaling(pg_server, temp_root_path): ) try: - await setup_conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await setup_conn.execute("CREATE EXTENSION pg_deeplake") + await setup_conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") await setup_conn.execute(f"SET deeplake.root_path = '{temp_root_path}'") # Create tables @@ -513,8 +508,7 @@ async def test_multi_table_catalog_scaling(pg_server, temp_root_path): statement_cache_size=0 ) try: - await cleanup_conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await cleanup_conn.execute("CREATE EXTENSION pg_deeplake") + await cleanup_conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") await cleanup_conn.execute(f"SET deeplake.root_path = '{temp_root_path}'") for i in range(num_tables): await cleanup_conn.execute(f"DROP TABLE IF EXISTS scale_test_{num_tables}_{i} CASCADE") @@ -576,8 +570,7 @@ async def test_cold_start_simulation(pg_server, temp_root_path): ) try: - await setup_conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await setup_conn.execute("CREATE EXTENSION pg_deeplake") + await setup_conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") await setup_conn.execute(f"SET deeplake.root_path = '{temp_root_path}'") await setup_conn.execute(""" @@ -645,8 +638,7 @@ async def test_cold_start_simulation(pg_server, temp_root_path): statement_cache_size=0 ) try: - await cleanup_conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await cleanup_conn.execute("CREATE EXTENSION pg_deeplake") + await cleanup_conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") await cleanup_conn.execute(f"SET deeplake.root_path = '{temp_root_path}'") await cleanup_conn.execute("DROP TABLE IF EXISTS existing_data CASCADE") finally: diff --git a/postgres/tests/py_tests/test_stateless_reserved_schema.py b/postgres/tests/py_tests/test_stateless_reserved_schema.py index 744762c2ed..1685c3abb1 100644 --- a/postgres/tests/py_tests/test_stateless_reserved_schema.py +++ b/postgres/tests/py_tests/test_stateless_reserved_schema.py @@ -75,8 +75,17 @@ async def primary_conn(pg_server): try: await conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") await conn.execute("CREATE EXTENSION pg_deeplake") + # Clean up any leftover reserved schemas from previous failed test runs + for schema in ["default", "user", "table"]: + await conn.execute(f'DROP SCHEMA IF EXISTS "{schema}" CASCADE') yield conn finally: + # Best-effort cleanup of reserved schemas on teardown + for schema in ["default", "user", "table"]: + try: + await conn.execute(f'DROP SCHEMA IF EXISTS "{schema}" CASCADE') + except Exception: + pass await conn.close() diff --git a/postgres/tests/py_tests/test_transaction_abort_handling.py b/postgres/tests/py_tests/test_transaction_abort_handling.py index be3c259b9f..462cab28cd 100644 --- a/postgres/tests/py_tests/test_transaction_abort_handling.py +++ b/postgres/tests/py_tests/test_transaction_abort_handling.py @@ -28,7 +28,6 @@ @pytest.mark.asyncio -@pytest.mark.skip(reason="pg_deeplake does not handle rollback yet.") async def test_error_during_insert_with_abort(db_conn: asyncpg.Connection): """ Test that errors during INSERT operations don't cause cascading aborts. @@ -104,7 +103,6 @@ async def test_guc_parameter_error_handling(db_conn: asyncpg.Connection): @pytest.mark.asyncio -@pytest.mark.skip(reason="pg_deeplake does not handle rollback yet.") async def test_query_error_with_pending_changes(db_conn: asyncpg.Connection): """ Test that query errors with pending changes are handled correctly. @@ -163,7 +161,6 @@ async def test_query_error_with_pending_changes(db_conn: asyncpg.Connection): @pytest.mark.asyncio -@pytest.mark.skip(reason="pg_deeplake does not handle rollback yet.") async def test_multiple_errors_in_sequence(db_conn: asyncpg.Connection): """ Test that multiple errors in sequence don't cause cascading issues. @@ -203,7 +200,6 @@ async def test_multiple_errors_in_sequence(db_conn: asyncpg.Connection): @pytest.mark.asyncio -@pytest.mark.skip(reason="pg_deeplake does not handle rollback yet.") async def test_nested_transaction_abort(db_conn: asyncpg.Connection): """ Test nested transaction (savepoint) abort handling. @@ -252,6 +248,253 @@ async def test_nested_transaction_abort(db_conn: asyncpg.Connection): pass +@pytest.mark.asyncio +async def test_nested_transaction_abort_large_inner_insert(db_conn: asyncpg.Connection): + """ + Test savepoint rollback when inner transaction has enough rows to trigger insert buffering thresholds. + """ + try: + await db_conn.execute(""" + CREATE TABLE test_nested_abort_large ( + id SERIAL PRIMARY KEY, + value INTEGER + ) USING deeplake + """) + + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_nested_abort_large (value) VALUES (1)") + + try: + async with db_conn.transaction(): + await db_conn.execute(""" + INSERT INTO test_nested_abort_large (value) + SELECT generate_series(2, 700) + """) + await db_conn.execute("SELECT * FROM nonexistent_table_large") + except asyncpg.exceptions.UndefinedTableError: + pass + + await db_conn.execute("INSERT INTO test_nested_abort_large (value) VALUES (701)") + + count = await db_conn.fetchval("SELECT COUNT(*) FROM test_nested_abort_large") + assert count == 2, f"Expected 2 rows (values 1 and 701), got {count}" + + values = await db_conn.fetch("SELECT value FROM test_nested_abort_large ORDER BY value") + assert [r['value'] for r in values] == [1, 701], "Should have values 1 and 701" + finally: + try: + await db_conn.execute("DROP TABLE IF EXISTS test_nested_abort_large") + except: + pass + + +@pytest.mark.asyncio +async def test_nested_transaction_abort_delete(db_conn: asyncpg.Connection): + """Inner savepoint DELETEs should rollback while outer transaction changes commit.""" + try: + await db_conn.execute(""" + CREATE TABLE test_nested_abort_delete ( + id SERIAL PRIMARY KEY, + value INTEGER + ) USING deeplake + """) + await db_conn.execute(""" + INSERT INTO test_nested_abort_delete (value) + SELECT generate_series(1, 5) + """) + + async with db_conn.transaction(): + try: + async with db_conn.transaction(): + await db_conn.execute("DELETE FROM test_nested_abort_delete WHERE value IN (2, 3)") + await db_conn.execute("SELECT * FROM nonexistent_table_delete") + except asyncpg.exceptions.UndefinedTableError: + pass + + await db_conn.execute("DELETE FROM test_nested_abort_delete WHERE value = 5") + + values = await db_conn.fetch("SELECT value FROM test_nested_abort_delete ORDER BY value") + assert [r['value'] for r in values] == [1, 2, 3, 4], "Inner DELETEs must rollback, outer DELETE must commit" + finally: + try: + await db_conn.execute("DROP TABLE IF EXISTS test_nested_abort_delete") + except: + pass + + +@pytest.mark.asyncio +async def test_nested_transaction_abort_update(db_conn: asyncpg.Connection): + """Inner savepoint UPDATEs should rollback while outer transaction updates commit.""" + try: + await db_conn.execute(""" + CREATE TABLE test_nested_abort_update ( + id INTEGER PRIMARY KEY, + value INTEGER + ) USING deeplake + """) + await db_conn.execute(""" + INSERT INTO test_nested_abort_update (id, value) VALUES (1, 10), (2, 20) + """) + + async with db_conn.transaction(): + await db_conn.execute("UPDATE test_nested_abort_update SET value = 11 WHERE id = 1") + try: + async with db_conn.transaction(): + await db_conn.execute("UPDATE test_nested_abort_update SET value = 22 WHERE id = 2") + await db_conn.execute("SELECT * FROM nonexistent_table_update") + except asyncpg.exceptions.UndefinedTableError: + pass + await db_conn.execute("UPDATE test_nested_abort_update SET value = 12 WHERE id = 1") + + rows = await db_conn.fetch("SELECT id, value FROM test_nested_abort_update ORDER BY id") + assert [(r['id'], r['value']) for r in rows] == [(1, 12), (2, 20)], \ + "Inner UPDATE must rollback, outer UPDATE must persist" + finally: + try: + await db_conn.execute("DROP TABLE IF EXISTS test_nested_abort_update") + except: + pass + + +@pytest.mark.asyncio +async def test_nested_transaction_abort_copy_like_insert(db_conn: asyncpg.Connection): + """COPY path in inner savepoint should rollback only inner rows on error.""" + try: + await db_conn.execute(""" + CREATE TABLE test_nested_abort_copy ( + id SERIAL PRIMARY KEY, + value INTEGER NOT NULL + ) USING deeplake + """) + + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_nested_abort_copy (value) VALUES (1)") + try: + async with db_conn.transaction(): + await db_conn.copy_records_to_table( + "test_nested_abort_copy", + records=[(2,), (None,), (3,)], + columns=["value"], + ) + except asyncpg.exceptions.NotNullViolationError: + pass + await db_conn.execute("INSERT INTO test_nested_abort_copy (value) VALUES (4)") + + values = await db_conn.fetch("SELECT value FROM test_nested_abort_copy ORDER BY value") + assert [r['value'] for r in values] == [1, 4], "Inner COPY rows must rollback on error" + finally: + try: + await db_conn.execute("DROP TABLE IF EXISTS test_nested_abort_copy") + except: + pass + + +@pytest.mark.asyncio +async def test_multi_level_savepoint_abort(db_conn: asyncpg.Connection): + """Abort at second-level savepoint must preserve first-level savepoint and outer transaction.""" + try: + await db_conn.execute(""" + CREATE TABLE test_multi_level_savepoint ( + id SERIAL PRIMARY KEY, + value INTEGER + ) USING deeplake + """) + + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_multi_level_savepoint (value) VALUES (1)") + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_multi_level_savepoint (value) VALUES (2)") + try: + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_multi_level_savepoint (value) VALUES (3)") + await db_conn.execute("SELECT * FROM nonexistent_table_sp2") + except asyncpg.exceptions.UndefinedTableError: + pass + await db_conn.execute("INSERT INTO test_multi_level_savepoint (value) VALUES (4)") + await db_conn.execute("INSERT INTO test_multi_level_savepoint (value) VALUES (5)") + + values = await db_conn.fetch("SELECT value FROM test_multi_level_savepoint ORDER BY value") + assert [r['value'] for r in values] == [1, 2, 4, 5], "Only deepest savepoint changes should rollback" + finally: + try: + await db_conn.execute("DROP TABLE IF EXISTS test_multi_level_savepoint") + except: + pass + + +@pytest.mark.asyncio +async def test_top_level_abort_after_inner_savepoint_success(db_conn: asyncpg.Connection): + """Top-level abort must rollback both outer and successfully committed inner savepoint changes.""" + try: + await db_conn.execute(""" + CREATE TABLE test_top_level_abort_after_inner ( + id SERIAL PRIMARY KEY, + value INTEGER + ) USING deeplake + """) + + try: + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_top_level_abort_after_inner (value) VALUES (1)") + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_top_level_abort_after_inner (value) VALUES (2)") + await db_conn.execute("SELECT * FROM nonexistent_table_outer_abort") + except asyncpg.exceptions.UndefinedTableError: + pass + + count = await db_conn.fetchval("SELECT COUNT(*) FROM test_top_level_abort_after_inner") + assert count == 0, "Top-level abort must clear all changes including inner savepoint changes" + finally: + try: + await db_conn.execute("DROP TABLE IF EXISTS test_top_level_abort_after_inner") + except: + pass + + +@pytest.mark.asyncio +async def test_nested_transaction_abort_cross_table(db_conn: asyncpg.Connection): + """Savepoint rollback should restore staged state across multiple deeplake tables.""" + try: + await db_conn.execute(""" + CREATE TABLE test_nested_abort_cross_a ( + id SERIAL PRIMARY KEY, + value INTEGER + ) USING deeplake + """) + await db_conn.execute(""" + CREATE TABLE test_nested_abort_cross_b ( + id SERIAL PRIMARY KEY, + value INTEGER + ) USING deeplake + """) + + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_nested_abort_cross_a (value) VALUES (1)") + await db_conn.execute("INSERT INTO test_nested_abort_cross_b (value) VALUES (1)") + + try: + async with db_conn.transaction(): + await db_conn.execute("INSERT INTO test_nested_abort_cross_a (value) VALUES (2)") + await db_conn.execute("INSERT INTO test_nested_abort_cross_b (value) VALUES (2)") + await db_conn.execute("SELECT * FROM nonexistent_table_cross") + except asyncpg.exceptions.UndefinedTableError: + pass + + await db_conn.execute("INSERT INTO test_nested_abort_cross_a (value) VALUES (3)") + await db_conn.execute("INSERT INTO test_nested_abort_cross_b (value) VALUES (3)") + + values_a = await db_conn.fetch("SELECT value FROM test_nested_abort_cross_a ORDER BY value") + values_b = await db_conn.fetch("SELECT value FROM test_nested_abort_cross_b ORDER BY value") + assert [r['value'] for r in values_a] == [1, 3], "Table A must rollback inner savepoint rows" + assert [r['value'] for r in values_b] == [1, 3], "Table B must rollback inner savepoint rows" + finally: + try: + await db_conn.execute("DROP TABLE IF EXISTS test_nested_abort_cross_a") + await db_conn.execute("DROP TABLE IF EXISTS test_nested_abort_cross_b") + except: + pass + + @pytest.mark.asyncio async def test_abort_during_schema_operation(db_conn: asyncpg.Connection): """