|
78 | 78 | #include "yb/util/trace.h" |
79 | 79 | #include "yb/util/yb_pg_errcodes.h" |
80 | 80 |
|
81 | | -#include "yb/vector_index/vectorann.h" |
| 81 | +#include "yb/vector_index/vector_index_if.h" |
82 | 82 |
|
83 | 83 | #include "yb/yql/pggate/util/pg_doc_data.h" |
84 | 84 | #include "yb/yql/pgwrapper/pg_wrapper.h" |
85 | 85 |
|
86 | 86 | using namespace std::literals; |
87 | 87 |
|
88 | | -using yb::vector_index::ANNPagingState; |
89 | | -using yb::vector_index::DocKeyWithDistance; |
90 | | -using yb::vector_index::IndexableVectorType; |
91 | | -using yb::vector_index::DummyANNFactory; |
92 | | -using yb::vector_index::VectorANN; |
93 | | - |
94 | 88 | DECLARE_bool(ysql_disable_index_backfill); |
95 | 89 |
|
96 | 90 | DEPRECATE_FLAG(double, ysql_scan_timeout_multiplier, "10_2022"); |
@@ -2115,59 +2109,6 @@ class PgsqlReadRequestYbctidProvider { |
2115 | 2109 | std::optional<int64> current_order_; |
2116 | 2110 | }; |
2117 | 2111 |
|
2118 | | -template<IndexableVectorType Vector> |
2119 | | -class ANNKeyProvider { |
2120 | | - public: |
2121 | | - explicit ANNKeyProvider( |
2122 | | - const DocReadContext& doc_read_context, PgsqlResponsePB& response, size_t prefetch_size, |
2123 | | - const Vector& query_vec, VectorANN<Vector>* ann, const ANNPagingState& paging_state) |
2124 | | - : response_(response), prefetch_size_(prefetch_size), query_vec_(query_vec), ann_(ann) { |
2125 | | - next_batch_paging_state_ = paging_state; |
2126 | | - CHECK_GT(prefetch_size_, 0); |
2127 | | - } |
2128 | | - |
2129 | | - bool RefillBatch() { |
2130 | | - CHECK(key_batch_.empty()); |
2131 | | - auto batch = ann_->GetTopKVectors( |
2132 | | - query_vec_, prefetch_size_, next_batch_paging_state_.distance(), |
2133 | | - next_batch_paging_state_.main_key(), false); |
2134 | | - |
2135 | | - std::sort(batch.begin(), batch.end(), std::less<DocKeyWithDistance>()); |
2136 | | - key_batch_.insert(key_batch_.end(), batch.begin(), batch.end()); |
2137 | | - if (key_batch_.empty()) { |
2138 | | - return false; |
2139 | | - } |
2140 | | - next_batch_paging_state_ = |
2141 | | - ANNPagingState(key_batch_.back().distance_, key_batch_.back().dockey_); |
2142 | | - return true; |
2143 | | - } |
2144 | | - |
2145 | | - Slice FetchKey() { |
2146 | | - if (key_batch_.empty() && !RefillBatch()) { |
2147 | | - return Slice(); |
2148 | | - } |
2149 | | - |
2150 | | - auto ret = key_batch_.front(); |
2151 | | - current_entry_ = ret; |
2152 | | - key_batch_.pop_front(); |
2153 | | - return ret.dockey_; |
2154 | | - } |
2155 | | - |
2156 | | - ANNPagingState GetNextBatchPagingState() const { return next_batch_paging_state_; } |
2157 | | - |
2158 | | - void AddedKeyToResultSet() { response_.add_vector_index_distances(current_entry_->distance_); } |
2159 | | - |
2160 | | - private: |
2161 | | - PgsqlResponsePB& response_; |
2162 | | - size_t prefetch_size_; |
2163 | | - const FloatVector& query_vec_; |
2164 | | - VectorANN<Vector>* ann_; |
2165 | | - std::deque<DocKeyWithDistance> key_batch_; |
2166 | | - ANNPagingState next_batch_paging_state_; |
2167 | | - |
2168 | | - std::optional<DocKeyWithDistance> current_entry_; |
2169 | | -}; |
2170 | | - |
2171 | 2112 | Result<size_t> PgsqlReadOperation::Execute() { |
2172 | 2113 | // Verify that this request references no columns marked for deletion. |
2173 | 2114 | RETURN_NOT_OK(VerifyNoRefColsMarkedForDeletion(data_.doc_read_context.schema(), request_)); |
@@ -2205,9 +2146,6 @@ Result<size_t> PgsqlReadOperation::Execute() { |
2205 | 2146 | } else { |
2206 | 2147 | std::tie(fetched_rows, has_paging_state) = VERIFY_RESULT(ExecuteSample()); |
2207 | 2148 | } |
2208 | | - } else if (request_.has_vector_idx_options()) { |
2209 | | - std::tie(fetched_rows, has_paging_state) = VERIFY_RESULT(ExecuteVectorSearch( |
2210 | | - data_.doc_read_context, request_.vector_idx_options())); |
2211 | 2149 | } else if (request_.index_request().has_vector_idx_options()) { |
2212 | 2150 | fetched_rows = VERIFY_RESULT(ExecuteVectorLSMSearch( |
2213 | 2151 | request_.index_request().vector_idx_options())); |
@@ -2680,91 +2618,6 @@ Result<std::tuple<size_t, bool>> PgsqlReadOperation::ExecuteSampleBlockBased() { |
2680 | 2618 | return std::tuple<size_t, bool>{fetched_items, false}; |
2681 | 2619 | } |
2682 | 2620 |
|
2683 | | -Result<std::tuple<size_t, bool>> PgsqlReadOperation::ExecuteVectorSearch( |
2684 | | - const DocReadContext& doc_read_context, const PgVectorReadOptionsPB& options) { |
2685 | | - // Build the vectorann and then make an index_doc_read_context on the vectorann |
2686 | | - // to get the index iterator. Then do ExecuteBatchKeys on the index iterator. |
2687 | | - RSTATUS_DCHECK(options.has_vector(), InvalidArgument, "Query vector not provided"); |
2688 | | - |
2689 | | - auto query_vec = options.vector().binary_value(); |
2690 | | - |
2691 | | - auto ysql_query_vec = pointer_cast<const vector_index::YSQLVector*>(query_vec.data()); |
2692 | | - |
2693 | | - auto query_vec_ref = VERIFY_RESULT( |
2694 | | - VectorANN<FloatVector>::GetVectorFromYSQLWire(*ysql_query_vec, query_vec.size())); |
2695 | | - |
2696 | | - DummyANNFactory<FloatVector> ann_factory; |
2697 | | - auto ann_store = ann_factory.Create(ysql_query_vec->dim); |
2698 | | - |
2699 | | - dockv::ReaderProjection index_doc_projection; |
2700 | | - |
2701 | | - auto key_col_id = doc_read_context.schema().column_id(0); |
2702 | | - |
2703 | | - // Building the schema to extract the vector and key from the main DocDB store. |
2704 | | - // Vector should be the first value after the key. |
2705 | | - auto vector_col_id = |
2706 | | - doc_read_context.schema().column_id(doc_read_context.schema().num_key_columns()); |
2707 | | - index_doc_projection.Init(doc_read_context.schema(), {key_col_id, vector_col_id}); |
2708 | | - |
2709 | | - FilteringIterator table_iter(&table_iter_); |
2710 | | - RETURN_NOT_OK(table_iter.Init(data_, request_, index_doc_projection, doc_read_context)); |
2711 | | - dockv::PgTableRow row(index_doc_projection); |
2712 | | - const auto& table_id = request_.table_id(); |
2713 | | - |
2714 | | - // Build the VectorANN. |
2715 | | - for (;;) { |
2716 | | - const auto fetch_result = |
2717 | | - VERIFY_RESULT(FetchTableRow(table_id, &table_iter, nullptr /* index */, &row)); |
2718 | | - // If changing this code, see also PgsqlReadOperation::ExecuteBatchKeys. |
2719 | | - if (fetch_result == FetchResult::NotFound) { |
2720 | | - break; |
2721 | | - } |
2722 | | - ++scanned_table_rows_; |
2723 | | - if (fetch_result == FetchResult::Found) { |
2724 | | - auto vec_value = row.GetValueByColumnId(vector_col_id); |
2725 | | - if (!vec_value.has_value()) { |
2726 | | - continue; |
2727 | | - } |
2728 | | - |
2729 | | - // Add the vector to the ANN store |
2730 | | - auto encoded = dockv::EncodedDocVectorValue::FromSlice(vec_value->binary_value()); |
2731 | | - auto vec = VERIFY_RESULT(VectorANN<FloatVector>::GetVectorFromYSQLWire(encoded.data)); |
2732 | | - auto doc_iter = down_cast<DocRowwiseIterator*>(table_iter_.get()); |
2733 | | - ann_store->Add(VERIFY_RESULT(encoded.DecodeId()), std::move(vec), doc_iter->GetTupleId()); |
2734 | | - } |
2735 | | - } |
2736 | | - |
2737 | | - // Check for paging state. |
2738 | | - ANNPagingState ann_paging_state; |
2739 | | - if (request_.has_paging_state()) { |
2740 | | - ann_paging_state = |
2741 | | - ANNPagingState{request_.paging_state().distance(), request_.paging_state().main_key()}; |
2742 | | - } |
2743 | | - |
2744 | | - // All rows have been added to the ANN store, now we can create the iterator. |
2745 | | - auto initial_prefetch_size = request_.vector_idx_options().prefetch_size(); |
2746 | | - initial_prefetch_size = std::max(initial_prefetch_size, 25); |
2747 | | - |
2748 | | - ANNKeyProvider key_provider( |
2749 | | - doc_read_context, response_, initial_prefetch_size, query_vec_ref, ann_store.get(), |
2750 | | - ann_paging_state); |
2751 | | - auto fetched_rows = VERIFY_RESULT(ExecuteBatchKeys(key_provider)); |
2752 | | - |
2753 | | - auto next_paging_state = key_provider.GetNextBatchPagingState(); |
2754 | | - |
2755 | | - // Set paging state. |
2756 | | - bool has_paging_state = !next_paging_state.valid(); |
2757 | | - if (has_paging_state) { |
2758 | | - auto* paging_state = response_.mutable_paging_state(); |
2759 | | - paging_state->set_distance(next_paging_state.distance()); |
2760 | | - paging_state->set_main_key(next_paging_state.main_key().ToBuffer()); |
2761 | | - |
2762 | | - BindReadTimeToPagingState(data_.read_operation_data.read_time); |
2763 | | - } |
2764 | | - |
2765 | | - return std::tuple<size_t, bool>{fetched_rows, has_paging_state}; |
2766 | | -} |
2767 | | - |
2768 | 2621 | Result<size_t> PgsqlReadOperation::ExecuteVectorLSMSearch(const PgVectorReadOptionsPB& options) { |
2769 | 2622 | RSTATUS_DCHECK( |
2770 | 2623 | data_.vector_index, IllegalState, "Search vector when vector index is null: $0", request_); |
|
0 commit comments