Skip to content

Commit a2be413

Browse files
committed
Sync db catalogs.
1 parent fbaa9f0 commit a2be413

File tree

7 files changed

+462
-21
lines changed

7 files changed

+462
-21
lines changed

.github/workflows/pg-extension-build.yaml

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -345,6 +345,7 @@ jobs:
345345
uses: docker/setup-buildx-action@v3.11.1
346346

347347
- name: build and push container images
348+
if: github.ref_name != 'serverless'
348349
shell: bash
349350
env:
350351
PG_VERSION: "${{ matrix.version }}"
@@ -362,3 +363,24 @@ jobs:
362363
--platform linux/amd64,linux/arm64 \
363364
-f Dockerfile.build \
364365
.
366+
367+
- name: build and push container images (stateless)
368+
if: github.ref_name == 'serverless'
369+
shell: bash
370+
env:
371+
PG_VERSION: "${{ matrix.version }}"
372+
VERSION: "${{ needs.extension-build.outputs.version }}"
373+
COMMIT_SHA: "${{ github.sha }}"
374+
run: |-
375+
echo "Using package version: ${VERSION}"
376+
echo "Using commit SHA: ${COMMIT_SHA}"
377+
sed s/BASE_IMAGE/postgres:"${PG_VERSION}"-bookworm/g postgres/Dockerfile > Dockerfile.build
378+
docker buildx build \
379+
--push \
380+
--tag quay.io/activeloopai/pg-deeplake:"${PG_VERSION}"_"${VERSION}"_"${COMMIT_SHA}"-stateless \
381+
--tag quay.io/activeloopai/pg-deeplake:"${PG_VERSION}"-stateless \
382+
--build-arg VERSION="${PG_VERSION}"_"${VERSION}"-1 \
383+
--build-arg STATELESS=true \
384+
--platform linux/amd64,linux/arm64 \
385+
-f Dockerfile.build \
386+
.

cpp/deeplake_pg/dl_catalog.cpp

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -128,6 +128,9 @@ std::vector<int64_t> load_int64_vector(const nd::array& arr)
128128

129129
int64_t ensure_catalog(const std::string& root_path, icm::string_map<> creds)
130130
{
131+
if (root_path.empty()) {
132+
return 0;
133+
}
131134
const auto tables_path = join_path(root_path, k_tables_name);
132135
const auto columns_path = join_path(root_path, k_columns_name);
133136
const auto indexes_path = join_path(root_path, k_indexes_name);

cpp/deeplake_pg/pg_deeplake.cpp

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -356,7 +356,7 @@ void deeplake_xact_callback(XactEvent event, void *arg)
356356
void init_deeplake()
357357
{
358358
static bool initialized = false;
359-
if (initialized) {
359+
if (initialized || !IsUnderPostmaster) {
360360
return;
361361
}
362362
initialized = true;
@@ -368,6 +368,12 @@ void init_deeplake()
368368
constexpr int THREAD_POOL_MULTIPLIER = 8; // Threads per CPU core for async operations
369369
deeplake_api::initialize(std::make_shared<pg::logger_adapter>(), THREAD_POOL_MULTIPLIER * base::system_report::cpu_cores());
370370

371+
const std::string redis_url = base::getenv<std::string>("REDIS_URL", "");
372+
if (!redis_url.empty()) {
373+
deeplake_api::initialize_redis_cache(redis_url, 86400,
374+
deeplake_api::metadata_catalog_cache_pattern);
375+
}
376+
371377
pg::table_storage::instance(); /// initialize table storage
372378

373379
RegisterXactCallback(deeplake_xact_callback, nullptr);

cpp/deeplake_pg/table_storage.cpp

Lines changed: 20 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -236,6 +236,9 @@ void table_storage::save_table_metadata(const pg::table_data& table_data)
236236
}
237237
return root;
238238
}();
239+
if (root_dir.empty()) {
240+
return;
241+
}
239242
auto creds = session_credentials::get_credentials();
240243
pg::dl_catalog::ensure_catalog(root_dir, creds);
241244

@@ -292,8 +295,8 @@ void table_storage::load_table_metadata()
292295
}();
293296
auto creds = session_credentials::get_credentials();
294297

