Skip to content

Commit 20e207d

Browse files
committed
Fix SPI stack leak, error logging, and search_path during DDL WAL replay
1 parent 9567162 commit 20e207d

File tree

3 files changed

+319
-251
lines changed

3 files changed

+319
-251
lines changed

cpp/deeplake_pg/extension_init.cpp

Lines changed: 124 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ extern "C" {
99
#include <postgres.h>
1010

1111
#include <catalog/namespace.h>
12+
#include <commands/dbcommands.h>
1213
#include <commands/defrem.h>
14+
#include <commands/extension.h>
15+
#include <miscadmin.h>
1316
#include <commands/vacuum.h>
1417
#include <nodes/nodeFuncs.h>
1518
#include <optimizer/planner.h>
@@ -25,7 +28,7 @@ extern "C" {
2528

2629
#include "column_statistics.hpp"
2730
#include "deeplake_executor.hpp"
28-
#include "dl_catalog.hpp"
31+
#include "dl_wal.hpp"
2932
#include "pg_deeplake.hpp"
3033
#include "pg_version_compat.h"
3134
#include "sync_worker.hpp"
@@ -42,6 +45,7 @@ extern "C" {
4245
#include <climits>
4346
#include <cmath>
4447
#include <cstdint>
48+
#include <unistd.h>
4549
#include <map>
4650
#include <memory>
4751
#include <numeric>
@@ -70,6 +74,58 @@ bool stateless_enabled = false; // Enable stateless catalog sync across ins
7074

7175
namespace {
7276

77+
bool is_ddl_log_suppressed()
78+
{
79+
if (pg::table_storage::is_catalog_only_create()) {
80+
return true;
81+
}
82+
if (creating_extension) {
83+
return true;
84+
}
85+
const char* app_name = GetConfigOption("application_name", true, false);
86+
return app_name && strcmp(app_name, "pg_deeplake_sync") == 0;
87+
}
88+
89+
void append_to_ddl_log_if_needed(const char* command_tag, const char* object_identity, const char* query_string)
90+
{
91+
if (!pg::stateless_enabled || is_ddl_log_suppressed()) {
92+
return;
93+
}
94+
if (query_string == nullptr || query_string[0] == '\0') {
95+
return;
96+
}
97+
98+
auto root_path = pg::session_credentials::get_root_path();
99+
if (root_path.empty()) {
100+
root_path = pg::utils::get_deeplake_root_directory();
101+
}
102+
if (root_path.empty()) {
103+
return;
104+
}
105+
106+
try {
107+
auto creds = pg::session_credentials::get_credentials();
108+
const char* dbname = get_database_name(MyDatabaseId);
109+
std::string db_name = dbname ? dbname : "postgres";
110+
if (dbname) {
111+
pfree(const_cast<char*>(dbname));
112+
}
113+
114+
pg::dl_wal::ddl_log_entry entry;
115+
entry.seq = pg::dl_wal::next_ddl_seq();
116+
entry.origin_instance_id = pg::dl_wal::local_instance_id();
117+
const char* current_search_path = GetConfigOption("search_path", true, false);
118+
entry.search_path = current_search_path != nullptr ? current_search_path : "";
119+
entry.command_tag = command_tag != nullptr ? command_tag : "";
120+
entry.object_identity = object_identity != nullptr ? object_identity : "";
121+
entry.ddl_sql = query_string;
122+
123+
pg::dl_wal::append_ddl_log(root_path, db_name, creds, entry);
124+
} catch (const std::exception& e) {
125+
elog(WARNING, "pg_deeplake: failed to append DDL to WAL log: %s", e.what());
126+
}
127+
}
128+
73129
bool is_count_star(TargetEntry* node)
74130
{
75131
if (node == nullptr || node->expr == nullptr || !IsA(node->expr, Aggref)) {
@@ -691,11 +747,11 @@ static void process_utility(PlannedStmt* pstmt,
691747
}
692748
if (!root_path.empty()) {
693749
auto creds = pg::session_credentials::get_credentials();
694-
pg::dl_catalog::database_meta db_meta;
750+
pg::dl_wal::ensure_catalog(root_path, creds);
751+
pg::dl_wal::database_meta db_meta;
695752
db_meta.db_name = dbstmt->dbname;
696753
db_meta.state = "dropping";
697-
pg::dl_catalog::upsert_database(root_path, creds, db_meta);
698-
pg::dl_catalog::bump_catalog_version(root_path, creds);
754+
pg::dl_wal::upsert_database(root_path, creds, db_meta);
699755
elog(LOG, "pg_deeplake: marked database '%s' as dropping in catalog", dbstmt->dbname);
700756
}
701757
} catch (const std::exception& e) {
@@ -727,7 +783,8 @@ static void process_utility(PlannedStmt* pstmt,
727783
}
728784
if (!root_path.empty()) {
729785
auto creds = pg::session_credentials::get_credentials();
730-
pg::dl_catalog::database_meta db_meta;
786+
pg::dl_wal::ensure_catalog(root_path, creds);
787+
pg::dl_wal::database_meta db_meta;
731788
db_meta.db_name = dbstmt->dbname;
732789
db_meta.state = "ready";
733790

@@ -748,8 +805,7 @@ static void process_utility(PlannedStmt* pstmt,
748805
}
749806
}
750807

751-
pg::dl_catalog::upsert_database(root_path, creds, db_meta);
752-
pg::dl_catalog::bump_catalog_version(root_path, creds);
808+
pg::dl_wal::upsert_database(root_path, creds, db_meta);
753809
elog(DEBUG1, "pg_deeplake: recorded CREATE DATABASE '%s' in catalog", dbstmt->dbname);
754810
}
755811
} catch (const std::exception& e) {
@@ -758,6 +814,14 @@ static void process_utility(PlannedStmt* pstmt,
758814
}
759815
}
760816

817+
// Post-hook: record CREATE SCHEMA in DDL WAL log
818+
if (IsA(pstmt->utilityStmt, CreateSchemaStmt)) {
819+
CreateSchemaStmt* schemastmt = (CreateSchemaStmt*)pstmt->utilityStmt;
820+
if (schemastmt->schemaname != nullptr) {
821+
append_to_ddl_log_if_needed("CREATE SCHEMA", schemastmt->schemaname, queryString);
822+
}
823+
}
824+
761825
// Post-process ALTER TABLE ADD COLUMN to add column to deeplake dataset
762826
if (IsA(pstmt->utilityStmt, AlterTableStmt)) {
763827
AlterTableStmt* stmt = (AlterTableStmt*)pstmt->utilityStmt;
@@ -1256,6 +1320,59 @@ static void process_utility(PlannedStmt* pstmt,
12561320
}
12571321
}
12581322
}
1323+
1324+
if (nodeTag(pstmt->utilityStmt) == T_DropStmt) {
1325+
DropStmt* stmt = (DropStmt*)pstmt->utilityStmt;
1326+
if (stmt->removeType == OBJECT_SCHEMA) {
1327+
append_to_ddl_log_if_needed("DROP SCHEMA", nullptr, queryString);
1328+
} else if (stmt->removeType == OBJECT_TABLE) {
1329+
append_to_ddl_log_if_needed("DROP TABLE", nullptr, queryString);
1330+
} else if (stmt->removeType == OBJECT_INDEX) {
1331+
append_to_ddl_log_if_needed("DROP INDEX", nullptr, queryString);
1332+
} else if (stmt->removeType == OBJECT_VIEW) {
1333+
append_to_ddl_log_if_needed("DROP VIEW", nullptr, queryString);
1334+
}
1335+
}
1336+
1337+
if (IsA(pstmt->utilityStmt, CreateStmt)) {
1338+
CreateStmt* stmt = (CreateStmt*)pstmt->utilityStmt;
1339+
if (stmt->accessMethod != nullptr && std::strcmp(stmt->accessMethod, "deeplake") == 0) {
1340+
const char* schema_name = stmt->relation->schemaname ? stmt->relation->schemaname : "public";
1341+
std::string object_id = std::string(schema_name) + "." + stmt->relation->relname;
1342+
append_to_ddl_log_if_needed("CREATE TABLE", object_id.c_str(), queryString);
1343+
}
1344+
}
1345+
1346+
if (IsA(pstmt->utilityStmt, AlterTableStmt)) {
1347+
if (queryString != nullptr && strncasecmp(queryString, "ALTER TABLE", 11) == 0) {
1348+
AlterTableStmt* stmt = (AlterTableStmt*)pstmt->utilityStmt;
1349+
const char* schema_name = stmt->relation->schemaname ? stmt->relation->schemaname : "public";
1350+
std::string object_id = std::string(schema_name) + "." + stmt->relation->relname;
1351+
append_to_ddl_log_if_needed("ALTER TABLE", object_id.c_str(), queryString);
1352+
}
1353+
}
1354+
1355+
if (IsA(pstmt->utilityStmt, IndexStmt)) {
1356+
if (queryString && strncasecmp(queryString, "CREATE", 6) == 0 &&
1357+
strncasecmp(queryString, "CREATE TABLE", 12) != 0) {
1358+
IndexStmt* stmt = (IndexStmt*)pstmt->utilityStmt;
1359+
const char* schema_name = stmt->relation->schemaname ? stmt->relation->schemaname : "public";
1360+
std::string object_id = std::string(schema_name) + "." + stmt->relation->relname;
1361+
append_to_ddl_log_if_needed("CREATE INDEX", object_id.c_str(), queryString);
1362+
}
1363+
}
1364+
1365+
if (IsA(pstmt->utilityStmt, ViewStmt)) {
1366+
ViewStmt* stmt = (ViewStmt*)pstmt->utilityStmt;
1367+
const char* schema_name = stmt->view->schemaname ? stmt->view->schemaname : "public";
1368+
std::string object_id = std::string(schema_name) + "." + stmt->view->relname;
1369+
append_to_ddl_log_if_needed("CREATE VIEW", object_id.c_str(), queryString);
1370+
}
1371+
1372+
if (IsA(pstmt->utilityStmt, RenameStmt)) {
1373+
append_to_ddl_log_if_needed("RENAME", nullptr, queryString);
1374+
}
1375+
12591376
if (IsA(pstmt->utilityStmt, VariableSetStmt)) {
12601377
VariableSetStmt* vstmt = (VariableSetStmt*)pstmt->utilityStmt;
12611378
if (vstmt->name != nullptr && pg_strcasecmp(vstmt->name, "search_path") == 0) {
@@ -1533,26 +1650,6 @@ static void executor_end(QueryDesc* query_desc)
15331650
}
15341651

15351652
if (pg::query_info::is_in_executor_context(query_desc)) {
1536-
if (query_desc->operation == CMD_INSERT || query_desc->operation == CMD_UPDATE ||
1537-
query_desc->operation == CMD_DELETE || query_desc->operation == CMD_UTILITY) {
1538-
// Use PG_TRY/CATCH to handle errors during flush without cascading aborts
1539-
PG_TRY();
1540-
{
1541-
if (!pg::table_storage::instance().flush_all()) {
1542-
pg::table_storage::instance().rollback_all();
1543-
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to flush table storage")));
1544-
}
1545-
}
1546-
PG_CATCH();
1547-
{
1548-
// Error occurred during flush - rollback and suppress to prevent cascade
1549-
// This prevents "Deeplake does not support transaction aborts" cascade
1550-
pg::table_storage::instance().rollback_all();
1551-
// Don't re-throw - let the transaction abort naturally
1552-
FlushErrorState();
1553-
}
1554-
PG_END_TRY();
1555-
}
15561653
pg::query_info::pop_context(query_desc);
15571654
pg::table_storage::instance().reset_requested_columns();
15581655
}

cpp/deeplake_pg/pg_deeplake.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include "pg_deeplake.hpp"
2+
#include "dl_wal.hpp"
23
#include "logger.hpp"
34
#include "table_storage.hpp"
5+
#include "utils.hpp"
46

57
#include <deeplake_api/deeplake_api.hpp>
68
#include <deeplake_core/deeplake_index_type.hpp>
@@ -9,6 +11,9 @@
911
extern "C" {
1012
#endif
1113

14+
#include <access/xact.h>
15+
#include <commands/dbcommands.h>
16+
#include <miscadmin.h>
1217
#include <storage/ipc.h>
1318

1419
#ifdef __cplusplus
@@ -260,6 +265,8 @@ void save_index_metadata(Oid oid)
260265
if (SPI_execute(buf.data, false, 0) != SPI_OK_INSERT) {
261266
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Failed to save metadata")));
262267
}
268+
269+
// Cross-instance propagation is driven by DDL WAL logging in ProcessUtility.
263270
}
264271

265272
void load_index_metadata()
@@ -353,6 +360,23 @@ void deeplake_xact_callback(XactEvent event, void *arg)
353360
}
354361
}
355362

363+
void deeplake_subxact_callback(SubXactEvent event,
364+
SubTransactionId my_subid,
365+
SubTransactionId parent_subid,
366+
void* arg)
367+
{
368+
switch (event) {
369+
case SUBXACT_EVENT_ABORT_SUB:
370+
pg::table_storage::instance().rollback_subxact(my_subid);
371+
break;
372+
case SUBXACT_EVENT_COMMIT_SUB:
373+
pg::table_storage::instance().commit_subxact(my_subid, parent_subid);
374+
break;
375+
default:
376+
break;
377+
}
378+
}
379+
356380
void init_deeplake()
357381
{
358382
static bool initialized = false;
@@ -377,6 +401,7 @@ void init_deeplake()
377401
pg::table_storage::instance(); /// initialize table storage
378402

379403
RegisterXactCallback(deeplake_xact_callback, nullptr);
404+
RegisterSubXactCallback(deeplake_subxact_callback, nullptr);
380405
}
381406

382407
} // namespace pg

0 commit comments

Comments
 (0)