Skip to content

Commit e54b471

Browse files
authored
stable-25-3-1: fix potential hang if no new reads are started in the new data batch (#26517)
2 parents 9cdf17e + 4ee595e commit e54b471

File tree

3 files changed

+43
-11
lines changed

3 files changed

+43
-11
lines changed

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -305,8 +305,12 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
305305
ReadRowsCount += replyResultStats.ReadRowsCount;
306306
ReadBytesCount += replyResultStats.ReadBytesCount;
307307

308-
if (!StreamLookupWorker->IsOverloaded()) {
308+
auto overloaded = StreamLookupWorker->IsOverloaded();
309+
if (!overloaded.has_value()) {
309310
FetchInputRows();
311+
} else {
312+
CA_LOG_N("Pausing stream lookup because it's overloaded by reason: "
313+
<< overloaded.value_or("empty"));
310314
}
311315

312316
if (Partitioning) {
@@ -316,9 +320,10 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
316320
const bool inputRowsFinished = LastFetchStatus == NUdf::EFetchStatus::Finish;
317321
const bool allReadsFinished = AllReadsFinished();
318322
const bool allRowsProcessed = StreamLookupWorker->AllRowsProcessed();
323+
const bool hasPendingResults = StreamLookupWorker->HasPendingResults();
319324

320-
if (inputRowsFinished && allReadsFinished && !allRowsProcessed) {
321-
// all reads are completed, but we have unprocessed rows
325+
if (hasPendingResults) {
326+
// has more results
322327
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
323328
}
324329

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,10 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
303303
return readRequests;
304304
}
305305

306+
bool HasPendingResults() final {
307+
return !ReadResults.empty();
308+
}
309+
306310
void AddResult(TShardReadResult result) final {
307311
const auto& record = result.ReadResult->Get()->Record;
308312
YQL_ENSURE(record.GetStatus().GetCode() == Ydb::StatusIds::SUCCESS);
@@ -317,8 +321,8 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
317321
ReadResults.emplace_back(std::move(result));
318322
}
319323

320-
bool IsOverloaded() final {
321-
return false;
324+
std::optional<TString> IsOverloaded() final {
325+
return std::nullopt;
322326
}
323327

324328
TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
@@ -585,8 +589,20 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
585589
YQL_ENSURE(false);
586590
}
587591

588-
bool IsOverloaded() final {
589-
return UnprocessedRows.size() >= MAX_IN_FLIGHT_LIMIT || PendingLeftRowsByKey.size() >= MAX_IN_FLIGHT_LIMIT || ResultRowsBySeqNo.size() >= MAX_IN_FLIGHT_LIMIT;
592+
std::optional<TString> IsOverloaded() final {
593+
if (UnprocessedRows.size() >= MAX_IN_FLIGHT_LIMIT ||
594+
PendingLeftRowsByKey.size() >= MAX_IN_FLIGHT_LIMIT ||
595+
ResultRowsBySeqNo.size() >= MAX_IN_FLIGHT_LIMIT)
596+
{
597+
TStringBuilder overloadDescriptor;
598+
overloadDescriptor << "unprocessed rows: " << UnprocessedRows.size()
599+
<< ", pending left: " << PendingLeftRowsByKey.size()
600+
<< ", result rows: " << ResultRowsBySeqNo.size();
601+
602+
return TString(overloadDescriptor);
603+
}
604+
605+
return std::nullopt;
590606
}
591607

592608
std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui64& newReadId) final {
@@ -778,6 +794,16 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
778794
return requests;
779795
}
780796

797+
bool HasPendingResults() final {
798+
auto nextSeqNo = KeepRowsOrder() ? ResultRowsBySeqNo.find(CurrentResultSeqNo) : ResultRowsBySeqNo.begin();
799+
800+
if (nextSeqNo != ResultRowsBySeqNo.end() && !nextSeqNo->second.Rows.empty()) {
801+
return true;
802+
}
803+
804+
return false;
805+
}
806+
781807
void AddResult(TShardReadResult result) final {
782808
const auto& record = result.ReadResult->Get()->Record;
783809
YQL_ENSURE(record.GetStatus().GetCode() == Ydb::StatusIds::SUCCESS);
@@ -880,7 +906,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
880906
batch.clear();
881907

882908
auto getNextResult = [&]() {
883-
if (!ShoulKeepRowsOrder()) {
909+
if (!KeepRowsOrder()) {
884910
return ResultRowsBySeqNo.begin();
885911
}
886912

@@ -1063,12 +1089,12 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
10631089
}
10641090
};
10651091

1066-
bool ShoulKeepRowsOrder() const {
1092+
bool KeepRowsOrder() const {
10671093
return Settings.KeepRowsOrder;
10681094
}
10691095

10701096
bool IsRowSeqNoValid(const ui64& seqNo) const {
1071-
if (!ShoulKeepRowsOrder()) {
1097+
if (!KeepRowsOrder()) {
10721098
return true;
10731099
}
10741100

ydb/core/kqp/runtime/kqp_stream_lookup_worker.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,9 @@ class TKqpStreamLookupWorker {
7878
virtual TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) = 0;
7979
virtual TReadResultStats ReadAllResult(std::function<void(TConstArrayRef<TCell>)> reader) = 0;
8080
virtual bool AllRowsProcessed() = 0;
81+
virtual bool HasPendingResults() = 0;
8182
virtual void ResetRowsProcessing(ui64 readId) = 0;
82-
virtual bool IsOverloaded() = 0;
83+
virtual std::optional<TString> IsOverloaded() = 0;
8384

8485
protected:
8586
const NMiniKQL::TTypeEnvironment& TypeEnv;

0 commit comments

Comments
 (0)