295-
// Stateless catalog sync (only when enabled)
296-
if (pg::stateless_enabled) {
298+
// Stateless catalog sync (only when enabled and root_dir is configured)
299+
if (pg::stateless_enabled && !root_dir.empty()) {
297300
// Fast path: if already loaded, just check version without ensure_catalog
298301
if (tables_loaded_) {
299302
const auto current_version = pg::dl_catalog::get_catalog_version(root_dir, creds);
@@ -534,7 +537,7 @@ void table_storage::load_table_metadata()
534537
}
535538
try {
536539
// Seed the DL catalog with legacy metadata (only when stateless is enabled).
537-
if (pg::stateless_enabled) {
540+
if (pg::stateless_enabled && !root_dir.empty()) {
538541
auto [schema_name, simple_table_name] = split_table_name(table_name);
539542
pg::dl_catalog::table_meta meta;
540543
meta.table_id = schema_name + "." + simple_table_name;
@@ -580,7 +583,7 @@ void table_storage::load_table_metadata()
580583
base::log_channel::generic, "Failed to delete invalid table metadata for table_oid: {}", invalid_oid);
581584
}
582585
}
583-
if (catalog_seeded && pg::stateless_enabled) {
586+
if (catalog_seeded && pg::stateless_enabled && !root_dir.empty()) {
584587
pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials());
585588
catalog_version_ = pg::dl_catalog::get_catalog_version(root_dir, session_credentials::get_credentials());
586589
}
@@ -991,17 +994,19 @@ void table_storage::drop_table(const std::string& table_name)
991994
}
992995
return root;
993996
}();
994-
pg::dl_catalog::ensure_catalog(root_dir, creds);
995-
auto [schema_name, simple_table_name] = split_table_name(table_name);
996-
pg::dl_catalog::table_meta meta;
997-
meta.table_id = schema_name + "." + simple_table_name;
998-
meta.schema_name = schema_name;
999-
meta.table_name = simple_table_name;
1000-
meta.dataset_path = table_data.get_dataset_path().url();
1001-
meta.state = "dropping";
1002-
pg::dl_catalog::upsert_table(root_dir, creds, meta);
1003-
pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials());
1004-
catalog_version_ = pg::dl_catalog::get_catalog_version(root_dir, session_credentials::get_credentials());
997+
if (!root_dir.empty()) {
998+
pg::dl_catalog::ensure_catalog(root_dir, creds);
999+
auto [schema_name, simple_table_name] = split_table_name(table_name);
1000+
pg::dl_catalog::table_meta meta;
1001+
meta.table_id = schema_name + "." + simple_table_name;
1002+
meta.schema_name = schema_name;
1003+
meta.table_name = simple_table_name;
1004+
meta.dataset_path = table_data.get_dataset_path().url();
1005+
meta.state = "dropping";
1006+
pg::dl_catalog::upsert_table(root_dir, creds, meta);
1007+
pg::dl_catalog::bump_catalog_version(root_dir, session_credentials::get_credentials());
1008+
catalog_version_ = pg::dl_catalog::get_catalog_version(root_dir, session_credentials::get_credentials());
1009+
}
10051010
}
10061011

10071012
try {

cpp/deeplake_pg/utils.hpp

Lines changed: 10 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -270,17 +270,22 @@ static std::string get_pg_data_directory()
270270
{
271271
const char* data_dir = GetConfigOption("data_directory", true, false);
272272
if (data_dir == nullptr) {
273-
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("Unable to retrieve data_directory")));
273+
return "";
274274
}
275275
return std::string(data_dir);
276276
}
277277

278278
static std::string get_deeplake_root_directory()
279279
{
280-
static const std::string root_dir_variable_name = "DEEPLAKE_ROOT_PATH";
281-
static const std::string pg_data_dir = get_pg_data_directory();
282-
static const std::string deeplake_root_dir = base::getenv<std::string>(root_dir_variable_name, pg_data_dir);
283-
return deeplake_root_dir;
280+
// Avoid static locals: if get_pg_data_directory() previously failed via
281+
// ereport(ERROR) (longjmp through C++ static init), the static guard
282+
// variable is permanently poisoned and subsequent calls return "".
283+
// Re-evaluate every time so a later call can succeed once GUCs are ready.
284+
auto root = base::getenv<std::string>("DEEPLAKE_ROOT_PATH", "");
285+
if (root.empty()) {
286+
root = get_pg_data_directory();
287+
}
288+
return root;
284289
}
285290

286291
inline std::pair<BlockNumber, OffsetNumber> row_number_to_tid(int64_t row_number)

postgres/Dockerfile

Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,7 @@
11
FROM BASE_IMAGE
22
ARG VERSION=VERSION
33
ARG TARGETARCH
4+
ARG STATELESS=false
45

56
LABEL name="pg-deeplake" \
67
version="${VERSION}" \
@@ -28,4 +29,18 @@ COPY ./debs/ /tmp/debs/
2829
COPY --chmod=444 ./LICENSE /LICENSE
2930
COPY ./postgres/docker-entrypoint.d/ /docker-entrypoint-initdb.d/
3031
RUN apt-get install --no-install-recommends -y /tmp/debs/pg-deeplake-${VERSION}_${TARGETARCH}.deb && rm -rf /tmp/debs/
32+
COPY ./serverless/scripts/init-deeplake-stateless.sh /tmp/init-deeplake-stateless.sh
33+
COPY ./serverless/config/postgresql-overrides.conf /tmp/postgresql-overrides.conf
34+
COPY ./serverless/scripts/health-check.sh /tmp/health-check.sh
35+
RUN if [ "$STATELESS" = "true" ]; then \
36+
mv /tmp/init-deeplake-stateless.sh /docker-entrypoint-initdb.d/3-stateless-init.sh && \
37+
chmod 755 /docker-entrypoint-initdb.d/3-stateless-init.sh && \
38+
mv /tmp/postgresql-overrides.conf /etc/postgresql-overrides.conf && \
39+
chmod 644 /etc/postgresql-overrides.conf && \
40+
mv /tmp/health-check.sh /usr/local/bin/health-check.sh && \
41+
chmod 755 /usr/local/bin/health-check.sh && \
42+
mkdir -p /deeplake-data; \
43+
else \
44+
rm -f /tmp/init-deeplake-stateless.sh /tmp/postgresql-overrides.conf /tmp/health-check.sh; \
45+
fi
3146
USER 999

0 commit comments

Comments
 (0)