6 changes: 4 additions & 2 deletions cpp/deeplake_pg/duckdb_deeplake_scan.cpp
@@ -846,8 +846,10 @@ class deeplake_scan_function_helper
for (duckdb::idx_t row_in_batch = 0; row_in_batch < batch_size; ++row_in_batch) {
const int64_t row_idx = current_row + row_in_batch;
auto value = td.get_streamers().value<std::string_view>(col_idx, row_idx);
// workaround. value is not always remain valid. Trying to make a copy as soon as possible.
Copilot AI Jan 8, 2026

Corrected grammar: 'value is not always remain valid' should be 'value does not always remain valid'.

Suggested change
// workaround. value is not always remain valid. Trying to make a copy as soon as possible.
// workaround. value does not always remain valid. Trying to make a copy as soon as possible.
// Most likely due to nd::array temporary object destruction.
std::string str_value(value);
if (is_uuid) {
std::string str_value(value);
// Treat empty string as NULL for UUID columns
if (str_value.empty()) {
duckdb::FlatVector::SetNull(output_vector, row_in_batch, true);
@@ -862,7 +864,7 @@
} else {
auto* duckdb_data = duckdb::FlatVector::GetData<duckdb::string_t>(output_vector);
duckdb_data[row_in_batch] =
add_string(output_vector, value.data(), value.size());
add_string(output_vector, str_value.data(), str_value.size());
}
}
}
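The fix in this file addresses a lifetime hazard: the streamer's std::string_view can be invalidated once the backing nd::array temporary is destroyed, so the change copies it into an owning std::string immediately. A self-contained sketch of the same hazard, with a hypothetical streamer type standing in for td.get_streamers():

```cpp
#include <cstdio>
#include <string>
#include <string_view>

// Hypothetical stand-in for the streamer: the returned view points into an
// internal buffer that a later call (or temporary destruction) invalidates.
struct streamer {
    std::string buffer;
    std::string_view value(int row) {
        buffer = "row " + std::to_string(row); // may reallocate: old views dangle
        return buffer;
    }
};

int main() {
    streamer s;
    std::string_view v = s.value(0);
    std::string str_value(v);  // the PR's approach: copy while the view is valid
    (void)s.value(1);          // invalidates v, but str_value is unaffected
    std::printf("%s\n", str_value.c_str());
}
```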
12 changes: 6 additions & 6 deletions cpp/deeplake_pg/extension_init.cpp
@@ -51,7 +51,7 @@ bool print_runtime_stats = false;
bool support_json_index = false;
bool is_filter_pushdown_enabled = true;
int32_t max_streamable_column_width = 128;
int32_t max_num_threads_for_global_state = 4;
int32_t max_num_threads_for_global_state = std::thread::hardware_concurrency();
Copilot AI Jan 8, 2026

std::thread::hardware_concurrency() can return 0 if the value is not well-defined or not computable. This conflicts with the minimum value of 1 defined for this GUC parameter at line 203. Initialize with std::max(1u, std::thread::hardware_concurrency()) instead.
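A minimal sketch of the clamped initializer the reviewer suggests (this reflects the suggestion, not necessarily the code as merged):

```cpp
#include <algorithm>
#include <cstdint>
#include <thread>

// hardware_concurrency() may return 0 when the value is not computable,
// so clamp to the GUC's minimum of 1 before using it as a default.
int32_t default_global_state_threads()
{
    return static_cast<int32_t>(std::max(1u, std::thread::hardware_concurrency()));
}
```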
bool treat_numeric_as_double = true; // Treat numeric types as double by default
bool print_progress_during_seq_scan = false;
bool use_shared_mem_for_refresh = false;
@@ -199,7 +199,7 @@ void initialize_guc_parameters()
"Maximum number of threads for global state operations.",
nullptr, // optional long description
&pg::max_num_threads_for_global_state, // linked C variable
6, // default value
base::system_report::cpu_cores(), // default value
1, // min value
base::system_report::cpu_cores(), // max value
PGC_USERSET, // context (USERSET, SUSET, etc.)
@@ -216,10 +216,10 @@
"for detecting dataset refreshes, which can improve performance but may "
"have implications on concurrency. "
"It make sense to disable this for OLTP workloads.",
&pg::use_shared_mem_for_refresh, // linked C variable
true, // default value
PGC_USERSET, // context (USERSET, SUSET, etc.)
0, // flags
&pg::use_shared_mem_for_refresh, // linked C variable
true, // default value
PGC_USERSET, // context (USERSET, SUSET, etc.)
0, // flags
nullptr,
nullptr,
nullptr // check_hook, assign_hook, show_hook
2 changes: 1 addition & 1 deletion cpp/deeplake_pg/pg_deeplake.cpp
@@ -302,7 +302,7 @@ void load_index_metadata()
const uint32_t oid = is_null ? 0 : DatumGetUInt32(datum_order_type);

if (!pg::pg_index::has_index_info(oid)) {
pg::pg_index::create_index_info(oid);
pg::pg_index::load_index_info(oid);
} else {
pg::index_info::current().reset();
}
8 changes: 8 additions & 0 deletions cpp/deeplake_pg/pg_deeplake.hpp
@@ -455,6 +455,14 @@ class pg_index
index_info::current().reset();
}

static void load_index_info(Oid oid)
{
// Load index metadata into memory cache without creating DeepLake indexes
// (indexes already exist in the dataset, we're just restoring the in-memory state)
instance().indexes_.emplace(oid, index_info::current());
index_info::current().reset();
}

static bool has_index_info(Oid oid)
{
return instance().indexes_.find(oid) != instance().indexes_.end();
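Taken together with the pg_deeplake.cpp change above, the new method gives the reload path its own entry point: create_index_info builds DeepLake indexes and caches the metadata, while load_index_info only restores the in-memory cache for indexes that already exist in the dataset. A simplified sketch of that split, with stand-in types (index_info's contents and create_deeplake_indexes are assumptions for illustration, not the project's real API):

```cpp
#include <unordered_map>
#include <utility>

using Oid = unsigned int;

struct index_info { /* parsed index metadata */ };

// Simplified stand-in for pg::pg_index: an OID-keyed cache of index metadata.
class pg_index_sketch {
    std::unordered_map<Oid, index_info> indexes_;

public:
    // First-time creation: build the DeepLake indexes, then cache the metadata.
    void create_index_info(Oid oid, index_info info) {
        // create_deeplake_indexes(info);  // hypothetical: writes to the dataset
        indexes_.emplace(oid, std::move(info));
    }

    // Reload path (what this PR adds): the indexes already exist in the
    // dataset, so only the in-memory cache is restored -- no index build.
    void load_index_info(Oid oid, index_info info) {
        indexes_.emplace(oid, std::move(info));
    }

    bool has_index_info(Oid oid) const {
        return indexes_.find(oid) != indexes_.end();
    }
};
```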
41 changes: 11 additions & 30 deletions cpp/deeplake_pg/table_data.hpp
@@ -92,48 +92,29 @@ struct table_data
{
struct batch_data
{
std::mutex mutex_;
async::promise<nd::array> promise_;
nd::array owner_;
const uint8_t* data_ = nullptr;
impl::string_stream_array_holder holder_ = impl::string_stream_array_holder();
std::atomic<bool> initialized_{false};
impl::string_stream_array_holder holder_;

batch_data() = default;
batch_data(const batch_data& other) = delete;
batch_data(batch_data&& other) noexcept
: owner_(std::move(other.owner_))
, data_(other.data_)
, holder_(std::move(other.holder_))
, initialized_(other.initialized_.load(std::memory_order_relaxed))
{
other.data_ = nullptr;
}

batch_data(batch_data&& other) noexcept = delete;
batch_data& operator=(const batch_data& other) = delete;
batch_data& operator=(batch_data&& other) noexcept
{
if (this != &other) {
owner_ = std::move(other.owner_);
data_ = other.data_;
holder_ = std::move(other.holder_);
initialized_.store(other.initialized_.load(std::memory_order_relaxed), std::memory_order_relaxed);
other.data_ = nullptr;
}
return *this;
}
batch_data& operator=(batch_data&& other) noexcept = delete;
};

struct column_data
{
std::vector<batch_data> batches;
base::spin_lock mutex;
};

std::vector<std::unique_ptr<bifrost::column_streamer>> streamers;
using column_data = std::vector<batch_data>;
std::vector<column_data> column_to_batches;

inline void reset() noexcept
{
streamers.clear();
for (auto& batches : column_to_batches) {
for (auto& batch : batches) {
batch.promise_.cancel();
}
}
column_to_batches.clear();
}

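The struct rewrite above drops the atomic initialized_ flag, the hand-written move operations, and the per-column spin lock in favor of a per-batch mutex plus an async promise, where a still-valid promise means "not yet materialized". A self-contained sketch of that double-checked pattern, with std::future standing in for the project's async::promise&lt;nd::array&gt;:

```cpp
#include <future>
#include <mutex>
#include <string>

// Simplified stand-in for streamer_info::batch_data after this PR.
struct batch_sketch {
    std::mutex mutex_;
    std::future<std::string> future_;  // valid future <=> batch not yet loaded
    std::string owner_;                // materialized batch data

    const std::string& get() {
        // Cheap unsynchronized check, mirroring static_cast<bool>(promise_).
        if (future_.valid()) {
            std::lock_guard lock(mutex_);
            if (future_.valid()) {       // re-check under the lock
                owner_ = future_.get();  // block until the prefetch lands;
                                         // get() also invalidates future_,
                                         // which marks the batch initialized
            }
        }
        return owner_;
    }
};

int main() {
    batch_sketch b;
    b.future_ = std::async(std::launch::async, [] { return std::string("batch 0"); });
    return b.get() == "batch 0" ? 0 : 1;  // first call materializes, later calls reuse
}
```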
122 changes: 52 additions & 70 deletions cpp/deeplake_pg/table_data_impl.hpp
@@ -340,7 +340,7 @@ inline table_data::streamer_info& table_data::get_streamers() noexcept

inline bool table_data::column_has_streamer(uint32_t idx) const noexcept
{
return streamers_.streamers.size() > idx && streamers_.streamers[idx] != nullptr;
return streamers_.column_to_batches.size() > idx && !streamers_.column_to_batches[idx].empty();
}

inline void table_data::reset_streamers() noexcept
@@ -529,45 +529,49 @@ inline std::pair<int64_t, int64_t> table_data::get_row_range(int32_t worker_id)

inline void table_data::create_streamer(int32_t idx, int32_t worker_id)
{
if (streamers_.streamers.empty()) {
const auto s = num_columns();
streamers_.streamers.resize(s);
std::vector<streamer_info::column_data> temp_data(s);
streamers_.column_to_batches.swap(temp_data);
}
if (!streamers_.streamers[idx]) {
if (pg::memory_tracker::has_memory_limit()) {
const auto column_size =
pg::utils::get_column_width(get_base_atttypid(idx), get_atttypmod(idx)) * num_rows();
pg::memory_tracker::ensure_memory_available(column_size);
}
if (worker_id != -1) {
auto [start_row, end_row] = get_row_range(worker_id);
auto new_column = heimdall_common::create_filtered_column(
*(get_column_view(idx)), icm::index_mapping_t<int64_t>::slice({start_row, end_row, 1}));
streamers_.streamers[idx] = std::make_unique<bifrost::column_streamer>(new_column, batch_size_);
} else {
streamers_.streamers[idx] = std::make_unique<bifrost::column_streamer>(get_column_view(idx), batch_size_);
}
const int64_t batch_index = (num_rows() - 1) / batch_size_;
streamers_.column_to_batches[idx].batches.resize(batch_index + 1);
const auto col_count = num_columns();
if (streamers_.column_to_batches.empty()) {
streamers_.column_to_batches.resize(col_count);
}
ASSERT(idx >= 0 && idx < col_count);
auto& column_batches = streamers_.column_to_batches[idx];
if (!column_batches.empty()) {
return;
}
if (pg::memory_tracker::has_memory_limit()) {
const auto column_size = pg::utils::get_column_width(get_base_atttypid(idx), get_atttypmod(idx)) * num_rows();
pg::memory_tracker::ensure_memory_available(column_size);
}
heimdall::column_view_ptr cv = get_column_view(idx);
if (worker_id != -1) {
auto [start_row, end_row] = get_row_range(worker_id);
cv = heimdall_common::create_filtered_column(*(cv),
icm::index_mapping_t<int64_t>::slice({start_row, end_row, 1}));
}
const int64_t row_count = num_rows();
const int64_t batch_count = (row_count + batch_size_ - 1) / batch_size_;
column_batches = std::vector<streamer_info::batch_data>(batch_count);
for (int64_t i = 0; i < batch_count; ++i) {
const auto range_start = i * batch_size_;
const auto range_end = std::min<int64_t>(range_start + batch_size_, row_count);
auto p = async::run_on_main([cv, range_start, range_end, row_count]() {
return cv->request_range(
range_start, range_end, storage::fetch_options(static_cast<int>(row_count - range_start)));
Copilot AI Jan 8, 2026

The fetch_options calculation uses row_count - range_start which represents the remaining rows from the start of this batch to the end of the dataset. This should likely be range_end - range_start to represent the actual size of the current batch being fetched, or the calculation may need adjustment based on the API's expectations.

Suggested change
range_start, range_end, storage::fetch_options(static_cast<int>(row_count - range_start)));
range_start, range_end, storage::fetch_options(static_cast<int>(range_end - range_start)));
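To make the discrepancy concrete with assumed numbers: for row_count = 1000 and batch_size_ = 256, the ceiling division (row_count + batch_size_ - 1) / batch_size_ yields four batches, and a small self-contained sketch of the same split shows where the two expressions diverge:

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
    const int64_t row_count = 1000, batch_size = 256;  // illustrative values
    const int64_t batch_count = (row_count + batch_size - 1) / batch_size;  // ceil
    for (int64_t i = 0; i < batch_count; ++i) {
        const int64_t start = i * batch_size;
        const int64_t end = std::min<int64_t>(start + batch_size, row_count);
        // batch size vs. remaining rows: 256 vs 1000 at i = 0; the two values
        // only coincide on the last batch (232 vs 232).
        std::printf("batch %lld: [%lld, %lld) size=%lld remaining=%lld\n",
                    (long long)i, (long long)start, (long long)end,
                    (long long)(end - start), (long long)(row_count - start));
    }
}
```

Whether fetch_options expects the batch size or a remaining-rows hint is something the storage API would have to confirm.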
});
column_batches[i].promise_ = std::move(p);
}
}

inline nd::array table_data::streamer_info::get_sample(int32_t column_number, int64_t row_number)
{
const int64_t batch_index = row_number >> batch_size_log2_;
const int64_t row_in_batch = row_number & batch_mask_;

auto& batches = column_to_batches[column_number].batches;
auto& batch = batches[batch_index];
if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] {
std::scoped_lock lock(column_to_batches[column_number].mutex);
for (int64_t i = 0; i <= batch_index; ++i) {
if (!batches[static_cast<size_t>(i)].initialized_.load(std::memory_order_relaxed)) {
batches[static_cast<size_t>(i)].owner_ = streamers[static_cast<size_t>(column_number)]->next_batch();
batches[static_cast<size_t>(i)].initialized_.store(true, std::memory_order_release);
}
auto& batch = column_to_batches[column_number][batch_index];
if (static_cast<bool>(batch.promise_)) [[unlikely]] {
Copilot AI Jan 8, 2026

The static_cast<bool>(batch.promise_) pattern is used to check if the promise is valid. Consider documenting what constitutes a valid vs invalid promise state, or use a more self-documenting approach such as a helper method batch.is_initialized() to improve code readability across all three usages (lines 570, 593, 612).
std::lock_guard lock(batch.mutex_);
if (static_cast<bool>(batch.promise_)) {
batch.owner_ = batch.promise_.get_future().get();
batch.promise_ = async::promise<nd::array>();
Copilot AI Jan 8, 2026

Resetting the promise by assigning a default-constructed promise is unclear. Consider adding a comment explaining that this marks the batch as initialized, or use a more explicit pattern such as a separate boolean flag or a method like batch.mark_initialized() to make the intent clearer.
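Combining the two comments above, a hedged sketch of what such helpers could look like on batch_data; is_initialized and mark_initialized are the reviewer's proposed names, not methods that exist in this PR, and std::future again stands in for the project's async::promise&lt;nd::array&gt;:

```cpp
#include <future>
#include <mutex>

struct batch_data_sketch {
    std::mutex mutex_;
    std::future<int> future_;  // non-valid future <=> batch already materialized

    // Self-documenting wrapper for the static_cast<bool>(promise_) check.
    bool is_initialized() const { return !future_.valid(); }

    // Self-documenting wrapper for the promise reset after get():
    // dropping the promise is what marks the batch as initialized.
    void mark_initialized() { future_ = std::future<int>(); }
};
```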
}
}
return batch.owner_[static_cast<size_t>(row_in_batch)];
@@ -576,23 +576,7 @@
template <typename T>
inline T table_data::streamer_info::value(int32_t column_number, int64_t row_number)
{
const int64_t batch_index = row_number >> batch_size_log2_;
const int64_t row_in_batch = row_number & batch_mask_;

auto& batches = column_to_batches[column_number].batches;
auto& batch = batches[batch_index];
if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] {
std::scoped_lock lock(column_to_batches[column_number].mutex);
for (int64_t i = 0; i <= batch_index; ++i) {
if (!batches[static_cast<size_t>(i)].initialized_.load(std::memory_order_relaxed)) {
batches[static_cast<size_t>(i)].owner_ = utils::eval_with_nones<T>(streamers[static_cast<size_t>(column_number)]->next_batch());
batches[static_cast<size_t>(i)].data_ = batches[static_cast<size_t>(i)].owner_.data().data();
batches[static_cast<size_t>(i)].initialized_.store(true, std::memory_order_release);
}
}
}

return reinterpret_cast<const T*>(batch.data_)[static_cast<size_t>(row_in_batch)];
return *(value_ptr<T>(column_number, row_number));
}

template <typename T>
@@ -601,16 +589,13 @@ inline const T* table_data::streamer_info::value_ptr(int32_t column_number, int64_t row_number)
const int64_t batch_index = row_number >> batch_size_log2_;
const int64_t row_in_batch = row_number & batch_mask_;

auto& batches = column_to_batches[column_number].batches;
auto& batch = batches[batch_index];
if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] {
std::scoped_lock lock(column_to_batches[column_number].mutex);
for (int64_t i = 0; i <= batch_index; ++i) {
if (!batches[static_cast<size_t>(i)].initialized_.load(std::memory_order_relaxed)) {
batches[static_cast<size_t>(i)].owner_ = utils::eval_with_nones<T>(streamers[static_cast<size_t>(column_number)]->next_batch());
batches[static_cast<size_t>(i)].data_ = batches[static_cast<size_t>(i)].owner_.data().data();
batches[static_cast<size_t>(i)].initialized_.store(true, std::memory_order_release);
}
auto& batch = column_to_batches[column_number][batch_index];
if (static_cast<bool>(batch.promise_)) [[unlikely]] {
std::lock_guard lock(batch.mutex_);
if (static_cast<bool>(batch.promise_)) {
batch.owner_ = utils::eval_with_nones<T>(batch.promise_.get_future().get());
batch.data_ = batch.owner_.data().data();
batch.promise_ = async::promise<nd::array>();
}
}

@@ -623,16 +608,13 @@ inline std::string_view table_data::streamer_info::value(int32_t column_number, int64_t row_number)
const int64_t batch_index = row_number >> batch_size_log2_;
const int64_t row_in_batch = row_number & batch_mask_;

auto& batches = column_to_batches[column_number].batches;
auto& batch = batches[batch_index];
if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] {
std::scoped_lock lock(column_to_batches[column_number].mutex);
for (int64_t i = 0; i <= batch_index; ++i) {
if (!batches[static_cast<size_t>(i)].initialized_.load(std::memory_order_relaxed)) {
batches[static_cast<size_t>(i)].owner_ = streamers[static_cast<size_t>(column_number)]->next_batch();
batches[static_cast<size_t>(i)].holder_ = impl::string_stream_array_holder(batches[static_cast<size_t>(i)].owner_);
batches[static_cast<size_t>(i)].initialized_.store(true, std::memory_order_release);
}
auto& batch = column_to_batches[column_number][batch_index];
if (static_cast<bool>(batch.promise_)) [[unlikely]] {
std::lock_guard lock(batch.mutex_);
if (static_cast<bool>(batch.promise_)) {
batch.owner_ = batch.promise_.get_future().get();
batch.holder_ = impl::string_stream_array_holder(batch.owner_);
batch.promise_ = async::promise<nd::array>();
}
}
