Skip to content

Commit fbaa9f0

Browse files
authored
Merge pull request #3137 from activeloopai/db-stateless
Make db creation stateless.
2 parents d5f1741 + 803bdbe commit fbaa9f0

File tree

8 files changed

+1005
-9
lines changed

8 files changed

+1005
-9
lines changed

DEEPLAKE_API_VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
4.5.1
1+
4.5.2

cpp/CMakeLists.pg.cmake

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ foreach(PG_VERSION ${PG_VERSIONS})
7070
endif()
7171

7272
set(PG_SERVER_INCLUDE_DIR "${postgres_INSTALL_DIR_REL_${PG_VERSION}_0}/include/server")
73+
set(PG_INCLUDE_DIR "${postgres_INSTALL_DIR_REL_${PG_VERSION}_0}/include")
7374
set(PG_PKGLIBDIR "${postgres_INSTALL_DIR_REL_${PG_VERSION}_0}/lib")
7475
set(PG_SHAREDIR "${postgres_INSTALL_DIR_REL_${PG_VERSION}_0}/share")
7576

@@ -79,7 +80,7 @@ foreach(PG_VERSION ${PG_VERSIONS})
7980
)
8081

8182
target_include_directories(${PG_LIB}
82-
SYSTEM PRIVATE ${PG_SERVER_INCLUDE_DIR}
83+
SYSTEM PRIVATE ${PG_SERVER_INCLUDE_DIR} ${PG_INCLUDE_DIR}
8384
PRIVATE
8485
${indicators_INCLUDE_DIRS}
8586
)

cpp/deeplake_pg/dl_catalog.cpp

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ constexpr const char* k_tables_name = "tables";
2929
constexpr const char* k_columns_name = "columns";
3030
constexpr const char* k_indexes_name = "indexes";
3131
constexpr const char* k_meta_name = "meta";
32+
constexpr const char* k_databases_name = "databases";
3233

3334
std::string join_path(const std::string& root, const std::string& name)
3435
{
@@ -131,6 +132,7 @@ int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds)
131132
const auto columns_path = join_path(root_path, k_columns_name);
132133
const auto indexes_path = join_path(root_path, k_indexes_name);
133134
const auto meta_path = join_path(root_path, k_meta_name);
135+
const auto databases_path = join_path(root_path, k_databases_name);
134136

135137
try {
136138
// Build schemas for all catalog tables
@@ -165,9 +167,20 @@ int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds)
165167
.add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64)))
166168
.set_primary_key("catalog_version");
167169

168-
// Launch all 4 open_or_create operations in parallel
170+
deeplake_api::catalog_table_schema databases_schema;
171+
databases_schema.add("db_name", deeplake_core::type::text(codecs::compression::null))
172+
.add("owner", deeplake_core::type::text(codecs::compression::null))
173+
.add("encoding", deeplake_core::type::text(codecs::compression::null))
174+
.add("lc_collate", deeplake_core::type::text(codecs::compression::null))
175+
.add("lc_ctype", deeplake_core::type::text(codecs::compression::null))
176+
.add("template_db", deeplake_core::type::text(codecs::compression::null))
177+
.add("state", deeplake_core::type::text(codecs::compression::null))
178+
.add("updated_at", deeplake_core::type::generic(nd::type::scalar(nd::dtype::int64)))
179+
.set_primary_key("db_name");
180+
181+
// Launch all 5 open_or_create operations in parallel
169182
icm::vector<async::promise<std::shared_ptr<deeplake_api::catalog_table>>> promises;
170-
promises.reserve(4);
183+
promises.reserve(5);
171184
promises.push_back(
172185
deeplake_api::open_or_create_catalog_table(tables_path, std::move(tables_schema), icm::string_map<>(creds)));
173186
promises.push_back(
@@ -176,12 +189,14 @@ int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds)
176189
deeplake_api::open_or_create_catalog_table(indexes_path, std::move(indexes_schema), icm::string_map<>(creds)));
177190
promises.push_back(
178191
deeplake_api::open_or_create_catalog_table(meta_path, std::move(meta_schema), icm::string_map<>(creds)));
192+
promises.push_back(
193+
deeplake_api::open_or_create_catalog_table(databases_path, std::move(databases_schema), icm::string_map<>(creds)));
179194

