@@ -38,7 +38,7 @@ Y_UNIT_TEST_SUITE(KqpKnn) {
3838 return results;
3939 }
4040
41- TSession CreateTableForVectorSearch (TTableClient& db, bool nullable, const TString& dataCol = " data" ) {
41+ TSession CreateTableForVectorSearch (TTableClient& db, bool nullable, const TString& dataCol = " data" , bool singlePartition = false ) {
4242 auto session = db.CreateSession ().GetValueSync ().GetSession ();
4343
4444 {
@@ -55,13 +55,17 @@ Y_UNIT_TEST_SUITE(KqpKnn) {
5555 .AddNonNullableColumn (dataCol, EPrimitiveType::String);
5656 }
5757 tableBuilder.SetPrimaryKeyColumns ({" pk" });
58- tableBuilder.BeginPartitioningSettings ()
59- .SetMinPartitionsCount (3 )
60- .EndPartitioningSettings ();
61- auto partitions = TExplicitPartitions{}
62- .AppendSplitPoints (TValueBuilder{}.BeginTuple ().AddElement ().OptionalInt64 (4 ).EndTuple ().Build ())
63- .AppendSplitPoints (TValueBuilder{}.BeginTuple ().AddElement ().OptionalInt64 (6 ).EndTuple ().Build ());
64- tableBuilder.SetPartitionAtKeys (partitions);
58+
59+ if (!singlePartition) {
60+ tableBuilder.BeginPartitioningSettings ()
61+ .SetMinPartitionsCount (3 )
62+ .EndPartitioningSettings ();
63+ auto partitions = TExplicitPartitions{}
64+ .AppendSplitPoints (TValueBuilder{}.BeginTuple ().AddElement ().OptionalInt64 (4 ).EndTuple ().Build ())
65+ .AppendSplitPoints (TValueBuilder{}.BeginTuple ().AddElement ().OptionalInt64 (6 ).EndTuple ().Build ());
66+ tableBuilder.SetPartitionAtKeys (partitions);
67+ }
68+
6569 auto result = session.CreateTable (" /Root/TestTable" , tableBuilder.Build ()).ExtractValueSync ();
6670 UNIT_ASSERT_VALUES_EQUAL (result.IsTransportError (), false );
6771 UNIT_ASSERT_VALUES_EQUAL_C (result.GetStatus (), EStatus::SUCCESS, result.GetIssues ().ToString ());
@@ -88,6 +92,42 @@ Y_UNIT_TEST_SUITE(KqpKnn) {
8892 return session;
8993 }
9094
95+ template <bool Nullable> // Nullable is forwarded to ExtractPksAndScores<Nullable> when the result set is parsed
96+ void VerifyVectorSearchResults (TKikimrRunner& kikimr, TSession& session, TTxSettings txSettings = TTxSettings::SerializableRW (), bool useRunCall = true ) {
97+ // Shared check: runs a top-3 cosine-distance query against /Root/TestTable in a single BeginTx(txSettings)+CommitTx call and asserts the exact PKs and distances returned.
98+ // Target vector literal is hex "677102": components 0x67, 0x71 (103, 113); the trailing 0x02 byte is presumably a vector-format tag - TODO confirm
99+ // Expected cosine distances, in result order:
100+ // pk=8 (117, 118): 0.000882 - closest
101+ // pk=5 (80, 96): 0.000985
102+ // pk=9 (118, 118): 0.001070
103+
104+ const TString query = Q_ (R"(
105+ $TargetEmbedding = String::HexDecode("677102");
106+ SELECT pk, Knn::CosineDistance(emb, $TargetEmbedding) AS distance FROM `/Root/TestTable`
107+ ORDER BY distance
108+ LIMIT 3
109+ )" );
110+
111+ auto result = useRunCall // useRunCall selects whether the query is wrapped in kikimr.RunCall or issued directly
112+ ? kikimr.RunCall ([&] {
113+ return session.ExecuteDataQuery (query, TTxControl::BeginTx (txSettings).CommitTx ()).ExtractValueSync ();
114+ })
115+ : session.ExecuteDataQuery (query, TTxControl::BeginTx (txSettings).CommitTx ()).ExtractValueSync (); // same query and tx control, called on the session without RunCall
116+
117+ UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ()); // on failure, report the issues returned with the result
118+
119+ // Parse the first result set into (pk, distance) pairs
120+ auto results = ExtractPksAndScores<Nullable>(result.GetResultSetParser (0 ));
121+
122+ UNIT_ASSERT_VALUES_EQUAL (results.size (), 3u ); // the query has LIMIT 3, so exactly three rows are expected
123+ UNIT_ASSERT_VALUES_EQUAL (results[0 ].first , 8 );
124+ UNIT_ASSERT_VALUES_EQUAL (results[1 ].first , 5 );
125+ UNIT_ASSERT_VALUES_EQUAL (results[2 ].first , 9 );
126+ CheckDistance (results[0 ].second , 0 .000882f ); // distances are checked in the same order as the PKs above
127+ CheckDistance (results[1 ].second , 0 .000985f );
128+ CheckDistance (results[2 ].second , 0 .001070f );
129+ }
130+
91131 Y_UNIT_TEST_TWIN (VectorSearchKnnPushdown, Nullable) {
92132 auto setting = NKikimrKqp::TKqpSetting ();
93133 auto serverSettings = TKikimrSettings ()
@@ -96,7 +136,6 @@ Y_UNIT_TEST_SUITE(KqpKnn) {
96136
97137 TKikimrRunner kikimr (serverSettings);
98138 auto runtime = kikimr.GetTestServer ().GetRuntime ();
99- runtime->SetLogPriority (NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_TRACE);
100139
101140 auto db = kikimr.RunCall ([&] { return kikimr.GetTableClient (); });
102141 auto session = kikimr.RunCall ([&] { return CreateTableForVectorSearch (db, Nullable, " ___data" ); });
@@ -135,15 +174,6 @@ Y_UNIT_TEST_SUITE(KqpKnn) {
135174 UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ());
136175 };
137176
138- auto runQueryWithResult = [&](const TString& query) {
139- auto result = kikimr.RunCall ([&] {
140- return session.ExecuteDataQuery (query,
141- TTxControl::BeginTx (TTxSettings::SerializableRW ()).CommitTx ()).ExtractValueSync ();
142- });
143- UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ());
144- return result;
145- };
146-
147177 auto runQueryWithParams = [&](const TString& query, TParams params) {
148178 auto result = kikimr.RunCall ([&] {
149179 return session.ExecuteDataQuery (query,
@@ -271,31 +301,8 @@ Y_UNIT_TEST_SUITE(KqpKnn) {
271301 LIMIT 3;
272302 )" ));
273303
274- // Verify actual results - check that top 3 PKs are correct
275- // Target vector is 0x67, 0x71 (103, 113)
276- // Cosine distances calculated:
277- // pk=8 (117, 118): 0.000882 - closest
278- // pk=5 (80, 96): 0.000985
279- // pk=9 (118, 118): 0.001070
280- {
281- auto result = runQueryWithResult (Q_ (R"(
282- $TargetEmbedding = String::HexDecode("677102");
283- SELECT pk, Knn::CosineDistance(emb, $TargetEmbedding) AS distance FROM `/Root/TestTable`
284- ORDER BY distance
285- LIMIT 3
286- )" ));
287-
288- // Extract PKs and distances from result
289- auto results = ExtractPksAndScores<Nullable>(result.GetResultSetParser (0 ));
290-
291- UNIT_ASSERT_VALUES_EQUAL (results.size (), 3u );
292- UNIT_ASSERT_VALUES_EQUAL (results[0 ].first , 8 );
293- UNIT_ASSERT_VALUES_EQUAL (results[1 ].first , 5 );
294- UNIT_ASSERT_VALUES_EQUAL (results[2 ].first , 9 );
295- CheckDistance (results[0 ].second , 0 .000882f );
296- CheckDistance (results[1 ].second , 0 .000985f );
297- CheckDistance (results[2 ].second , 0 .001070f );
298- }
304+ // Verify actual results
305+ VerifyVectorSearchResults<Nullable>(kikimr, session);
299306
300307 // Test with subquery: target vector from another table
301308 {
@@ -534,8 +541,67 @@ Y_UNIT_TEST_SUITE(KqpKnn) {
534541 DoVectorKnnPushdownTest (EVectorType::Int8);
535542 }
536543
544+ Y_UNIT_TEST_TWIN (VectorSearchKnnPushdownFollower, StaleRO) { // StaleRO selects the tx mode of the read and which side (leader/follower) is expected to serve it
545+ const TString tableName = " /Root/TestTable" ;
546+
547+ auto setting = NKikimrKqp::TKqpSetting ();
548+ auto serverSettings = TKikimrSettings ()
549+ .SetEnableForceFollowers (true )
550+ .SetKqpSettings ({setting});
551+
552+ TKikimrRunner kikimr (serverSettings);
553+ auto runtime = kikimr.GetTestServer ().GetRuntime ();
554+
555+ auto db = kikimr.GetTableClient ();
556+ auto session = CreateTableForVectorSearch (db, false , " data" , true ); // non-nullable columns; singlePartition = true for followers
557+
558+ // Observer asserts that every observed TEvDataShard::TEvRead carries VectorTopK with limit 3, i.e. the top-K was pushed down into the read
559+ auto observer = runtime->AddObserver <TEvDataShard::TEvRead>([&](auto & ev) {
560+ auto & read = ev->Get ()->Record ;
561+ UNIT_ASSERT (read.HasVectorTopK ());
562+ UNIT_ASSERT_VALUES_EQUAL (read.GetVectorTopK ().GetLimit (), 3u );
563+ });
564+
565+ // Enable followers on the table by setting READ_REPLICAS_SETTINGS to PER_AZ:1
566+ {
567+ const TString alterTable (Q_ (Sprintf (R"(
568+ ALTER TABLE `%s` SET (READ_REPLICAS_SETTINGS = "PER_AZ:1");
569+ )" , tableName.c_str ())));
570+
571+ auto result = session.ExecuteSchemeQuery (alterTable).ExtractValueSync ();
572+ UNIT_ASSERT_C (result.IsSuccess (), result.GetIssues ().ToString ());
573+ }
574+
575+ // Verify via DescribeTable that the read-replica settings took effect (mode PerAz, count 1)
576+ {
577+ auto result = session.DescribeTable (tableName).ExtractValueSync ();
578+ UNIT_ASSERT_VALUES_EQUAL_C (result.GetStatus (), NYdb::EStatus::SUCCESS, result.GetIssues ().ToString ());
579+
580+ const auto & table = result.GetTableDescription ();
581+ UNIT_ASSERT (table.GetReadReplicasSettings ()->GetMode () == NYdb::NTable::TReadReplicasSettings::EMode::PerAz);
582+ UNIT_ASSERT_VALUES_EQUAL (table.GetReadReplicasSettings ()->GetReadReplicasCount (), 1 );
583+ }
584+
585+ // Perform the KNN read with StaleRO or SerializableRW tx settings; useRunCall = false, so the query is issued directly on the session
586+ VerifyVectorSearchResults<false >(kikimr, session, StaleRO ? TTxSettings::StaleRO () : TTxSettings::SerializableRW (), false );
587+
588+ if (StaleRO) { // NOTE(review): the last two CheckTableReads arguments look like (check followers, expect reads) - confirm against the helper
589+ // from leader - should NOT read
590+ CheckTableReads (session, tableName, false , false );
591+ // from followers - should read
592+ CheckTableReads (session, tableName, true , true );
593+ } else {
594+ // from leader - should read
595+ CheckTableReads (session, tableName, false , true );
596+ // from followers - should NOT read
597+ CheckTableReads (session, tableName, true , false );
598+ }
599+
600+ // Detach the observer explicitly. NOTE(review): if the observer holder also detaches on destruction, this call is redundant - confirm
601+ observer.Remove ();
602+ }
603+
537604}
538605
539606}
540607}
541-
0 commit comments