@@ -340,7 +340,7 @@ inline table_data::streamer_info& table_data::get_streamers() noexcept
 
 inline bool table_data::column_has_streamer(uint32_t idx) const noexcept
 {
-    return streamers_.streamers.size() > idx && streamers_.streamers[idx] != nullptr;
+    return streamers_.column_to_batches.size() > idx && !streamers_.column_to_batches[idx].empty();
 }
 
 inline void table_data::reset_streamers() noexcept
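Note: `column_has_streamer` no longer checks for a live `column_streamer` object; a column now counts as streaming once its batch vector has been populated. The layout the new code appears to assume, reconstructed from the fields visible in this diff (a sketch, not the actual header):

```cpp
// Inferred sketch of the reworked streamer_info; the member names come from
// this diff, everything else (exact types, extra members) is guesswork.
struct streamer_info {
    struct batch_data {
        std::mutex mutex_;                          // guards first-touch materialization
        async::promise<nd::array> promise_;         // engaged until the batch is fetched
        nd::array owner_;                           // the materialized batch
        const void* data_ = nullptr;                // raw pointer into owner_ for typed reads
        impl::string_stream_array_holder holder_;   // string_view access into owner_
    };
    std::vector<std::vector<batch_data>> column_to_batches; // [column][batch]
    int64_t batch_size_log2_;                       // batch_size_ assumed a power of two
    int64_t batch_mask_;                            // batch_size_ - 1
};
```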
@@ -529,45 +529,49 @@ inline std::pair<int64_t, int64_t> table_data::get_row_range(int32_t worker_id)
 
 inline void table_data::create_streamer(int32_t idx, int32_t worker_id)
 {
-    if (streamers_.streamers.empty()) {
-        const auto s = num_columns();
-        streamers_.streamers.resize(s);
-        std::vector<streamer_info::column_data> temp_data(s);
-        streamers_.column_to_batches.swap(temp_data);
-    }
-    if (!streamers_.streamers[idx]) {
-        if (pg::memory_tracker::has_memory_limit()) {
-            const auto column_size =
-                pg::utils::get_column_width(get_base_atttypid(idx), get_atttypmod(idx)) * num_rows();
-            pg::memory_tracker::ensure_memory_available(column_size);
-        }
-        if (worker_id != -1) {
-            auto [start_row, end_row] = get_row_range(worker_id);
-            auto new_column = heimdall_common::create_filtered_column(
-                *(get_column_view(idx)), icm::index_mapping_t<int64_t>::slice({start_row, end_row, 1}));
-            streamers_.streamers[idx] = std::make_unique<bifrost::column_streamer>(new_column, batch_size_);
-        } else {
-            streamers_.streamers[idx] = std::make_unique<bifrost::column_streamer>(get_column_view(idx), batch_size_);
-        }
-        const int64_t batch_index = (num_rows() - 1) / batch_size_;
-        streamers_.column_to_batches[idx].batches.resize(batch_index + 1);
+    const auto col_count = num_columns();
+    if (streamers_.column_to_batches.empty()) {
+        streamers_.column_to_batches.resize(col_count);
+    }
+    ASSERT(idx >= 0 && idx < col_count);
+    auto& column_batches = streamers_.column_to_batches[idx];
+    if (!column_batches.empty()) {
+        return;
+    }
+    if (pg::memory_tracker::has_memory_limit()) {
+        const auto column_size = pg::utils::get_column_width(get_base_atttypid(idx), get_atttypmod(idx)) * num_rows();
+        pg::memory_tracker::ensure_memory_available(column_size);
+    }
+    heimdall::column_view_ptr cv = get_column_view(idx);
+    if (worker_id != -1) {
+        auto [start_row, end_row] = get_row_range(worker_id);
+        cv = heimdall_common::create_filtered_column(*(cv),
+            icm::index_mapping_t<int64_t>::slice({start_row, end_row, 1}));
+    }
+    const int64_t row_count = num_rows();
+    const int64_t batch_count = (row_count + batch_size_ - 1) / batch_size_;
+    column_batches = std::vector<streamer_info::batch_data>(batch_count);
+    for (int64_t i = 0; i < batch_count; ++i) {
+        const auto range_start = i * batch_size_;
+        const auto range_end = std::min<int64_t>(range_start + batch_size_, row_count);
+        auto p = async::run_on_main([cv, range_start, range_end, row_count]() {
+            return cv->request_range(
+                range_start, range_end, storage::fetch_options(static_cast<int>(row_count - range_start)));
+        });
+        column_batches[i].promise_ = std::move(p);
     }
 }
 
 inline nd::array table_data::streamer_info::get_sample(int32_t column_number, int64_t row_number)
 {
     const int64_t batch_index = row_number >> batch_size_log2_;
     const int64_t row_in_batch = row_number & batch_mask_;
-
-    auto& batches = column_to_batches[column_number].batches;
-    auto& batch = batches[batch_index];
-    if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] {
-        std::scoped_lock lock(column_to_batches[column_number].mutex);
-        for (int64_t i = 0; i <= batch_index; ++i) {
-            if (!batches[static_cast<size_t>(i)].initialized_.load(std::memory_order_relaxed)) {
-                batches[static_cast<size_t>(i)].owner_ = streamers[static_cast<size_t>(column_number)]->next_batch();
-                batches[static_cast<size_t>(i)].initialized_.store(true, std::memory_order_release);
-            }
+    auto& batch = column_to_batches[column_number][batch_index];
+    if (static_cast<bool>(batch.promise_)) [[unlikely]] {
+        std::lock_guard lock(batch.mutex_);
+        if (static_cast<bool>(batch.promise_)) {
+            batch.owner_ = batch.promise_.get_future().get();
+            batch.promise_ = async::promise<nd::array>();
         }
     }
     return batch.owner_[static_cast<size_t>(row_in_batch)];
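The materialization in `get_sample` is double-checked locking, with the engaged promise standing in for the old `initialized_` atomic: a cheap unlocked test, a re-check under the per-batch mutex, then the promise is disengaged once `owner_` is populated. A self-contained recast with standard-library types, where `std::future::valid()` plays the role of the `static_cast<bool>(promise_)` test (a sketch of the pattern, not this codebase's `async` API):

```cpp
#include <future>
#include <mutex>
#include <vector>

struct lazy_batch {
    std::mutex mutex_;
    std::future<std::vector<double>> pending_;  // set up front, consumed exactly once
    std::vector<double> owner_;                 // filled on first access
};

inline double sample(lazy_batch& b, std::size_t row_in_batch)
{
    if (b.pending_.valid()) [[unlikely]] {      // unlocked fast-path check
        std::lock_guard lock(b.mutex_);
        if (b.pending_.valid()) {               // re-check under the lock
            b.owner_ = b.pending_.get();        // blocks until the async fetch lands;
        }                                       // get() invalidates the future, so no
    }                                           // explicit reset is needed here
    return b.owner_[row_in_batch];
}
```

As in the diff, the first check runs outside the lock, so it can observe a stale state; the locked re-check is what makes the transition race-free, and readers arriving afterwards take only the unlocked fast path.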
@@ -576,23 +580,7 @@ inline nd::array table_data::streamer_info::get_sample(int32_t column_number, int64_t row_number)
 template <typename T>
 inline T table_data::streamer_info::value(int32_t column_number, int64_t row_number)
 {
-    const int64_t batch_index = row_number >> batch_size_log2_;
-    const int64_t row_in_batch = row_number & batch_mask_;
-
-    auto& batches = column_to_batches[column_number].batches;
-    auto& batch = batches[batch_index];
-    if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] {
-        std::scoped_lock lock(column_to_batches[column_number].mutex);
-        for (int64_t i = 0; i <= batch_index; ++i) {
-            if (!batches[static_cast<size_t>(i)].initialized_.load(std::memory_order_relaxed)) {
-                batches[static_cast<size_t>(i)].owner_ = utils::eval_with_nones<T>(streamers[static_cast<size_t>(column_number)]->next_batch());
-                batches[static_cast<size_t>(i)].data_ = batches[static_cast<size_t>(i)].owner_.data().data();
-                batches[static_cast<size_t>(i)].initialized_.store(true, std::memory_order_release);
-            }
-        }
-    }
-
-    return reinterpret_cast<const T*>(batch.data_)[static_cast<size_t>(row_in_batch)];
+    return *(value_ptr<T>(column_number, row_number));
 }
 
 template <typename T>
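With this hunk, the typed `value<T>` becomes a thin wrapper over `value_ptr<T>`, so the lock-and-materialize logic lives in one place; the dereference is safe because `value_ptr<T>` returns only after `data_` points into the materialized `owner_` array (see the next hunk).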
@@ -601,16 +589,13 @@ inline const T* table_data::streamer_info::value_ptr(int32_t column_number, int64_t row_number)
     const int64_t batch_index = row_number >> batch_size_log2_;
     const int64_t row_in_batch = row_number & batch_mask_;
 
-    auto& batches = column_to_batches[column_number].batches;
-    auto& batch = batches[batch_index];
-    if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] {
-        std::scoped_lock lock(column_to_batches[column_number].mutex);
-        for (int64_t i = 0; i <= batch_index; ++i) {
-            if (!batches[static_cast<size_t>(i)].initialized_.load(std::memory_order_relaxed)) {
-                batches[static_cast<size_t>(i)].owner_ = utils::eval_with_nones<T>(streamers[static_cast<size_t>(column_number)]->next_batch());
-                batches[static_cast<size_t>(i)].data_ = batches[static_cast<size_t>(i)].owner_.data().data();
-                batches[static_cast<size_t>(i)].initialized_.store(true, std::memory_order_release);
-            }
+    auto& batch = column_to_batches[column_number][batch_index];
+    if (static_cast<bool>(batch.promise_)) [[unlikely]] {
+        std::lock_guard lock(batch.mutex_);
+        if (static_cast<bool>(batch.promise_)) {
+            batch.owner_ = utils::eval_with_nones<T>(batch.promise_.get_future().get());
+            batch.data_ = batch.owner_.data().data();
+            batch.promise_ = async::promise<nd::array>();
         }
     }
 
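Both readers index with `row_number >> batch_size_log2_` and `row_number & batch_mask_`, which is only equivalent to division and modulo when `batch_size_` is a power of two and `batch_mask_ == batch_size_ - 1`; the ceiling division in `create_streamer` then allocates one extra slot for a trailing partial batch. A worked example under that power-of-two assumption (the value 1024 is illustrative, not taken from the code):

```cpp
// batch_size_ = 1024  =>  batch_size_log2_ = 10, batch_mask_ = 1023
static_assert((5000 >> 10) == 4);               // batch_index for row 5000
static_assert((5000 & 1023) == 904);            // row_in_batch for row 5000
static_assert((5000 + 1024 - 1) / 1024 == 5);   // batch_count for 5000 rows
// The last batch covers rows [4096, 5000): range_end = min(4096 + 1024, 5000).
```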
@@ -623,16 +608,13 @@ inline std::string_view table_data::streamer_info::value(int32_t column_number, int64_t row_number)
     const int64_t batch_index = row_number >> batch_size_log2_;
     const int64_t row_in_batch = row_number & batch_mask_;
 
-    auto& batches = column_to_batches[column_number].batches;
-    auto& batch = batches[batch_index];
-    if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] {
-        std::scoped_lock lock(column_to_batches[column_number].mutex);
-        for (int64_t i = 0; i <= batch_index; ++i) {
-            if (!batches[static_cast<size_t>(i)].initialized_.load(std::memory_order_relaxed)) {
-                batches[static_cast<size_t>(i)].owner_ = streamers[static_cast<size_t>(column_number)]->next_batch();
-                batches[static_cast<size_t>(i)].holder_ = impl::string_stream_array_holder(batches[static_cast<size_t>(i)].owner_);
-                batches[static_cast<size_t>(i)].initialized_.store(true, std::memory_order_release);
-            }
+    auto& batch = column_to_batches[column_number][batch_index];
+    if (static_cast<bool>(batch.promise_)) [[unlikely]] {
+        std::lock_guard lock(batch.mutex_);
+        if (static_cast<bool>(batch.promise_)) {
+            batch.owner_ = batch.promise_.get_future().get();
+            batch.holder_ = impl::string_stream_array_holder(batch.owner_);
+            batch.promise_ = async::promise<nd::array>();
         }
     }
 
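In the `std::string_view` overload, `holder_` is rebuilt from the freshly materialized `owner_` before the promise is cleared, so the views it hands out stay backed by `owner_` for as long as the batch is alive; presumably they must not outlive `reset_streamers()`.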