180195
// Wait for all to complete
181196
auto results = async::combine(std::move(promises)).get_future().get();
182-
if (results.size() != 4) {
197+
if (results.size() != 5) {
183198
elog(ERROR,
184-
"Failed to initialize catalog at %s: expected 4 catalog tables, got %zu",
199+
"Failed to initialize catalog at %s: expected 5 catalog tables, got %zu",
185200
root_path.c_str(),
186201
static_cast<size_t>(results.size()));
187202
}
@@ -499,6 +514,83 @@ void upsert_columns(const std::string& root_path, icm::string_map<> creds, const
499514
table->upsert_many(std::move(rows)).get_future().get();
500515
}
501516

517+
std::vector<database_meta> load_databases(const std::string& root_path, icm::string_map<> creds)
518+
{
519+
std::vector<database_meta> out;
520+
try {
521+
auto table = open_catalog_table(root_path, k_databases_name, std::move(creds));
522+
if (!table) {
523+
return out;
524+
}
525+
auto snapshot = table->read().get_future().get();
526+
if (snapshot.row_count() == 0) {
527+
return out;
528+
}
529+
530+
std::unordered_map<std::string, database_meta> latest;
531+
for (const auto& row : snapshot.rows()) {
532+
auto db_name_it = row.find("db_name");
533+
auto owner_it = row.find("owner");
534+
auto encoding_it = row.find("encoding");
535+
auto lc_collate_it = row.find("lc_collate");
536+
auto lc_ctype_it = row.find("lc_ctype");
537+
auto template_it = row.find("template_db");
538+
auto state_it = row.find("state");
539+
auto updated_it = row.find("updated_at");
540+
if (db_name_it == row.end() || state_it == row.end()) {
541+
continue;
542+
}
543+
544+
database_meta meta;
545+
meta.db_name = deeplake_api::array_to_string(db_name_it->second);
546+
if (owner_it != row.end()) meta.owner = deeplake_api::array_to_string(owner_it->second);
547+
if (encoding_it != row.end()) meta.encoding = deeplake_api::array_to_string(encoding_it->second);
548+
if (lc_collate_it != row.end()) meta.lc_collate = deeplake_api::array_to_string(lc_collate_it->second);
549+
if (lc_ctype_it != row.end()) meta.lc_ctype = deeplake_api::array_to_string(lc_ctype_it->second);
550+
if (template_it != row.end()) meta.template_db = deeplake_api::array_to_string(template_it->second);
551+
meta.state = deeplake_api::array_to_string(state_it->second);
552+
if (updated_it != row.end()) {
553+
auto updated_vec = load_int64_vector(updated_it->second);
554+
meta.updated_at = updated_vec.empty() ? 0 : updated_vec.front();
555+
}
556+
557+
auto it = latest.find(meta.db_name);
558+
if (it == latest.end() || it->second.updated_at <= meta.updated_at) {
559+
latest[meta.db_name] = std::move(meta);
560+
}
561+
}
562+
563+
out.reserve(latest.size());
564+
for (auto& [_, meta] : latest) {
565+
if (meta.state == "ready") {
566+
out.push_back(std::move(meta));
567+
}
568+
}
569+
return out;
570+
} catch (const std::exception& e) {
571+
elog(WARNING, "Failed to load catalog databases: %s", e.what());
572+
return out;
573+
} catch (...) {
574+
elog(WARNING, "Failed to load catalog databases: unknown error");
575+
return out;
576+
}
577+
}
578+
579+
void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta)
580+
{
581+
auto table = open_catalog_table(root_path, k_databases_name, std::move(creds));
582+
icm::string_map<nd::array> row;
583+
row["db_name"] = nd::adapt(meta.db_name);
584+
row["owner"] = nd::adapt(meta.owner);
585+
row["encoding"] = nd::adapt(meta.encoding);
586+
row["lc_collate"] = nd::adapt(meta.lc_collate);
587+
row["lc_ctype"] = nd::adapt(meta.lc_ctype);
588+
row["template_db"] = nd::adapt(meta.template_db);
589+
row["state"] = nd::adapt(meta.state);
590+
row["updated_at"] = nd::adapt(meta.updated_at == 0 ? now_ms() : meta.updated_at);
591+
table->upsert(std::move(row)).get_future().get();
592+
}
593+
502594
int64_t get_catalog_version(const std::string& root_path, icm::string_map<> creds)
503595
{
504596
try {

cpp/deeplake_pg/dl_catalog.hpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,18 @@ struct index_meta
3636
int32_t order_type = 0;
3737
};
3838

39+
/// One record of the "databases" catalog table.
///
/// The text fields mirror the columns of that table. The CREATE DATABASE hook
/// fills owner/encoding/lc_collate/lc_ctype/template_db from the statement's
/// options, so any of them may be empty when the option was not given.
struct database_meta
{
    std::string db_name;     ///< Database name; primary key of the record.
    std::string owner;       ///< Value of the "owner" option, if given.
    std::string encoding;    ///< Value of the "encoding" option, if given.
    std::string lc_collate;  ///< Value of the "lc_collate" option, if given.
    std::string lc_ctype;    ///< Value of the "lc_ctype" option, if given.
    std::string template_db; ///< Value of the "template" option, if given.
    std::string state;       ///< Lifecycle marker: "ready" or "dropping".
    int64_t updated_at{0};   ///< Record timestamp; 0 means "not set yet".
};
50+
3951
int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds);
4052

