diff --git a/cpp/deeplake_pg/dl_catalog.cpp b/cpp/deeplake_pg/dl_catalog.cpp index bb10c85724..53db8eead6 100644 --- a/cpp/deeplake_pg/dl_catalog.cpp +++ b/cpp/deeplake_pg/dl_catalog.cpp @@ -29,8 +29,10 @@ constexpr const char* k_tables_name = "tables"; constexpr const char* k_columns_name = "columns"; constexpr const char* k_indexes_name = "indexes"; constexpr const char* k_meta_name = "meta"; +constexpr const char* k_schemas_name = "schemas"; constexpr const char* k_databases_name = "databases"; +// Shared (cluster-wide) path: {root}/__deeplake_catalog/{name} std::string join_path(const std::string& root, const std::string& name) { if (!root.empty() && root.back() == '/') { @@ -39,6 +41,16 @@ std::string join_path(const std::string& root, const std::string& name) return root + "/" + k_catalog_dir + "/" + name; } +// Per-database path: {root}/{db_name}/__deeplake_catalog/{name} +std::string join_db_path(const std::string& root, const std::string& db_name, const std::string& name) +{ + std::string base = root; + if (!base.empty() && base.back() == '/') { + base.pop_back(); + } + return base + "/" + db_name + "/" + k_catalog_dir + "/" + name; +} + // Cache for catalog table handles to avoid repeated S3 opens struct catalog_table_cache { @@ -75,6 +87,7 @@ int64_t now_ms() return duration_cast(system_clock::now().time_since_epoch()).count(); } +// Open a shared (cluster-wide) catalog table std::shared_ptr open_catalog_table(const std::string& root_path, const std::string& name, icm::string_map<> creds) { @@ -82,6 +95,14 @@ open_catalog_table(const std::string& root_path, const std::string& name, icm::s return deeplake_api::open_catalog_table(path, std::move(creds)).get_future().get(); } +// Open a per-database catalog table +std::shared_ptr +open_db_catalog_table(const std::string& root_path, const std::string& db_name, const std::string& name, icm::string_map<> creds) +{ + const auto path = join_db_path(root_path, db_name, name); + return 
deeplake_api::open_catalog_table(path, std::move(creds)).get_future().get(); +} + template std::vector load_vector(const nd::array& arr) { @@ -124,6 +145,81 @@ std::vector load_int64_vector(const nd::array& arr) return out; } +// Build the tables schema (shared between ensure_db_catalog and schema definitions) +deeplake_api::catalog_table_schema make_tables_schema() +{ + deeplake_api::catalog_table_schema schema; + schema.add("table_id", deeplake_core::type::text(codecs::compression::null)) + .add("schema_name", deeplake_core::type::text(codecs::compression::null)) + .add("table_name", deeplake_core::type::text(codecs::compression::null)) + .add("dataset_path", deeplake_core::type::text(codecs::compression::null)) + .add("state", deeplake_core::type::text(codecs::compression::null)) + .add("db_name", deeplake_core::type::text(codecs::compression::null)) + .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) + .set_primary_key("table_id"); + return schema; +} + +deeplake_api::catalog_table_schema make_columns_schema() +{ + deeplake_api::catalog_table_schema schema; + schema.add("column_id", deeplake_core::type::text(codecs::compression::null)) + .add("table_id", deeplake_core::type::text(codecs::compression::null)) + .add("column_name", deeplake_core::type::text(codecs::compression::null)) + .add("pg_type", deeplake_core::type::text(codecs::compression::null)) + .add("dl_type_json", deeplake_core::type::text(codecs::compression::null)) + .add("nullable", deeplake_core::type::generic(nd::type::scalar(nd::dtype::boolean))) + .add("position", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int32))) + .set_primary_key("column_id"); + return schema; +} + +deeplake_api::catalog_table_schema make_indexes_schema() +{ + deeplake_api::catalog_table_schema schema; + schema.add("table_id", deeplake_core::type::text(codecs::compression::null)) + .add("column_names", deeplake_core::type::text(codecs::compression::null)) + 
.add("index_type", deeplake_core::type::text(codecs::compression::null)) + .add("order_type", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int32))) + .set_primary_key("table_id"); + return schema; +} + +deeplake_api::catalog_table_schema make_schemas_schema() +{ + deeplake_api::catalog_table_schema schema; + schema.add("schema_name", deeplake_core::type::text(codecs::compression::null)) + .add("owner", deeplake_core::type::text(codecs::compression::null)) + .add("state", deeplake_core::type::text(codecs::compression::null)) + .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) + .set_primary_key("schema_name"); + return schema; +} + +deeplake_api::catalog_table_schema make_meta_schema() +{ + deeplake_api::catalog_table_schema schema; + schema.add("catalog_version", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) + .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) + .set_primary_key("catalog_version"); + return schema; +} + +deeplake_api::catalog_table_schema make_databases_schema() +{ + deeplake_api::catalog_table_schema schema; + schema.add("db_name", deeplake_core::type::text(codecs::compression::null)) + .add("owner", deeplake_core::type::text(codecs::compression::null)) + .add("encoding", deeplake_core::type::text(codecs::compression::null)) + .add("lc_collate", deeplake_core::type::text(codecs::compression::null)) + .add("lc_ctype", deeplake_core::type::text(codecs::compression::null)) + .add("template_db", deeplake_core::type::text(codecs::compression::null)) + .add("state", deeplake_core::type::text(codecs::compression::null)) + .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) + .set_primary_key("db_name"); + return schema; +} + } // namespace int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds) @@ -131,81 +227,92 @@ int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds) if 
(root_path.empty()) { return 0; } - const auto tables_path = join_path(root_path, k_tables_name); - const auto columns_path = join_path(root_path, k_columns_name); - const auto indexes_path = join_path(root_path, k_indexes_name); const auto meta_path = join_path(root_path, k_meta_name); const auto databases_path = join_path(root_path, k_databases_name); try { - // Build schemas for all catalog tables - deeplake_api::catalog_table_schema tables_schema; - tables_schema.add("table_id", deeplake_core::type::text(codecs::compression::null)) - .add("schema_name", deeplake_core::type::text(codecs::compression::null)) - .add("table_name", deeplake_core::type::text(codecs::compression::null)) - .add("dataset_path", deeplake_core::type::text(codecs::compression::null)) - .add("state", deeplake_core::type::text(codecs::compression::null)) - .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) - .set_primary_key("table_id"); - - deeplake_api::catalog_table_schema columns_schema; - columns_schema.add("column_id", deeplake_core::type::text(codecs::compression::null)) - .add("table_id", deeplake_core::type::text(codecs::compression::null)) - .add("column_name", deeplake_core::type::text(codecs::compression::null)) - .add("pg_type", deeplake_core::type::text(codecs::compression::null)) - .add("dl_type_json", deeplake_core::type::text(codecs::compression::null)) - .add("nullable", deeplake_core::type::generic(nd::type::scalar(nd::dtype::boolean))) - .add("position", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int32))) - .set_primary_key("column_id"); - - deeplake_api::catalog_table_schema indexes_schema; - indexes_schema.add("table_id", deeplake_core::type::text(codecs::compression::null)) - .add("column_names", deeplake_core::type::text(codecs::compression::null)) - .add("index_type", deeplake_core::type::text(codecs::compression::null)) - .add("order_type", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int32))) - 
.set_primary_key("table_id"); - - deeplake_api::catalog_table_schema meta_schema; - meta_schema.add("catalog_version", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) - .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) - .set_primary_key("catalog_version"); - - deeplake_api::catalog_table_schema databases_schema; - databases_schema.add("db_name", deeplake_core::type::text(codecs::compression::null)) - .add("owner", deeplake_core::type::text(codecs::compression::null)) - .add("encoding", deeplake_core::type::text(codecs::compression::null)) - .add("lc_collate", deeplake_core::type::text(codecs::compression::null)) - .add("lc_ctype", deeplake_core::type::text(codecs::compression::null)) - .add("template_db", deeplake_core::type::text(codecs::compression::null)) - .add("state", deeplake_core::type::text(codecs::compression::null)) - .add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64))) - .set_primary_key("db_name"); - - // Launch all 5 open_or_create operations in parallel + // Launch shared catalog table creation in parallel (meta + databases only) + icm::vector>> promises; + promises.reserve(2); + promises.push_back( + deeplake_api::open_or_create_catalog_table(meta_path, make_meta_schema(), icm::string_map<>(creds))); + promises.push_back( + deeplake_api::open_or_create_catalog_table(databases_path, make_databases_schema(), icm::string_map<>(creds))); + + // Wait for all to complete + auto results = async::combine(std::move(promises)).get_future().get(); + if (results.size() != 2) { + elog(ERROR, + "Failed to initialize shared catalog at %s: expected 2 catalog tables, got %zu", + root_path.c_str(), + static_cast(results.size())); + } + + // Initialize meta table if empty (index 0 is meta) + auto& meta_table = results[0]; + if (meta_table) { + auto snapshot = meta_table->read().get_future().get(); + if (snapshot.row_count() == 0) { + icm::string_map row; + row["catalog_version"] = 
nd::adapt(static_cast(1)); + row["updated_at"] = nd::adapt(now_ms()); + meta_table->insert(std::move(row)).get_future().get(); + } + } + // Get version from the meta table we already have open + if (meta_table) { + return static_cast(meta_table->version().get_future().get()); + } + return 0; + } catch (const std::exception& e) { + catalog_table_cache::instance().invalidate(); + elog(ERROR, "Failed to ensure shared catalog at %s: %s", root_path.c_str(), e.what()); + return 0; + } catch (...) { + catalog_table_cache::instance().invalidate(); + elog(ERROR, "Failed to ensure shared catalog at %s: unknown error", root_path.c_str()); + return 0; + } +} + +int64_t ensure_db_catalog(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) +{ + if (root_path.empty() || db_name.empty()) { + return 0; + } + + const auto tables_path = join_db_path(root_path, db_name, k_tables_name); + const auto columns_path = join_db_path(root_path, db_name, k_columns_name); + const auto indexes_path = join_db_path(root_path, db_name, k_indexes_name); + const auto schemas_path = join_db_path(root_path, db_name, k_schemas_name); + const auto meta_path = join_db_path(root_path, db_name, k_meta_name); + + try { + // Launch all 5 per-database catalog table creation in parallel icm::vector>> promises; promises.reserve(5); promises.push_back( - deeplake_api::open_or_create_catalog_table(tables_path, std::move(tables_schema), icm::string_map<>(creds))); + deeplake_api::open_or_create_catalog_table(tables_path, make_tables_schema(), icm::string_map<>(creds))); promises.push_back( - deeplake_api::open_or_create_catalog_table(columns_path, std::move(columns_schema), icm::string_map<>(creds))); + deeplake_api::open_or_create_catalog_table(columns_path, make_columns_schema(), icm::string_map<>(creds))); promises.push_back( - deeplake_api::open_or_create_catalog_table(indexes_path, std::move(indexes_schema), icm::string_map<>(creds))); + 
deeplake_api::open_or_create_catalog_table(indexes_path, make_indexes_schema(), icm::string_map<>(creds))); promises.push_back( - deeplake_api::open_or_create_catalog_table(meta_path, std::move(meta_schema), icm::string_map<>(creds))); + deeplake_api::open_or_create_catalog_table(schemas_path, make_schemas_schema(), icm::string_map<>(creds))); promises.push_back( - deeplake_api::open_or_create_catalog_table(databases_path, std::move(databases_schema), icm::string_map<>(creds))); + deeplake_api::open_or_create_catalog_table(meta_path, make_meta_schema(), icm::string_map<>(creds))); - // Wait for all to complete auto results = async::combine(std::move(promises)).get_future().get(); if (results.size() != 5) { elog(ERROR, - "Failed to initialize catalog at %s: expected 5 catalog tables, got %zu", + "Failed to initialize per-db catalog at %s/%s: expected 5 catalog tables, got %zu", root_path.c_str(), + db_name.c_str(), static_cast(results.size())); } - // Initialize meta table if empty (index 3 is meta) - auto& meta_table = results[3]; + // Initialize per-db meta table if empty (index 4 is meta) + auto& meta_table = results[4]; if (meta_table) { auto snapshot = meta_table->read().get_future().get(); if (snapshot.row_count() == 0) { @@ -215,27 +322,24 @@ int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds) meta_table->insert(std::move(row)).get_future().get(); } } - // Get version from the meta table we already have open (avoids a second open_catalog_table) if (meta_table) { return static_cast(meta_table->version().get_future().get()); } return 0; } catch (const std::exception& e) { - catalog_table_cache::instance().invalidate(); - elog(ERROR, "Failed to ensure catalog at %s: %s", root_path.c_str(), e.what()); + elog(ERROR, "Failed to ensure per-db catalog at %s/%s: %s", root_path.c_str(), db_name.c_str(), e.what()); return 0; } catch (...) 
{ - catalog_table_cache::instance().invalidate(); - elog(ERROR, "Failed to ensure catalog at %s: unknown error", root_path.c_str()); + elog(ERROR, "Failed to ensure per-db catalog at %s/%s: unknown error", root_path.c_str(), db_name.c_str()); return 0; } } -std::vector load_tables(const std::string& root_path, icm::string_map<> creds) +std::vector load_tables(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) { std::vector out; try { - auto table = open_catalog_table(root_path, k_tables_name, std::move(creds)); + auto table = open_db_catalog_table(root_path, db_name, k_tables_name, std::move(creds)); if (!table) { return out; } @@ -263,6 +367,11 @@ std::vector load_tables(const std::string& root_path, icm::string_ma meta.table_name = deeplake_api::array_to_string(table_it->second); meta.dataset_path = deeplake_api::array_to_string(path_it->second); meta.state = deeplake_api::array_to_string(state_it->second); + meta.db_name = db_name; + auto db_name_it = row.find("db_name"); + if (db_name_it != row.end()) { + meta.db_name = deeplake_api::array_to_string(db_name_it->second); + } auto updated_vec = load_int64_vector(updated_it->second); meta.updated_at = updated_vec.empty() ? 0 : updated_vec.front(); @@ -280,19 +389,19 @@ std::vector load_tables(const std::string& root_path, icm::string_ma } return out; } catch (const std::exception& e) { - elog(WARNING, "Failed to load catalog tables: %s", e.what()); + elog(WARNING, "Failed to load catalog tables for db '%s': %s", db_name.c_str(), e.what()); return out; } catch (...) 
{ - elog(WARNING, "Failed to load catalog tables: unknown error"); + elog(WARNING, "Failed to load catalog tables for db '%s': unknown error", db_name.c_str()); return out; } } -std::vector load_columns(const std::string& root_path, icm::string_map<> creds) +std::vector load_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) { std::vector out; try { - auto table = open_catalog_table(root_path, k_columns_name, std::move(creds)); + auto table = open_db_catalog_table(root_path, db_name, k_columns_name, std::move(creds)); if (!table) { return out; } @@ -340,29 +449,139 @@ std::vector load_columns(const std::string& root_path, icm::string_ } return out; } catch (const std::exception& e) { - elog(WARNING, "Failed to load catalog columns: %s", e.what()); + elog(WARNING, "Failed to load catalog columns for db '%s': %s", db_name.c_str(), e.what()); + return out; + } catch (...) { + elog(WARNING, "Failed to load catalog columns for db '%s': unknown error", db_name.c_str()); + return out; + } +} + +std::vector load_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) +{ + std::vector out; + try { + auto table = open_db_catalog_table(root_path, db_name, k_indexes_name, std::move(creds)); + if (!table) { + return out; + } + auto snapshot = table->read().get_future().get(); + if (snapshot.row_count() == 0) { + return out; + } + + for (const auto& row : snapshot.rows()) { + auto table_id_it = row.find("table_id"); + auto column_names_it = row.find("column_names"); + auto index_type_it = row.find("index_type"); + auto order_type_it = row.find("order_type"); + + if (table_id_it == row.end() || column_names_it == row.end() || index_type_it == row.end()) { + continue; + } + + index_meta meta; + meta.table_id = deeplake_api::array_to_string(table_id_it->second); + meta.column_names = deeplake_api::array_to_string(column_names_it->second); + meta.index_type = 
deeplake_api::array_to_string(index_type_it->second); + if (order_type_it != row.end()) { + try { + auto order_vec = load_vector(order_type_it->second); + meta.order_type = order_vec.empty() ? 0 : order_vec.front(); + } catch (...) { + meta.order_type = 0; + } + } + + out.push_back(std::move(meta)); + } + return out; + } catch (const std::exception& e) { + elog(DEBUG1, "Failed to load catalog indexes for db '%s': %s", db_name.c_str(), e.what()); return out; } catch (...) { - elog(WARNING, "Failed to load catalog columns: unknown error"); + elog(DEBUG1, "Failed to load catalog indexes for db '%s': unknown error", db_name.c_str()); return out; } } -std::vector load_indexes(const std::string&, icm::string_map<>) +std::vector load_schemas(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) { - return {}; + std::vector out; + try { + auto table = open_db_catalog_table(root_path, db_name, k_schemas_name, std::move(creds)); + if (!table) { + return out; + } + auto snapshot = table->read().get_future().get(); + if (snapshot.row_count() == 0) { + return out; + } + + std::unordered_map latest; + for (const auto& row : snapshot.rows()) { + auto schema_name_it = row.find("schema_name"); + auto state_it = row.find("state"); + if (schema_name_it == row.end() || state_it == row.end()) { + continue; + } + + schema_meta meta; + meta.schema_name = deeplake_api::array_to_string(schema_name_it->second); + meta.state = deeplake_api::array_to_string(state_it->second); + auto owner_it = row.find("owner"); + if (owner_it != row.end()) { + meta.owner = deeplake_api::array_to_string(owner_it->second); + } + auto updated_it = row.find("updated_at"); + if (updated_it != row.end()) { + auto updated_vec = load_int64_vector(updated_it->second); + meta.updated_at = updated_vec.empty() ? 
0 : updated_vec.front(); + } + + auto it = latest.find(meta.schema_name); + if (it == latest.end() || it->second.updated_at <= meta.updated_at) { + latest[meta.schema_name] = std::move(meta); + } + } + + out.reserve(latest.size()); + for (auto& [_, meta] : latest) { + if (meta.state == "ready") { + out.push_back(std::move(meta)); + } + } + return out; + } catch (const std::exception& e) { + elog(DEBUG1, "Failed to load catalog schemas for db '%s': %s (may be old catalog)", db_name.c_str(), e.what()); + return out; + } catch (...) { + elog(DEBUG1, "Failed to load catalog schemas for db '%s': unknown error (may be old catalog)", db_name.c_str()); + return out; + } +} + +void upsert_schema(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const schema_meta& meta) +{ + auto table = open_db_catalog_table(root_path, db_name, k_schemas_name, std::move(creds)); + icm::string_map row; + row["schema_name"] = nd::adapt(meta.schema_name); + row["owner"] = nd::adapt(meta.owner); + row["state"] = nd::adapt(meta.state); + row["updated_at"] = nd::adapt(meta.updated_at == 0 ? 
now_ms() : meta.updated_at); + table->upsert(std::move(row)).get_future().get(); } std::pair, std::vector> -load_tables_and_columns(const std::string& root_path, icm::string_map<> creds) +load_tables_and_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) { std::vector tables_out; std::vector columns_out; try { - // Open both catalog tables in parallel - auto tables_promise = deeplake_api::open_catalog_table(join_path(root_path, k_tables_name), icm::string_map<>(creds)); - auto columns_promise = deeplake_api::open_catalog_table(join_path(root_path, k_columns_name), icm::string_map<>(creds)); + // Open both per-database catalog tables in parallel + auto tables_promise = deeplake_api::open_catalog_table(join_db_path(root_path, db_name, k_tables_name), icm::string_map<>(creds)); + auto columns_promise = deeplake_api::open_catalog_table(join_db_path(root_path, db_name, k_columns_name), icm::string_map<>(creds)); icm::vector>> open_promises; open_promises.push_back(std::move(tables_promise)); @@ -411,6 +630,11 @@ load_tables_and_columns(const std::string& root_path, icm::string_map<> creds) meta.table_name = deeplake_api::array_to_string(table_it->second); meta.dataset_path = deeplake_api::array_to_string(path_it->second); meta.state = deeplake_api::array_to_string(state_it->second); + meta.db_name = db_name; + auto db_name_it = row.find("db_name"); + if (db_name_it != row.end()) { + meta.db_name = deeplake_api::array_to_string(db_name_it->second); + } auto updated_vec = load_int64_vector(updated_it->second); meta.updated_at = updated_vec.empty() ? 
0 : updated_vec.front(); @@ -473,33 +697,34 @@ load_tables_and_columns(const std::string& root_path, icm::string_map<> creds) return {tables_out, columns_out}; } catch (const std::exception& e) { - elog(WARNING, "Failed to load catalog tables and columns: %s", e.what()); + elog(WARNING, "Failed to load catalog tables and columns for db '%s': %s", db_name.c_str(), e.what()); return {tables_out, columns_out}; } catch (...) { - elog(WARNING, "Failed to load catalog tables and columns: unknown error"); + elog(WARNING, "Failed to load catalog tables and columns for db '%s': unknown error", db_name.c_str()); return {tables_out, columns_out}; } } -void upsert_table(const std::string& root_path, icm::string_map<> creds, const table_meta& meta) +void upsert_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const table_meta& meta) { - auto table = open_catalog_table(root_path, k_tables_name, std::move(creds)); + auto table = open_db_catalog_table(root_path, db_name, k_tables_name, std::move(creds)); icm::string_map row; row["table_id"] = nd::adapt(meta.table_id); row["schema_name"] = nd::adapt(meta.schema_name); row["table_name"] = nd::adapt(meta.table_name); row["dataset_path"] = nd::adapt(meta.dataset_path); row["state"] = nd::adapt(meta.state); + row["db_name"] = nd::adapt(meta.db_name.empty() ? db_name : meta.db_name); row["updated_at"] = nd::adapt(meta.updated_at == 0 ? 
now_ms() : meta.updated_at); table->upsert(std::move(row)).get_future().get(); } -void upsert_columns(const std::string& root_path, icm::string_map<> creds, const std::vector& columns) +void upsert_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector& columns) { if (columns.empty()) { return; } - auto table = open_catalog_table(root_path, k_columns_name, std::move(creds)); + auto table = open_db_catalog_table(root_path, db_name, k_columns_name, std::move(creds)); icm::vector> rows; rows.reserve(columns.size()); for (const auto& col : columns) { @@ -517,6 +742,25 @@ void upsert_columns(const std::string& root_path, icm::string_map<> creds, const table->upsert_many(std::move(rows)).get_future().get(); } +void upsert_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector& indexes) +{ + if (indexes.empty()) { + return; + } + auto table = open_db_catalog_table(root_path, db_name, k_indexes_name, std::move(creds)); + icm::vector> rows; + rows.reserve(indexes.size()); + for (const auto& idx : indexes) { + icm::string_map row; + row["table_id"] = nd::adapt(idx.table_id); + row["column_names"] = nd::adapt(idx.column_names); + row["index_type"] = nd::adapt(idx.index_type); + row["order_type"] = nd::adapt(idx.order_type); + rows.push_back(std::move(row)); + } + table->upsert_many(std::move(rows)).get_future().get(); +} + std::vector load_databases(const std::string& root_path, icm::string_map<> creds) { std::vector out; @@ -627,4 +871,36 @@ void bump_catalog_version(const std::string& root_path, icm::string_map<> creds) table->upsert(std::move(row)).get_future().get(); } +int64_t get_db_catalog_version(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) +{ + try { + auto table = open_db_catalog_table(root_path, db_name, k_meta_name, std::move(creds)); + if (!table) { + return 0; + } + return 
static_cast(table->version().get_future().get()); + } catch (const std::exception& e) { + elog(WARNING, "Failed to read per-db catalog version for '%s': %s", db_name.c_str(), e.what()); + return 0; + } catch (...) { + elog(WARNING, "Failed to read per-db catalog version for '%s': unknown error", db_name.c_str()); + return 0; + } +} + +void bump_db_catalog_version(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) +{ + auto table = open_db_catalog_table(root_path, db_name, k_meta_name, std::move(creds)); + icm::string_map row; + row["catalog_version"] = nd::adapt(static_cast(1)); + row["updated_at"] = nd::adapt(now_ms()); + table->upsert(std::move(row)).get_future().get(); +} + +std::shared_ptr +open_db_meta_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds) +{ + return open_db_catalog_table(root_path, db_name, k_meta_name, std::move(creds)); +} + } // namespace pg::dl_catalog diff --git a/cpp/deeplake_pg/dl_catalog.hpp b/cpp/deeplake_pg/dl_catalog.hpp index 52d680242e..08ba6f468f 100644 --- a/cpp/deeplake_pg/dl_catalog.hpp +++ b/cpp/deeplake_pg/dl_catalog.hpp @@ -3,9 +3,12 @@ #include #include +#include #include #include +namespace deeplake_api { class catalog_table; } + namespace pg::dl_catalog { struct table_meta @@ -15,6 +18,7 @@ struct table_meta std::string table_name; std::string dataset_path; std::string state; + std::string db_name; int64_t updated_at = 0; }; @@ -36,6 +40,14 @@ struct index_meta int32_t order_type = 0; }; +struct schema_meta +{ + std::string schema_name; // PK + std::string owner; + std::string state; // "ready" or "dropping" + int64_t updated_at = 0; +}; + struct database_meta { std::string db_name; // PK @@ -48,23 +60,44 @@ struct database_meta int64_t updated_at = 0; }; +// Shared (cluster-wide) catalog: meta + databases int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds); -std::vector load_tables(const std::string& root_path, 
icm::string_map<> creds); -std::vector load_columns(const std::string& root_path, icm::string_map<> creds); -std::vector load_indexes(const std::string& root_path, icm::string_map<> creds); +// Per-database catalog: tables + columns + indexes + schemas + meta +int64_t ensure_db_catalog(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); + +// Per-database loaders (read from {root}/{db_name}/__deeplake_catalog/) +std::vector load_tables(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); +std::vector load_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); +std::vector load_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); // Load tables and columns in parallel for better performance std::pair, std::vector> -load_tables_and_columns(const std::string& root_path, icm::string_map<> creds); +load_tables_and_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); + +// Per-database schema catalog +std::vector load_schemas(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); +void upsert_schema(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const schema_meta& meta); -void upsert_table(const std::string& root_path, icm::string_map<> creds, const table_meta& meta); -void upsert_columns(const std::string& root_path, icm::string_map<> creds, const std::vector& columns); +// Per-database upserts (write to {root}/{db_name}/__deeplake_catalog/) +void upsert_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const table_meta& meta); +void upsert_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector& columns); +void upsert_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector& indexes); +// Shared 
(cluster-wide) database catalog std::vector load_databases(const std::string& root_path, icm::string_map<> creds); void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta); +// Global (shared) catalog version int64_t get_catalog_version(const std::string& root_path, icm::string_map<> creds); void bump_catalog_version(const std::string& root_path, icm::string_map<> creds); +// Per-database catalog version +int64_t get_db_catalog_version(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); +void bump_db_catalog_version(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); + +// Open the per-database meta table handle (for parallel .version() calls in sync worker) +std::shared_ptr +open_db_meta_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds); + } // namespace pg::dl_catalog diff --git a/cpp/deeplake_pg/extension_init.cpp b/cpp/deeplake_pg/extension_init.cpp index b293f7e819..2d531c7537 100644 --- a/cpp/deeplake_pg/extension_init.cpp +++ b/cpp/deeplake_pg/extension_init.cpp @@ -9,7 +9,9 @@ extern "C" { #include #include +#include #include +#include #include #include #include @@ -586,6 +588,35 @@ static void process_utility(PlannedStmt* pstmt, } } } + + // Mark schema as "dropping" in the S3 catalog + if (pg::stateless_enabled) { + try { + auto root_path = pg::session_credentials::get_root_path(); + if (root_path.empty()) { + root_path = pg::utils::get_deeplake_root_directory(); + } + if (!root_path.empty()) { + auto creds = pg::session_credentials::get_credentials(); + const char* dbname = get_database_name(MyDatabaseId); + std::string db_name = dbname ? 
dbname : "postgres"; + if (dbname) pfree(const_cast(dbname)); + + pg::dl_catalog::ensure_catalog(root_path, creds); + pg::dl_catalog::ensure_db_catalog(root_path, db_name, creds); + + pg::dl_catalog::schema_meta s_meta; + s_meta.schema_name = schema_name; + s_meta.state = "dropping"; + pg::dl_catalog::upsert_schema(root_path, db_name, creds, s_meta); + + pg::dl_catalog::bump_db_catalog_version(root_path, db_name, pg::session_credentials::get_credentials()); + pg::dl_catalog::bump_catalog_version(root_path, pg::session_credentials::get_credentials()); + } + } catch (const std::exception& e) { + elog(WARNING, "pg_deeplake: failed to mark schema '%s' as dropping in catalog: %s", schema_name, e.what()); + } + } } } else if (stmt->removeType == OBJECT_DATABASE) { const char* query = "SELECT nspname, relname " @@ -691,6 +722,7 @@ static void process_utility(PlannedStmt* pstmt, } if (!root_path.empty()) { auto creds = pg::session_credentials::get_credentials(); + pg::dl_catalog::ensure_catalog(root_path, creds); pg::dl_catalog::database_meta db_meta; db_meta.db_name = dbstmt->dbname; db_meta.state = "dropping"; @@ -727,6 +759,7 @@ static void process_utility(PlannedStmt* pstmt, } if (!root_path.empty()) { auto creds = pg::session_credentials::get_credentials(); + pg::dl_catalog::ensure_catalog(root_path, creds); pg::dl_catalog::database_meta db_meta; db_meta.db_name = dbstmt->dbname; db_meta.state = "ready"; @@ -758,6 +791,40 @@ static void process_utility(PlannedStmt* pstmt, } } + // Post-hook: record CREATE SCHEMA in S3 catalog for multi-instance sync + if (IsA(pstmt->utilityStmt, CreateSchemaStmt) && pg::stateless_enabled) { + CreateSchemaStmt* schemastmt = (CreateSchemaStmt*)pstmt->utilityStmt; + try { + auto root_path = pg::session_credentials::get_root_path(); + if (root_path.empty()) { + root_path = pg::utils::get_deeplake_root_directory(); + } + if (!root_path.empty() && schemastmt->schemaname != nullptr) { + auto creds = 
pg::session_credentials::get_credentials(); + const char* dbname = get_database_name(MyDatabaseId); + std::string db_name = dbname ? dbname : "postgres"; + if (dbname) pfree(const_cast(dbname)); + + pg::dl_catalog::ensure_catalog(root_path, creds); + pg::dl_catalog::ensure_db_catalog(root_path, db_name, creds); + + pg::dl_catalog::schema_meta s_meta; + s_meta.schema_name = schemastmt->schemaname; + s_meta.state = "ready"; + if (schemastmt->authrole != nullptr) { + s_meta.owner = schemastmt->authrole->rolename; + } + pg::dl_catalog::upsert_schema(root_path, db_name, creds, s_meta); + + pg::dl_catalog::bump_db_catalog_version(root_path, db_name, pg::session_credentials::get_credentials()); + pg::dl_catalog::bump_catalog_version(root_path, pg::session_credentials::get_credentials()); + elog(DEBUG1, "pg_deeplake: recorded CREATE SCHEMA '%s' in catalog", schemastmt->schemaname); + } + } catch (const std::exception& e) { + elog(DEBUG1, "pg_deeplake: failed to record CREATE SCHEMA in catalog: %s", e.what()); + } + } + // Post-process ALTER TABLE ADD COLUMN to add column to deeplake dataset if (IsA(pstmt->utilityStmt, AlterTableStmt)) { AlterTableStmt* stmt = (AlterTableStmt*)pstmt->utilityStmt; diff --git a/cpp/deeplake_pg/logger.hpp b/cpp/deeplake_pg/logger.hpp index a47da65094..74a1a14dc7 100644 --- a/cpp/deeplake_pg/logger.hpp +++ b/cpp/deeplake_pg/logger.hpp @@ -37,7 +37,7 @@ class logger_adapter : public base::logger_adapter elog(DEBUG1, "%s", message.c_str()); break; case base::log_level::info: - elog(INFO, "%s", message.c_str()); + elog(LOG, "%s", message.c_str()); break; case base::log_level::warning: elog(WARNING, "%s", message.c_str()); diff --git a/cpp/deeplake_pg/pg_deeplake.cpp b/cpp/deeplake_pg/pg_deeplake.cpp index 93449a6d2c..1453800c9e 100644 --- a/cpp/deeplake_pg/pg_deeplake.cpp +++ b/cpp/deeplake_pg/pg_deeplake.cpp @@ -1,6 +1,8 @@ #include "pg_deeplake.hpp" +#include "dl_catalog.hpp" #include "logger.hpp" #include "table_storage.hpp" +#include 
"utils.hpp" #include #include @@ -9,6 +11,8 @@ extern "C" { #endif +#include +#include #include #ifdef __cplusplus @@ -260,6 +264,41 @@ void save_index_metadata(Oid oid) if (SPI_execute(buf.data, false, 0) != SPI_OK_INSERT) { ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to save metadata"))); } + + // Persist index to shared catalog for stateless multi-instance sync. + // Skip when in catalog-only mode — the table was synced FROM the catalog, + // so writing back would be redundant and cause version bump loops. + if (pg::stateless_enabled && !pg::table_storage::is_catalog_only_create()) { + try { + auto root_dir = pg::session_credentials::get_root_path(); + if (root_dir.empty()) { + root_dir = pg::utils::get_deeplake_root_directory(); + } + if (!root_dir.empty()) { + auto creds = pg::session_credentials::get_credentials(); + const char* dbname = get_database_name(MyDatabaseId); + std::string db_name = dbname ? dbname : "postgres"; + if (dbname) pfree(const_cast(dbname)); + + const std::string& table_id = idx_info.table_name(); // already schema-qualified + + pg::dl_catalog::index_meta idx_meta; + idx_meta.table_id = table_id; + idx_meta.column_names = idx_info.get_column_names_string(); + idx_meta.index_type = std::string(deeplake_core::deeplake_index_type::to_string(idx_info.index_type())); + idx_meta.order_type = static_cast(idx_info.order_type()); + + std::vector indexes = {idx_meta}; + pg::dl_catalog::upsert_indexes(root_dir, db_name, creds, indexes); + pg::dl_catalog::bump_db_catalog_version(root_dir, db_name, creds); + pg::dl_catalog::bump_catalog_version(root_dir, creds); + } + } catch (const std::exception& e) { + elog(DEBUG1, "pg_deeplake: failed to persist index to shared catalog: %s", e.what()); + } catch (...) 
{ + elog(DEBUG1, "pg_deeplake: failed to persist index to shared catalog: unknown error"); + } + } } void load_index_metadata() diff --git a/cpp/deeplake_pg/sync_worker.cpp b/cpp/deeplake_pg/sync_worker.cpp index 051d52fc5f..f56fbd935f 100644 --- a/cpp/deeplake_pg/sync_worker.cpp +++ b/cpp/deeplake_pg/sync_worker.cpp @@ -32,12 +32,17 @@ extern "C" { #include "table_storage.hpp" #include "utils.hpp" +#include +#include +#include + #include #include +#include #include // GUC variables -int deeplake_sync_interval_ms = 2000; // Default 2 seconds +int deeplake_sync_interval_ms = 1000; // Default 1 second // Forward declaration (defined in the anonymous namespace below) namespace { bool execute_via_libpq(const char* dbname, const char* sql); } @@ -305,98 +310,247 @@ void deeplake_sync_databases_from_catalog(const std::string& root_path, icm::str } /** - * Sync tables from the deeplake catalog to PostgreSQL. - * - * This function checks the catalog for tables that exist in the deeplake - * catalog but not in PostgreSQL, and creates them. + * Sync schemas for a specific database from pre-loaded catalog data via libpq. + * Creates missing schemas in the target database. 
+ */ +void deeplake_sync_schemas_for_db(const std::string& db_name, + const std::vector& schemas) +{ + for (const auto& meta : schemas) { + if (meta.state == "dropping") { + continue; + } + + // Skip system schemas + if (meta.schema_name == "public" || meta.schema_name == "pg_catalog" || + meta.schema_name == "information_schema" || + meta.schema_name.substr(0, 3) == "pg_") { + continue; + } + + StringInfoData buf; + initStringInfo(&buf); + appendStringInfo(&buf, "CREATE SCHEMA IF NOT EXISTS %s", + quote_identifier(meta.schema_name.c_str())); + + if (execute_via_libpq(db_name.c_str(), buf.data)) { + elog(LOG, "pg_deeplake sync: created schema '%s' in database '%s'", + meta.schema_name.c_str(), db_name.c_str()); + } + + pfree(buf.data); + } +} + +/** + * Parse comma-separated column names string into a vector. + * The column_names string uses trailing comma format: "col1,col2," */ -void deeplake_sync_tables_from_catalog(const std::string& root_path, icm::string_map<> creds) +std::vector parse_column_names(const std::string& column_names) { - // Load tables and columns in parallel for better performance - auto [catalog_tables, catalog_columns] = pg::dl_catalog::load_tables_and_columns(root_path, creds); + std::vector result; + std::string current; + for (char c : column_names) { + if (c == ',') { + if (!current.empty()) { + result.push_back(current); + current.clear(); + } + } else { + current += c; + } + } + if (!current.empty()) { + result.push_back(current); + } + return result; +} - for (const auto& meta : catalog_tables) { - // Skip tables marked as dropping +/** + * Sync tables for a specific database from pre-loaded catalog data via libpq. + * Creates missing tables in the target database. + */ +void deeplake_sync_tables_for_db(const std::string& db_name, + const std::vector& tables, + const std::vector& columns, + const std::vector& indexes) +{ + for (const auto& meta : tables) { if (meta.state == "dropping") { continue; } const std::string qualified_name = 
meta.schema_name + "." + meta.table_name; - // Check if table exists in PostgreSQL - auto* rel = makeRangeVar(pstrdup(meta.schema_name.c_str()), pstrdup(meta.table_name.c_str()), -1); - Oid relid = RangeVarGetRelid(rel, NoLock, true); + // Gather columns for this table, sorted by position + std::vector table_columns; + for (const auto& col : columns) { + if (col.table_id == meta.table_id) { + table_columns.push_back(col); + } + } + std::sort(table_columns.begin(), table_columns.end(), + [](const auto& a, const auto& b) { return a.position < b.position; }); - if (!OidIsValid(relid)) { - // Gather columns for this table, sorted by position - std::vector table_columns; - for (const auto& col : catalog_columns) { - if (col.table_id == meta.table_id) { - table_columns.push_back(col); - } + if (table_columns.empty()) { + elog(DEBUG1, "pg_deeplake sync: no columns for %s in db %s, skipping", + qualified_name.c_str(), db_name.c_str()); + continue; + } + + // Find indexes for this table + std::vector table_indexes; + for (const auto& idx : indexes) { + if (idx.table_id == meta.table_id) { + table_indexes.push_back(idx); } - std::sort(table_columns.begin(), table_columns.end(), - [](const auto& a, const auto& b) { return a.position < b.position; }); + } - if (table_columns.empty()) { - elog(DEBUG1, "pg_deeplake sync: no columns found for table %s, skipping", qualified_name.c_str()); - continue; + // Determine which columns are part of a primary key (inverted_index on non-nullable columns) + // The primary key columns are stored as comma-separated names in column_names + std::vector pk_columns; + for (const auto& idx : table_indexes) { + if (idx.index_type == "inverted_index") { + pk_columns = parse_column_names(idx.column_names); + break; } + } + + const char* qschema = quote_identifier(meta.schema_name.c_str()); + const char* qtable = quote_identifier(meta.table_name.c_str()); - const char* qschema = quote_identifier(meta.schema_name.c_str()); - const char* qtable = 
quote_identifier(meta.table_name.c_str()); + // Combine schema + table creation into a single SQL statement + StringInfoData buf; + initStringInfo(&buf); + appendStringInfo(&buf, "CREATE SCHEMA IF NOT EXISTS %s; ", qschema); + appendStringInfo(&buf, "CREATE TABLE IF NOT EXISTS %s.%s (", qschema, qtable); - // Build CREATE TABLE IF NOT EXISTS statement - StringInfoData buf; - initStringInfo(&buf); - appendStringInfo(&buf, "CREATE TABLE IF NOT EXISTS %s.%s (", qschema, qtable); + bool first = true; + for (const auto& col : table_columns) { + if (!first) { + appendStringInfoString(&buf, ", "); + } + first = false; + appendStringInfo(&buf, "%s %s", quote_identifier(col.column_name.c_str()), col.pg_type.c_str()); + } - bool first = true; - for (const auto& col : table_columns) { - if (!first) { + // Add PRIMARY KEY table constraint if we have PK columns + if (!pk_columns.empty()) { + appendStringInfoString(&buf, ", PRIMARY KEY ("); + for (size_t i = 0; i < pk_columns.size(); ++i) { + if (i > 0) { appendStringInfoString(&buf, ", "); } - first = false; - appendStringInfo(&buf, "%s %s", quote_identifier(col.column_name.c_str()), col.pg_type.c_str()); + appendStringInfoString(&buf, quote_identifier(pk_columns[i].c_str())); } - appendStringInfo(&buf, ") USING deeplake"); - - // Wrap in subtransaction so that if another backend concurrently - // creates the same table (race on composite type), the error is - // caught and we continue instead of aborting the sync cycle. 
- MemoryContext saved_context = CurrentMemoryContext; - ResourceOwner saved_owner = CurrentResourceOwner; - - BeginInternalSubTransaction(NULL); - PG_TRY(); - { - pg::utils::spi_connector connector; - - // Create schema if needed - StringInfoData schema_buf; - initStringInfo(&schema_buf); - appendStringInfo(&schema_buf, "CREATE SCHEMA IF NOT EXISTS %s", qschema); - SPI_execute(schema_buf.data, false, 0); - pfree(schema_buf.data); - - if (SPI_execute(buf.data, false, 0) == SPI_OK_UTILITY) { - elog(LOG, "pg_deeplake sync: successfully created table %s", qualified_name.c_str()); - } + appendStringInfoChar(&buf, ')'); + } + + appendStringInfo(&buf, ") USING deeplake"); + + if (execute_via_libpq(db_name.c_str(), buf.data)) { + elog(LOG, "pg_deeplake sync: created table %s in database %s", + qualified_name.c_str(), db_name.c_str()); + } - ReleaseCurrentSubTransaction(); + pfree(buf.data); + } +} + +/** + * Sync all databases: check per-db versions in parallel, load changed ones, + * create missing tables via libpq. + * + * Called OUTSIDE transaction context. 
+ */ +void sync_all_databases( + const std::string& root_path, + icm::string_map<> creds, + std::unordered_map& last_db_versions) +{ + // Step 1: Sync databases (create missing ones, install extension) + deeplake_sync_databases_from_catalog(root_path, creds); + + // Step 2: Get list of all databases from the shared catalog + auto databases = pg::dl_catalog::load_databases(root_path, creds); + + // Always include "postgres" which may not be in the databases catalog + bool has_postgres = false; + for (const auto& db : databases) { + if (db.db_name == "postgres") { has_postgres = true; break; } + } + if (!has_postgres) { + pg::dl_catalog::database_meta pg_meta; + pg_meta.db_name = "postgres"; + pg_meta.state = "ready"; + databases.push_back(std::move(pg_meta)); + } + + // Step 3: Open per-db meta tables and check versions in parallel + std::vector db_names; + std::vector> meta_handles; + + for (const auto& db : databases) { + if (db.db_name == "template0" || db.db_name == "template1") { + continue; + } + try { + auto handle = pg::dl_catalog::open_db_meta_table(root_path, db.db_name, creds); + if (handle) { + db_names.push_back(db.db_name); + meta_handles.push_back(std::move(handle)); } - PG_CATCH(); - { - // Another backend created this table concurrently — not an error. - MemoryContextSwitchTo(saved_context); - CurrentResourceOwner = saved_owner; - RollbackAndReleaseCurrentSubTransaction(); - FlushErrorState(); - elog(DEBUG1, "pg_deeplake sync: concurrent creation of %s, skipping", qualified_name.c_str()); + } catch (...) 
{ + // Per-db catalog may not exist yet — skip silently + elog(DEBUG1, "pg_deeplake sync: no per-db catalog for '%s', skipping", db.db_name.c_str()); + } + } + + if (db_names.empty()) { + return; + } + + // Fire all version() promises in parallel (1 round-trip wall-clock) + icm::vector> version_promises; + version_promises.reserve(db_names.size()); + for (auto& handle : meta_handles) { + version_promises.push_back(handle->version()); + } + auto versions = async::combine(std::move(version_promises)).get_future().get(); + + // Step 4: Identify databases whose version changed since last sync + std::vector changed_dbs; + for (size_t i = 0; i < db_names.size(); ++i) { + int64_t ver = static_cast(versions[i]); + auto it = last_db_versions.find(db_names[i]); + if (it == last_db_versions.end() || it->second != ver) { + changed_dbs.push_back(db_names[i]); + last_db_versions[db_names[i]] = ver; + } + } + + if (changed_dbs.empty()) { + return; + } + + // Step 5: For each changed database, load schemas first, then tables+columns and sync + for (const auto& db_name : changed_dbs) { + try { + // Sync schemas before tables so CREATE TABLE can find the target schema + auto schemas = pg::dl_catalog::load_schemas(root_path, db_name, creds); + if (!schemas.empty()) { + deeplake_sync_schemas_for_db(db_name, schemas); } - PG_END_TRY(); - pfree(buf.data); + auto [tables, columns] = pg::dl_catalog::load_tables_and_columns(root_path, db_name, creds); + auto indexes = pg::dl_catalog::load_indexes(root_path, db_name, creds); + deeplake_sync_tables_for_db(db_name, tables, columns, indexes); + elog(LOG, "pg_deeplake sync: synced %zu schemas, %zu tables, %zu indexes for database '%s'", + schemas.size(), tables.size(), indexes.size(), db_name.c_str()); + } catch (const std::exception& e) { + elog(WARNING, "pg_deeplake sync: failed to sync database '%s': %s", db_name.c_str(), e.what()); + } catch (...) 
{ + elog(WARNING, "pg_deeplake sync: failed to sync database '%s': unknown error", db_name.c_str()); } } } @@ -421,6 +575,7 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg) int64_t last_catalog_version = 0; std::string last_root_path; // Track root_path to detect changes + std::unordered_map last_db_versions; while (!got_sigterm) { @@ -467,15 +622,16 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg) if (!last_root_path.empty()) { pg::table_storage::instance().reset_and_load_table_metadata(); last_catalog_version = 0; + last_db_versions.clear(); } last_root_path = root_path; } - // Use existing catalog version API to check for changes (now fast with cache) + // Fast global version check (single HEAD request via cached meta table) int64_t current_version = pg::dl_catalog::get_catalog_version(root_path, creds); if (current_version != last_catalog_version) { - // Save state for database sync (which happens outside transaction) + // Save state for sync (which happens outside transaction) sync_root_path = root_path; sync_creds = creds; need_sync = true; @@ -494,36 +650,16 @@ PGDLLEXPORT void deeplake_sync_worker_main(Datum main_arg) PopActiveSnapshot(); CommitTransactionCommand(); - // Sync databases via libpq OUTSIDE transaction context - // (CREATE DATABASE cannot run inside a transaction block) + // All sync happens OUTSIDE transaction context via libpq if (need_sync && !sync_root_path.empty()) { try { - deeplake_sync_databases_from_catalog(sync_root_path, sync_creds); + sync_all_databases(sync_root_path, sync_creds, last_db_versions); + elog(DEBUG1, "pg_deeplake sync: completed (global version %ld)", last_catalog_version); } catch (const std::exception& e) { - elog(WARNING, "pg_deeplake sync: database sync failed: %s", e.what()); + elog(WARNING, "pg_deeplake sync: sync failed: %s", e.what()); } catch (...) 
{ - elog(WARNING, "pg_deeplake sync: database sync failed: unknown error"); + elog(WARNING, "pg_deeplake sync: sync failed: unknown error"); } - - // Re-enter transaction for table sync - SetCurrentStatementStartTimestamp(); - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); - - PG_TRY(); - { - deeplake_sync_tables_from_catalog(sync_root_path, sync_creds); - elog(DEBUG1, "pg_deeplake sync: synced (catalog version %ld)", last_catalog_version); - } - PG_CATCH(); - { - EmitErrorReport(); - FlushErrorState(); - } - PG_END_TRY(); - - PopActiveSnapshot(); - CommitTransactionCommand(); } pgstat_report_stat(true); diff --git a/cpp/deeplake_pg/table_storage.cpp b/cpp/deeplake_pg/table_storage.cpp index b4f67ad7b8..b18b84b406 100644 --- a/cpp/deeplake_pg/table_storage.cpp +++ b/cpp/deeplake_pg/table_storage.cpp @@ -12,6 +12,7 @@ extern "C" { #include #include #include +#include #include #include #include @@ -130,6 +131,15 @@ void convert_pg_to_nd(const pg::table_data& table_data, } } +std::string get_current_database_name() +{ + const char* dbname = get_database_name(MyDatabaseId); + if (!dbname) return "postgres"; + std::string result(dbname); + pfree(const_cast(dbname)); + return result; +} + } // unnamed namespace namespace pg { @@ -163,6 +173,11 @@ icm::string_map<> session_credentials::get_credentials() std::string session_credentials::get_root_path() { + // Environment variable takes priority over per-session GUC + auto root = base::getenv("DEEPLAKE_ROOT_PATH", ""); + if (!root.empty()) { + return root; + } if (root_path_guc_string != nullptr && std::strlen(root_path_guc_string) > 0) { return std::string(root_path_guc_string); } @@ -240,7 +255,9 @@ void table_storage::save_table_metadata(const pg::table_data& table_data) return; } auto creds = session_credentials::get_credentials(); + const auto db_name = get_current_database_name(); pg::dl_catalog::ensure_catalog(root_dir, creds); + pg::dl_catalog::ensure_db_catalog(root_dir, db_name, creds); 
auto [schema_name, simple_table_name] = split_table_name(table_name); const std::string table_id = schema_name + "." + simple_table_name; @@ -251,7 +268,8 @@ void table_storage::save_table_metadata(const pg::table_data& table_data) meta.table_name = simple_table_name; meta.dataset_path = ds_path; meta.state = "ready"; - pg::dl_catalog::upsert_table(root_dir, creds, meta); + meta.db_name = db_name; + pg::dl_catalog::upsert_table(root_dir, db_name, creds, meta); // Save column metadata to catalog TupleDesc tupdesc = table_data.get_tuple_descriptor(); @@ -269,10 +287,25 @@ void table_storage::save_table_metadata(const pg::table_data& table_data) col.position = i; columns.push_back(std::move(col)); } - pg::dl_catalog::upsert_columns(root_dir, creds, columns); + pg::dl_catalog::upsert_columns(root_dir, db_name, creds, columns); + + // Belt-and-suspenders: ensure the schema is recorded even if + // the CREATE SCHEMA hook was missed (e.g., schema created before + // the extension was loaded, or via a different code path). + if (schema_name != "public") { + try { + pg::dl_catalog::schema_meta s_meta; + s_meta.schema_name = schema_name; + s_meta.state = "ready"; + pg::dl_catalog::upsert_schema(root_dir, db_name, creds, s_meta); + } catch (...) 
{ + elog(DEBUG1, "pg_deeplake: failed to upsert schema '%s' in catalog (non-fatal)", schema_name.c_str()); + } + } + pg::dl_catalog::bump_db_catalog_version(root_dir, db_name, session_credentials::get_credentials()); pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials()); - catalog_version_ = pg::dl_catalog::get_catalog_version(root_dir, session_credentials::get_credentials()); + catalog_version_ = pg::dl_catalog::get_db_catalog_version(root_dir, db_name, session_credentials::get_credentials()); } } @@ -297,9 +330,11 @@ void table_storage::load_table_metadata() // Stateless catalog sync (only when enabled and root_dir is configured) if (pg::stateless_enabled && !root_dir.empty()) { - // Fast path: if already loaded, just check version without ensure_catalog + const auto db_name = get_current_database_name(); + + // Fast path: if already loaded, just check per-db version if (tables_loaded_) { - const auto current_version = pg::dl_catalog::get_catalog_version(root_dir, creds); + const auto current_version = pg::dl_catalog::get_db_catalog_version(root_dir, db_name, creds); if (current_version == catalog_version_) { return; } @@ -310,18 +345,23 @@ void table_storage::load_table_metadata() catalog_version_ = current_version; } - // Ensure catalog exists and get version in one call - const auto version = pg::dl_catalog::ensure_catalog(root_dir, creds); + // Ensure both shared and per-database catalogs exist + pg::dl_catalog::ensure_catalog(root_dir, creds); + const auto version = pg::dl_catalog::ensure_db_catalog(root_dir, db_name, creds); if (catalog_version_ == 0) { catalog_version_ = version; } tables_loaded_ = true; - // Load tables and columns in parallel - auto [catalog_tables, catalog_columns] = pg::dl_catalog::load_tables_and_columns(root_dir, creds); + // Load tables, columns, and indexes from per-database path + auto [catalog_tables, catalog_columns] = pg::dl_catalog::load_tables_and_columns(root_dir, db_name, creds); + auto 
catalog_indexes = pg::dl_catalog::load_indexes(root_dir, db_name, creds); if (!catalog_tables.empty()) { for (const auto& meta : catalog_tables) { + if (meta.state == "dropping") { + continue; + } const std::string qualified_name = meta.schema_name + "." + meta.table_name; auto* rel = makeRangeVar(pstrdup(meta.schema_name.c_str()), pstrdup(meta.table_name.c_str()), -1); Oid relid = RangeVarGetRelid(rel, NoLock, true); @@ -348,6 +388,29 @@ void table_storage::load_table_metadata() continue; } + // Find primary key columns from indexes + std::vector pk_columns; + for (const auto& idx : catalog_indexes) { + if (idx.table_id == meta.table_id && idx.index_type == "inverted_index") { + // Parse comma-separated column names + std::string current; + for (char c : idx.column_names) { + if (c == ',') { + if (!current.empty()) { + pk_columns.push_back(current); + current.clear(); + } + } else { + current += c; + } + } + if (!current.empty()) { + pk_columns.push_back(current); + } + break; + } + } + // Build CREATE TABLE IF NOT EXISTS from catalog metadata. 
// Wrap in a subtransaction so that if another backend concurrently // creates the same table (race on composite type), the error is @@ -367,6 +430,19 @@ void table_storage::load_table_metadata() first = false; appendStringInfo(&buf, "%s %s", quote_identifier(col.column_name.c_str()), col.pg_type.c_str()); } + + // Add PRIMARY KEY table constraint if found in catalog indexes + if (!pk_columns.empty()) { + appendStringInfoString(&buf, ", PRIMARY KEY ("); + for (size_t i = 0; i < pk_columns.size(); ++i) { + if (i > 0) { + appendStringInfoString(&buf, ", "); + } + appendStringInfoString(&buf, quote_identifier(pk_columns[i].c_str())); + } + appendStringInfoChar(&buf, ')'); + } + appendStringInfo(&buf, ") USING deeplake"); MemoryContext saved_context = CurrentMemoryContext; @@ -375,8 +451,10 @@ void table_storage::load_table_metadata() BeginInternalSubTransaction(NULL); PG_TRY(); { - catalog_only_guard co_guard; - pg::utils::spi_connector connector; + table_storage::set_catalog_only_create(true); + if (SPI_connect() != SPI_OK_CONNECT) { + elog(ERROR, "Could not connect to SPI manager"); + } bool pushed_snapshot = false; if (!ActiveSnapshotSet()) { PushActiveSnapshot(GetTransactionSnapshot()); @@ -396,11 +474,14 @@ void table_storage::load_table_metadata() PopActiveSnapshot(); } + SPI_finish(); + table_storage::set_catalog_only_create(false); ReleaseCurrentSubTransaction(); } PG_CATCH(); { // Another backend created this table concurrently — not an error. + table_storage::set_catalog_only_create(false); MemoryContextSwitchTo(saved_context); CurrentResourceOwner = saved_owner; RollbackAndReleaseCurrentSubTransaction(); @@ -538,6 +619,7 @@ void table_storage::load_table_metadata() try { // Seed the DL catalog with legacy metadata (only when stateless is enabled). 
if (pg::stateless_enabled && !root_dir.empty()) { + const auto db_name = get_current_database_name(); auto [schema_name, simple_table_name] = split_table_name(table_name); pg::dl_catalog::table_meta meta; meta.table_id = schema_name + "." + simple_table_name; @@ -545,7 +627,8 @@ void table_storage::load_table_metadata() meta.table_name = simple_table_name; meta.dataset_path = ds_path; meta.state = "ready"; - pg::dl_catalog::upsert_table(root_dir, creds, meta); + meta.db_name = db_name; + pg::dl_catalog::upsert_table(root_dir, db_name, creds, meta); catalog_seeded = true; } @@ -584,8 +667,10 @@ void table_storage::load_table_metadata() } } if (catalog_seeded && pg::stateless_enabled && !root_dir.empty()) { + const auto db_name = get_current_database_name(); + pg::dl_catalog::bump_db_catalog_version(root_dir, db_name, session_credentials::get_credentials()); pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials()); - catalog_version_ = pg::dl_catalog::get_catalog_version(root_dir, session_credentials::get_credentials()); + catalog_version_ = pg::dl_catalog::get_db_catalog_version(root_dir, db_name, session_credentials::get_credentials()); } load_views(); load_schema_name(); @@ -702,8 +787,11 @@ void table_storage::create_table(const std::string& table_name, Oid table_id, Tu if (session_root.empty()) { session_root = pg::utils::get_deeplake_root_directory(); } - // Construct path: root_dir/schema_name/table_name - dataset_path = session_root + "/" + schema_name + "/" + simple_table_name; + // Construct path: root_dir/db_name/schema_name/table_name + // root_path is the global root; include the database name so each + // database's datasets are stored under their own prefix. 
+ const auto db_name = get_current_database_name(); + dataset_path = session_root + "/" + db_name + "/" + schema_name + "/" + simple_table_name; } // Get credentials from current session @@ -995,7 +1083,9 @@ void table_storage::drop_table(const std::string& table_name) return root; }(); if (!root_dir.empty()) { + const auto db_name = get_current_database_name(); pg::dl_catalog::ensure_catalog(root_dir, creds); + pg::dl_catalog::ensure_db_catalog(root_dir, db_name, creds); auto [schema_name, simple_table_name] = split_table_name(table_name); pg::dl_catalog::table_meta meta; meta.table_id = schema_name + "." + simple_table_name; @@ -1003,9 +1093,11 @@ void table_storage::drop_table(const std::string& table_name) meta.table_name = simple_table_name; meta.dataset_path = table_data.get_dataset_path().url(); meta.state = "dropping"; - pg::dl_catalog::upsert_table(root_dir, creds, meta); + meta.db_name = db_name; + pg::dl_catalog::upsert_table(root_dir, db_name, creds, meta); + pg::dl_catalog::bump_db_catalog_version(root_dir, db_name, session_credentials::get_credentials()); pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials()); - catalog_version_ = pg::dl_catalog::get_catalog_version(root_dir, session_credentials::get_credentials()); + catalog_version_ = pg::dl_catalog::get_db_catalog_version(root_dir, db_name, session_credentials::get_credentials()); } } diff --git a/cpp/deeplake_pg/table_storage.hpp b/cpp/deeplake_pg/table_storage.hpp index c15588cfe7..62ee23ecb1 100644 --- a/cpp/deeplake_pg/table_storage.hpp +++ b/cpp/deeplake_pg/table_storage.hpp @@ -298,6 +298,11 @@ class table_storage return catalog_only_create_; } + static void set_catalog_only_create(bool value) noexcept + { + catalog_only_create_ = value; + } + private: static inline thread_local bool in_ddl_context_ = false; static inline thread_local bool catalog_only_create_ = false; diff --git a/postgres/pg_deeplake--1.0.sql b/postgres/pg_deeplake--1.0.sql index 
907e9bd9bd..e77414a27b 100644 --- a/postgres/pg_deeplake--1.0.sql +++ b/postgres/pg_deeplake--1.0.sql @@ -13,6 +13,10 @@ COMMENT ON DOMAIN VIDEO IS 'Binary video data stored as BYTEA'; CREATE DOMAIN FILE AS BYTEA; COMMENT ON DOMAIN FILE IS 'Binary file data stored as BYTEA'; +-- FILE_ID domain: UUID alias for file identifiers +CREATE DOMAIN FILE_ID AS UUID; +COMMENT ON DOMAIN FILE_ID IS 'UUID identifier for files'; + CREATE FUNCTION handle_index_creation() RETURNS event_trigger AS 'pg_deeplake' LANGUAGE C VOLATILE; -- Create the event trigger to listen for CREATE INDEX events diff --git a/postgres/tests/py_tests/conftest.py b/postgres/tests/py_tests/conftest.py index c1037e0934..aa0061c7bf 100644 --- a/postgres/tests/py_tests/conftest.py +++ b/postgres/tests/py_tests/conftest.py @@ -236,9 +236,9 @@ async def db_conn(pg_server) -> AsyncGenerator[asyncpg.Connection, None]: ) try: - # Setup: Clean extension state - await conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await conn.execute("CREATE EXTENSION pg_deeplake") + # Setup: ensure extension is available without potentially blocking DROP. + # DROP EXTENSION can wait on concurrent backend locks and stall tests. 
+ await conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") # Load utility functions from utils.psql script_path = Path(__file__).parent # py_tests/ diff --git a/postgres/tests/py_tests/test_concurrent_insert_index.py b/postgres/tests/py_tests/test_concurrent_insert_index.py index 9252766658..4c55b716a6 100644 --- a/postgres/tests/py_tests/test_concurrent_insert_index.py +++ b/postgres/tests/py_tests/test_concurrent_insert_index.py @@ -217,7 +217,6 @@ async def test_concurrent_insert_and_index_creation(db_conn: asyncpg.Connection) try: # Initialization: Create domains and table print("Setting up domains and table...") - await db_conn.execute("CREATE DOMAIN file_id AS UUID") await db_conn.execute("CREATE DOMAIN chunk_id AS TEXT") await db_conn.execute(""" @@ -381,7 +380,6 @@ async def do_cleanup(): try: await db_conn.execute("DROP TABLE IF EXISTS chunk_text_image CASCADE") await db_conn.execute("DROP DOMAIN IF EXISTS chunk_id CASCADE") - await db_conn.execute("DROP DOMAIN IF EXISTS file_id CASCADE") print("✓ Cleanup complete") except Exception as e: print(f"⚠ Cleanup failed: {e}") diff --git a/postgres/tests/py_tests/test_drop_table_column.py b/postgres/tests/py_tests/test_drop_table_column.py index 93f6b858a8..505f9282d5 100644 --- a/postgres/tests/py_tests/test_drop_table_column.py +++ b/postgres/tests/py_tests/test_drop_table_column.py @@ -25,7 +25,7 @@ async def test_drop_table_column(db_conn: asyncpg.Connection): try: # Create table with multiple columns await db_conn.execute(""" - CREATE TABLE vectors ( + CREATE TABLE drop_col_vectors ( id SERIAL PRIMARY KEY, v1 float4[], v2 float4[] @@ -34,7 +34,7 @@ async def test_drop_table_column(db_conn: asyncpg.Connection): # Create index on v2 (not v1) await db_conn.execute(""" - CREATE INDEX index_for_v2 ON vectors USING deeplake_index (v2 DESC) + CREATE INDEX index_for_v2 ON drop_col_vectors USING deeplake_index (v2 DESC) """) # Verify index exists in pg_class @@ -60,7 +60,7 @@ async def 
test_drop_table_column(db_conn: asyncpg.Connection): f"Dataset directory '{dataset_path}' should exist before DROP COLUMN" # DROP non-indexed column (v1) - index should remain - await db_conn.execute("ALTER TABLE vectors DROP COLUMN v1") + await db_conn.execute("ALTER TABLE drop_col_vectors DROP COLUMN v1") # Verify index still exists after dropping non-indexed column await assertions.assert_query_row_count( @@ -83,7 +83,7 @@ async def test_drop_table_column(db_conn: asyncpg.Connection): f"Dataset directory '{dataset_path_after_v1}' should exist after dropping non-indexed column" # DROP indexed column (v2) - index should be removed - await db_conn.execute("ALTER TABLE vectors DROP COLUMN v2") + await db_conn.execute("ALTER TABLE drop_col_vectors DROP COLUMN v2") # Verify index removed from pg_class await assertions.assert_query_row_count( @@ -108,4 +108,4 @@ async def test_drop_table_column(db_conn: asyncpg.Connection): finally: # Cleanup (in case test fails) await db_conn.execute("DROP INDEX IF EXISTS index_for_v2 CASCADE") - await db_conn.execute("DROP TABLE IF EXISTS vectors CASCADE") + await db_conn.execute("DROP TABLE IF EXISTS drop_col_vectors CASCADE") diff --git a/postgres/tests/py_tests/test_root_path.py b/postgres/tests/py_tests/test_root_path.py index f8e0c3d5ff..e482dcd340 100644 --- a/postgres/tests/py_tests/test_root_path.py +++ b/postgres/tests/py_tests/test_root_path.py @@ -55,7 +55,10 @@ async def test_root_path_basic(db_conn: asyncpg.Connection, temp_dir_for_postgre print(f"✓ Dataset path from metadata: {ds_path}") # Verify the path starts with our root_path - expected_path = os.path.join(root_path, "public", "test_root_path") + # The extension now auto-includes the database name in the path: + # root_path / db_name / schema / table + db_name = await db_conn.fetchval("SELECT current_database()") + expected_path = os.path.join(root_path, db_name, "public", "test_root_path") # Strip trailing slash for comparison ds_path_normalized = 
ds_path.rstrip('/') expected_path_normalized = expected_path.rstrip('/') diff --git a/postgres/tests/py_tests/test_startup_latency.py b/postgres/tests/py_tests/test_startup_latency.py index 6f01809043..d263629571 100644 --- a/postgres/tests/py_tests/test_startup_latency.py +++ b/postgres/tests/py_tests/test_startup_latency.py @@ -138,8 +138,7 @@ async def measure_connection_latency( # 2. Measure extension load time if with_extension: ext_start = time.perf_counter() - await conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await conn.execute("CREATE EXTENSION pg_deeplake") + await conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") metrics.extension_load_time_ms = (time.perf_counter() - ext_start) * 1000 # 3. Measure root_path set time (triggers catalog loading in stateless mode) @@ -205,8 +204,7 @@ async def measure_catalog_discovery_latency( try: # Load extension ext_start = time.perf_counter() - await conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await conn.execute("CREATE EXTENSION pg_deeplake") + await conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") metrics.extension_load_time_ms = (time.perf_counter() - ext_start) * 1000 # Set root_path - this triggers catalog discovery @@ -338,8 +336,7 @@ async def test_stateless_catalog_loading_latency(pg_server, temp_root_path): ) try: - await setup_conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await setup_conn.execute("CREATE EXTENSION pg_deeplake") + await setup_conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") await setup_conn.execute(f"SET deeplake.root_path = '{temp_root_path}'") # Create multiple tables to populate the catalog @@ -389,8 +386,7 @@ async def test_stateless_catalog_loading_latency(pg_server, temp_root_path): statement_cache_size=0 ) try: - await cleanup_conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await cleanup_conn.execute("CREATE EXTENSION pg_deeplake") + await cleanup_conn.execute("CREATE EXTENSION IF NOT 
EXISTS pg_deeplake") await cleanup_conn.execute(f"SET deeplake.root_path = '{temp_root_path}'") for i in range(num_tables): await cleanup_conn.execute(f"DROP TABLE IF EXISTS catalog_test_{i} CASCADE") @@ -477,8 +473,7 @@ async def test_multi_table_catalog_scaling(pg_server, temp_root_path): ) try: - await setup_conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await setup_conn.execute("CREATE EXTENSION pg_deeplake") + await setup_conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") await setup_conn.execute(f"SET deeplake.root_path = '{temp_root_path}'") # Create tables @@ -513,8 +508,7 @@ async def test_multi_table_catalog_scaling(pg_server, temp_root_path): statement_cache_size=0 ) try: - await cleanup_conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await cleanup_conn.execute("CREATE EXTENSION pg_deeplake") + await cleanup_conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") await cleanup_conn.execute(f"SET deeplake.root_path = '{temp_root_path}'") for i in range(num_tables): await cleanup_conn.execute(f"DROP TABLE IF EXISTS scale_test_{num_tables}_{i} CASCADE") @@ -576,8 +570,7 @@ async def test_cold_start_simulation(pg_server, temp_root_path): ) try: - await setup_conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await setup_conn.execute("CREATE EXTENSION pg_deeplake") + await setup_conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") await setup_conn.execute(f"SET deeplake.root_path = '{temp_root_path}'") await setup_conn.execute(""" @@ -645,8 +638,7 @@ async def test_cold_start_simulation(pg_server, temp_root_path): statement_cache_size=0 ) try: - await cleanup_conn.execute("DROP EXTENSION IF EXISTS pg_deeplake CASCADE") - await cleanup_conn.execute("CREATE EXTENSION pg_deeplake") + await cleanup_conn.execute("CREATE EXTENSION IF NOT EXISTS pg_deeplake") await cleanup_conn.execute(f"SET deeplake.root_path = '{temp_root_path}'") await cleanup_conn.execute("DROP TABLE IF EXISTS existing_data CASCADE") 
finally: