Skip to content

Commit 63ecadf

Browse files
authored
Merge pull request #3139 from activeloopai/stateless-extension
Stateless sync. Per db tables, indexes, schemas.
2 parents d9224c3 + 72bf869 commit 63ecadf

14 files changed

+874
-229
lines changed

cpp/deeplake_pg/dl_catalog.cpp

Lines changed: 357 additions & 81 deletions
Large diffs are not rendered by default.

cpp/deeplake_pg/dl_catalog.hpp

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
#include <icm/string_map.hpp>
44

55
#include <cstdint>
6+
#include <memory>
67
#include <string>
78
#include <vector>
89

10+
namespace deeplake_api { class catalog_table; }
11+
912
namespace pg::dl_catalog {
1013

1114
struct table_meta
@@ -15,6 +18,7 @@ struct table_meta
1518
std::string table_name;
1619
std::string dataset_path;
1720
std::string state;
21+
std::string db_name;
1822
int64_t updated_at = 0;
1923
};
2024

@@ -36,6 +40,14 @@ struct index_meta
3640
int32_t order_type = 0;
3741
};
3842

43+
struct schema_meta
44+
{
45+
std::string schema_name; // PK
46+
std::string owner;
47+
std::string state; // "ready" or "dropping"
48+
int64_t updated_at = 0;
49+
};
50+
3951
struct database_meta
4052
{
4153
std::string db_name; // PK
@@ -48,23 +60,44 @@ struct database_meta
4860
int64_t updated_at = 0;
4961
};
5062

63+
// Shared (cluster-wide) catalog: meta + databases
5164
int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds);
5265

53-
std::vector<table_meta> load_tables(const std::string& root_path, icm::string_map<> creds);
54-
std::vector<column_meta> load_columns(const std::string& root_path, icm::string_map<> creds);
55-
std::vector<index_meta> load_indexes(const std::string& root_path, icm::string_map<> creds);
66+
// Per-database catalog: tables + columns + indexes + meta
67+
int64_t ensure_db_catalog(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
68+
69+
// Per-database loaders (read from {root}/{db_name}/__deeplake_catalog/)
70+
std::vector<table_meta> load_tables(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
71+
std::vector<column_meta> load_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
72+
std::vector<index_meta> load_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
5673

5774
// Load tables and columns in parallel for better performance
5875
std::pair<std::vector<table_meta>, std::vector<column_meta>>
59-
load_tables_and_columns(const std::string& root_path, icm::string_map<> creds);
76+
load_tables_and_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
77+
78+
// Per-database schema catalog
79+
std::vector<schema_meta> load_schemas(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
80+
void upsert_schema(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const schema_meta& meta);
6081

61-
void upsert_table(const std::string& root_path, icm::string_map<> creds, const table_meta& meta);
62-
void upsert_columns(const std::string& root_path, icm::string_map<> creds, const std::vector<column_meta>& columns);
82+
// Per-database upserts (write to {root}/{db_name}/__deeplake_catalog/)
83+
void upsert_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const table_meta& meta);
84+
void upsert_columns(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector<column_meta>& columns);
85+
void upsert_indexes(const std::string& root_path, const std::string& db_name, icm::string_map<> creds, const std::vector<index_meta>& indexes);
6386

87+
// Shared (cluster-wide) database catalog
6488
std::vector<database_meta> load_databases(const std::string& root_path, icm::string_map<> creds);
6589
void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta);
6690

91+
// Global (shared) catalog version
6792
int64_t get_catalog_version(const std::string& root_path, icm::string_map<> creds);
6893
void bump_catalog_version(const std::string& root_path, icm::string_map<> creds);
6994

95+
// Per-database catalog version
96+
int64_t get_db_catalog_version(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
97+
void bump_db_catalog_version(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
98+
99+
// Open the per-database meta table handle (for parallel .version() calls in sync worker)
100+
std::shared_ptr<deeplake_api::catalog_table>
101+
open_db_meta_table(const std::string& root_path, const std::string& db_name, icm::string_map<> creds);
102+
70103
} // namespace pg::dl_catalog

cpp/deeplake_pg/extension_init.cpp

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ extern "C" {
99
#include <postgres.h>
1010

1111
#include <catalog/namespace.h>
12+
#include <commands/dbcommands.h>
1213
#include <commands/defrem.h>
14+
#include <miscadmin.h>
1315
#include <commands/vacuum.h>
1416
#include <nodes/nodeFuncs.h>
1517
#include <optimizer/planner.h>
@@ -586,6 +588,35 @@ static void process_utility(PlannedStmt* pstmt,
586588
}
587589
}
588590
}
591+
592+
// Mark schema as "dropping" in the S3 catalog
593+
if (pg::stateless_enabled) {
594+
try {
595+
auto root_path = pg::session_credentials::get_root_path();
596+
if (root_path.empty()) {
597+
root_path = pg::utils::get_deeplake_root_directory();
598+
}
599+
if (!root_path.empty()) {
600+
auto creds = pg::session_credentials::get_credentials();
601+
const char* dbname = get_database_name(MyDatabaseId);
602+
std::string db_name = dbname ? dbname : "postgres";
603+
if (dbname) pfree(const_cast<char*>(dbname));
604+
605+
pg::dl_catalog::ensure_catalog(root_path, creds);
606+
pg::dl_catalog::ensure_db_catalog(root_path, db_name, creds);
607+
608+
pg::dl_catalog::schema_meta s_meta;
609+
s_meta.schema_name = schema_name;
610+
s_meta.state = "dropping";
611+
pg::dl_catalog::upsert_schema(root_path, db_name, creds, s_meta);
612+
613+
pg::dl_catalog::bump_db_catalog_version(root_path, db_name, pg::session_credentials::get_credentials());
614+
pg::dl_catalog::bump_catalog_version(root_path, pg::session_credentials::get_credentials());
615+
}
616+
} catch (const std::exception& e) {
617+
elog(WARNING, "pg_deeplake: failed to mark schema '%s' as dropping in catalog: %s", schema_name, e.what());
618+
}
619+
}
589620
}
590621
} else if (stmt->removeType == OBJECT_DATABASE) {
591622
const char* query = "SELECT nspname, relname "
@@ -691,6 +722,7 @@ static void process_utility(PlannedStmt* pstmt,
691722
}
692723
if (!root_path.empty()) {
693724
auto creds = pg::session_credentials::get_credentials();
725+
pg::dl_catalog::ensure_catalog(root_path, creds);
694726
pg::dl_catalog::database_meta db_meta;
695727
db_meta.db_name = dbstmt->dbname;
696728
db_meta.state = "dropping";
@@ -727,6 +759,7 @@ static void process_utility(PlannedStmt* pstmt,
727759
}
728760
if (!root_path.empty()) {
729761
auto creds = pg::session_credentials::get_credentials();
762+
pg::dl_catalog::ensure_catalog(root_path, creds);
730763
pg::dl_catalog::database_meta db_meta;
731764
db_meta.db_name = dbstmt->dbname;
732765
db_meta.state = "ready";
@@ -758,6 +791,40 @@ static void process_utility(PlannedStmt* pstmt,
758791
}
759792
}
760793

794+
// Post-hook: record CREATE SCHEMA in S3 catalog for multi-instance sync
795+
if (IsA(pstmt->utilityStmt, CreateSchemaStmt) && pg::stateless_enabled) {
796+
CreateSchemaStmt* schemastmt = (CreateSchemaStmt*)pstmt->utilityStmt;
797+
try {
798+
auto root_path = pg::session_credentials::get_root_path();
799+
if (root_path.empty()) {
800+
root_path = pg::utils::get_deeplake_root_directory();
801+
}
802+
if (!root_path.empty() && schemastmt->schemaname != nullptr) {
803+
auto creds = pg::session_credentials::get_credentials();
804+
const char* dbname = get_database_name(MyDatabaseId);
805+
std::string db_name = dbname ? dbname : "postgres";
806+
if (dbname) pfree(const_cast<char*>(dbname));
807+
808+
pg::dl_catalog::ensure_catalog(root_path, creds);
809+
pg::dl_catalog::ensure_db_catalog(root_path, db_name, creds);
810+
811+
pg::dl_catalog::schema_meta s_meta;
812+
s_meta.schema_name = schemastmt->schemaname;
813+
s_meta.state = "ready";
814+
if (schemastmt->authrole != nullptr) {
815+
s_meta.owner = schemastmt->authrole->rolename;
816+
}
817+
pg::dl_catalog::upsert_schema(root_path, db_name, creds, s_meta);
818+
819+
pg::dl_catalog::bump_db_catalog_version(root_path, db_name, pg::session_credentials::get_credentials());
820+
pg::dl_catalog::bump_catalog_version(root_path, pg::session_credentials::get_credentials());
821+
elog(DEBUG1, "pg_deeplake: recorded CREATE SCHEMA '%s' in catalog", schemastmt->schemaname);
822+
}
823+
} catch (const std::exception& e) {
824+
elog(DEBUG1, "pg_deeplake: failed to record CREATE SCHEMA in catalog: %s", e.what());
825+
}
826+
}
827+
761828
// Post-process ALTER TABLE ADD COLUMN to add column to deeplake dataset
762829
if (IsA(pstmt->utilityStmt, AlterTableStmt)) {
763830
AlterTableStmt* stmt = (AlterTableStmt*)pstmt->utilityStmt;

cpp/deeplake_pg/logger.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class logger_adapter : public base::logger_adapter
3737
elog(DEBUG1, "%s", message.c_str());
3838
break;
3939
case base::log_level::info:
40-
elog(INFO, "%s", message.c_str());
40+
elog(LOG, "%s", message.c_str());
4141
break;
4242
case base::log_level::warning:
4343
elog(WARNING, "%s", message.c_str());

cpp/deeplake_pg/pg_deeplake.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include "pg_deeplake.hpp"
2+
#include "dl_catalog.hpp"
23
#include "logger.hpp"
34
#include "table_storage.hpp"
5+
#include "utils.hpp"
46

57
#include <deeplake_api/deeplake_api.hpp>
68
#include <deeplake_core/deeplake_index_type.hpp>
@@ -9,6 +11,8 @@
911
extern "C" {
1012
#endif
1113

14+
#include <commands/dbcommands.h>
15+
#include <miscadmin.h>
1216
#include <storage/ipc.h>
1317

1418
#ifdef __cplusplus
@@ -260,6 +264,41 @@ void save_index_metadata(Oid oid)
260264
if (SPI_execute(buf.data, false, 0) != SPI_OK_INSERT) {
261265
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to save metadata")));
262266
}
267+
268+
// Persist index to shared catalog for stateless multi-instance sync.
269+
// Skip when in catalog-only mode — the table was synced FROM the catalog,
270+
// so writing back would be redundant and cause version bump loops.
271+
if (pg::stateless_enabled && !pg::table_storage::is_catalog_only_create()) {
272+
try {
273+
auto root_dir = pg::session_credentials::get_root_path();
274+
if (root_dir.empty()) {
275+
root_dir = pg::utils::get_deeplake_root_directory();
276+
}
277+
if (!root_dir.empty()) {
278+
auto creds = pg::session_credentials::get_credentials();
279+
const char* dbname = get_database_name(MyDatabaseId);
280+
std::string db_name = dbname ? dbname : "postgres";
281+
if (dbname) pfree(const_cast<char*>(dbname));
282+
283+
const std::string& table_id = idx_info.table_name(); // already schema-qualified
284+
285+
pg::dl_catalog::index_meta idx_meta;
286+
idx_meta.table_id = table_id;
287+
idx_meta.column_names = idx_info.get_column_names_string();
288+
idx_meta.index_type = std::string(deeplake_core::deeplake_index_type::to_string(idx_info.index_type()));
289+
idx_meta.order_type = static_cast<int32_t>(idx_info.order_type());
290+
291+
std::vector<pg::dl_catalog::index_meta> indexes = {idx_meta};
292+
pg::dl_catalog::upsert_indexes(root_dir, db_name, creds, indexes);
293+
pg::dl_catalog::bump_db_catalog_version(root_dir, db_name, creds);
294+
pg::dl_catalog::bump_catalog_version(root_dir, creds);
295+
}
296+
} catch (const std::exception& e) {
297+
elog(DEBUG1, "pg_deeplake: failed to persist index to shared catalog: %s", e.what());
298+
} catch (...) {
299+
elog(DEBUG1, "pg_deeplake: failed to persist index to shared catalog: unknown error");
300+
}
301+
}
263302
}
264303

265304
void load_index_metadata()

0 commit comments

Comments
 (0)