4153
std::vector<table_meta> load_tables(const std::string& root_path, icm::string_map<> creds);
@@ -49,6 +61,9 @@ load_tables_and_columns(const std::string& root_path, icm::string_map<> creds);
4961
void upsert_table(const std::string& root_path, icm::string_map<> creds, const table_meta& meta);
5062
void upsert_columns(const std::string& root_path, icm::string_map<> creds, const std::vector<column_meta>& columns);
5163

64+
std::vector<database_meta> load_databases(const std::string& root_path, icm::string_map<> creds);
65+
void upsert_database(const std::string& root_path, icm::string_map<> creds, const database_meta& meta);
66+
5267
int64_t get_catalog_version(const std::string& root_path, icm::string_map<> creds);
5368
void bump_catalog_version(const std::string& root_path, icm::string_map<> creds);
5469

cpp/deeplake_pg/extension_init.cpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ extern "C" {
2525

2626
#include "column_statistics.hpp"
2727
#include "deeplake_executor.hpp"
28+
#include "dl_catalog.hpp"
2829
#include "pg_deeplake.hpp"
2930
#include "pg_version_compat.h"
3031
#include "sync_worker.hpp"
@@ -473,6 +474,10 @@ static void deeplake_shmem_request()
473474
// Request shared memory for table DDL lock
474475
RequestAddinShmemSpace(pg::table_ddl_lock::get_shmem_size());
475476
RequestNamedLWLockTranche("deeplake_table_ddl", 1);
477+
478+
// Request shared memory for pending extension install queue
479+
RequestAddinShmemSpace(pg::pending_install_queue::get_shmem_size());
480+
RequestNamedLWLockTranche("deeplake_install_queue", 1);
476481
}
477482

478483
static void deeplake_shmem_startup()
@@ -483,6 +488,7 @@ static void deeplake_shmem_startup()
483488

484489
pg::table_version_tracker::initialize();
485490
pg::table_ddl_lock::initialize();
491+
pg::pending_install_queue::initialize();
486492
}
487493

488494
static void process_utility(PlannedStmt* pstmt,
@@ -675,12 +681,83 @@ static void process_utility(PlannedStmt* pstmt,
675681
}
676682
}
677683

