@@ -303,6 +303,10 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
303
303
return readRequests;
304
304
}
305
305
306
+ bool HasPendingResults () final {
307
+ return !ReadResults.empty ();
308
+ }
309
+
306
310
void AddResult (TShardReadResult result) final {
307
311
const auto & record = result.ReadResult ->Get ()->Record ;
308
312
YQL_ENSURE (record.GetStatus ().GetCode () == Ydb::StatusIds::SUCCESS);
@@ -317,8 +321,8 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
317
321
ReadResults.emplace_back (std::move (result));
318
322
}
319
323
320
- bool IsOverloaded () final {
321
- return false ;
324
+ std::optional<TString> IsOverloaded () final {
325
+ return std:: nullopt ;
322
326
}
323
327
324
328
TReadResultStats ReplyResult (NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
@@ -585,8 +589,20 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
585
589
YQL_ENSURE (false );
586
590
}
587
591
588
- bool IsOverloaded () final {
589
- return UnprocessedRows.size () >= MAX_IN_FLIGHT_LIMIT || PendingLeftRowsByKey.size () >= MAX_IN_FLIGHT_LIMIT || ResultRowsBySeqNo.size () >= MAX_IN_FLIGHT_LIMIT;
592
+ std::optional<TString> IsOverloaded () final {
593
+ if (UnprocessedRows.size () >= MAX_IN_FLIGHT_LIMIT ||
594
+ PendingLeftRowsByKey.size () >= MAX_IN_FLIGHT_LIMIT ||
595
+ ResultRowsBySeqNo.size () >= MAX_IN_FLIGHT_LIMIT)
596
+ {
597
+ TStringBuilder overloadDescriptor;
598
+ overloadDescriptor << " unprocessed rows: " << UnprocessedRows.size ()
599
+ << " , pending left: " << PendingLeftRowsByKey.size ()
600
+ << " , result rows: " << ResultRowsBySeqNo.size ();
601
+
602
+ return TString (overloadDescriptor);
603
+ }
604
+
605
+ return std::nullopt ;
590
606
}
591
607
592
608
std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest (const ui64& prevReadId, ui64& newReadId) final {
@@ -778,6 +794,16 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
778
794
return requests;
779
795
}
780
796
797
+ bool HasPendingResults () final {
798
+ auto nextSeqNo = KeepRowsOrder () ? ResultRowsBySeqNo.find (CurrentResultSeqNo) : ResultRowsBySeqNo.begin ();
799
+
800
+ if (nextSeqNo != ResultRowsBySeqNo.end () && !nextSeqNo->second .Rows .empty ()) {
801
+ return true ;
802
+ }
803
+
804
+ return false ;
805
+ }
806
+
781
807
void AddResult (TShardReadResult result) final {
782
808
const auto & record = result.ReadResult ->Get ()->Record ;
783
809
YQL_ENSURE (record.GetStatus ().GetCode () == Ydb::StatusIds::SUCCESS);
@@ -880,7 +906,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
880
906
batch.clear ();
881
907
882
908
auto getNextResult = [&]() {
883
- if (!ShoulKeepRowsOrder ()) {
909
+ if (!KeepRowsOrder ()) {
884
910
return ResultRowsBySeqNo.begin ();
885
911
}
886
912
@@ -1063,12 +1089,12 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
1063
1089
}
1064
1090
};
1065
1091
1066
- bool ShoulKeepRowsOrder () const {
1092
+ bool KeepRowsOrder () const {
1067
1093
return Settings.KeepRowsOrder ;
1068
1094
}
1069
1095
1070
1096
bool IsRowSeqNoValid (const ui64& seqNo) const {
1071
- if (!ShoulKeepRowsOrder ()) {
1097
+ if (!KeepRowsOrder ()) {
1072
1098
return true ;
1073
1099
}
1074
1100
0 commit comments