684+
// Pre-hook: mark database as "dropping" in S3 catalog before PostgreSQL drops it
685+
if (IsA(pstmt->utilityStmt, DropdbStmt) && pg::stateless_enabled) {
686+
DropdbStmt* dbstmt = (DropdbStmt*)pstmt->utilityStmt;
687+
try {
688+
auto root_path = pg::session_credentials::get_root_path();
689+
if (root_path.empty()) {
690+
root_path = pg::utils::get_deeplake_root_directory();
691+
}
692+
if (!root_path.empty()) {
693+
auto creds = pg::session_credentials::get_credentials();
694+
pg::dl_catalog::database_meta db_meta;
695+
db_meta.db_name = dbstmt->dbname;
696+
db_meta.state = "dropping";
697+
pg::dl_catalog::upsert_database(root_path, creds, db_meta);
698+
pg::dl_catalog::bump_catalog_version(root_path, creds);
699+
elog(LOG, "pg_deeplake: marked database '%s' as dropping in catalog", dbstmt->dbname);
700+
}
701+
} catch (const std::exception& e) {
702+
elog(WARNING, "pg_deeplake: failed to mark database '%s' as dropping in catalog: %s", dbstmt->dbname, e.what());
703+
}
704+
}
705+
678706
if (prev_process_utility_hook != nullptr) {
679707
prev_process_utility_hook(pstmt, queryString, readOnlyTree, context, params, queryEnv, dest, completionTag);
680708
} else {
681709
standard_ProcessUtility(pstmt, queryString, readOnlyTree, context, params, queryEnv, dest, completionTag);
682710
}
683711

712+
// Post-hook: record CREATE DATABASE in S3 catalog and install extension
713+
if (IsA(pstmt->utilityStmt, CreatedbStmt)) {
714+
CreatedbStmt* dbstmt = (CreatedbStmt*)pstmt->utilityStmt;
715+
716+
// Queue the database for async extension install by the sync worker.
717+
// The inline PQconnectdb approach fails on PG15+ because CREATE DATABASE
718+
// is WAL-logged/transactional and the pg_database row isn't committed yet.
719+
pg::pending_install_queue::enqueue(dbstmt->dbname);
720+
721+
// Record in S3 catalog if stateless mode is enabled
722+
if (pg::stateless_enabled) {
723+
try {
724+
auto root_path = pg::session_credentials::get_root_path();
725+
if (root_path.empty()) {
726+
root_path = pg::utils::get_deeplake_root_directory();
727+
}
728+
if (!root_path.empty()) {
729+
auto creds = pg::session_credentials::get_credentials();
730+
pg::dl_catalog::database_meta db_meta;
731+
db_meta.db_name = dbstmt->dbname;
732+
db_meta.state = "ready";
733+
734+
// Extract options from CREATE DATABASE statement
735+
ListCell* lc = nullptr;
736+
foreach (lc, dbstmt->options) {
737+
DefElem* def = (DefElem*)lfirst(lc);
738+
if (strcmp(def->defname, "owner") == 0) {
739+
db_meta.owner = defGetString(def);
740+
} else if (strcmp(def->defname, "encoding") == 0) {
741+
db_meta.encoding = defGetString(def);
742+
} else if (strcmp(def->defname, "lc_collate") == 0) {
743+
db_meta.lc_collate = defGetString(def);
744+
} else if (strcmp(def->defname, "lc_ctype") == 0) {
745+
db_meta.lc_ctype = defGetString(def);
746+
} else if (strcmp(def->defname, "template") == 0) {
747+
db_meta.template_db = defGetString(def);
748+
}
749+
}
750+
751+
pg::dl_catalog::upsert_database(root_path, creds, db_meta);
752+
pg::dl_catalog::bump_catalog_version(root_path, creds);
753+
elog(DEBUG1, "pg_deeplake: recorded CREATE DATABASE '%s' in catalog", dbstmt->dbname);
754+
}
755+
} catch (const std::exception& e) {
756+
elog(DEBUG1, "pg_deeplake: failed to record CREATE DATABASE '%s' in catalog: %s", dbstmt->dbname, e.what());
757+
}
758+
}
759+
}
760+
684761
// Post-process ALTER TABLE ADD COLUMN to add column to deeplake dataset
685762
if (IsA(pstmt->utilityStmt, AlterTableStmt)) {
686763
AlterTableStmt* stmt = (AlterTableStmt*)pstmt->utilityStmt;

0 commit comments

Comments
 (0)