Skip to content

Commit f62910c

Browse files
gridnevvvitCopilot
andauthored
fix potential hang if no new reads are started in the new data batch (#26382)
Co-authored-by: Copilot <[email protected]>
1 parent bc6d961 commit f62910c

File tree

3 files changed

+43
-11
lines changed

3 files changed

+43
-11
lines changed

ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -306,8 +306,12 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
306306
ReadRowsCount += replyResultStats.ReadRowsCount;
307307
ReadBytesCount += replyResultStats.ReadBytesCount;
308308

309-
if (!StreamLookupWorker->IsOverloaded()) {
309+
auto overloaded = StreamLookupWorker->IsOverloaded();
310+
if (!overloaded.has_value()) {
310311
FetchInputRows();
312+
} else {
313+
CA_LOG_N("Pausing stream lookup because it's overloaded by reason: "
314+
<< overloaded.value_or("empty"));
311315
}
312316

313317
if (Partitioning) {
@@ -317,9 +321,10 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
317321
const bool inputRowsFinished = LastFetchStatus == NUdf::EFetchStatus::Finish;
318322
const bool allReadsFinished = AllReadsFinished();
319323
const bool allRowsProcessed = StreamLookupWorker->AllRowsProcessed();
324+
const bool hasPendingResults = StreamLookupWorker->HasPendingResults();
320325

321-
if (inputRowsFinished && allReadsFinished && !allRowsProcessed) {
322-
// all reads are completed, but we have unprocessed rows
326+
if (hasPendingResults) {
327+
// has more results
323328
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
324329
}
325330

ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,10 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
303303
return readRequests;
304304
}
305305

306+
bool HasPendingResults() final {
307+
return !ReadResults.empty();
308+
}
309+
306310
void AddResult(TShardReadResult result) final {
307311
const auto& record = result.ReadResult->Get()->Record;
308312
YQL_ENSURE(record.GetStatus().GetCode() == Ydb::StatusIds::SUCCESS);
@@ -317,8 +321,8 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
317321
ReadResults.emplace_back(std::move(result));
318322
}
319323

320-
bool IsOverloaded() final {
321-
return false;
324+
std::optional<TString> IsOverloaded() final {
325+
return std::nullopt;
322326
}
323327

324328
TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) final {
@@ -585,8 +589,20 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
585589
YQL_ENSURE(false);
586590
}
587591

588-
bool IsOverloaded() final {
589-
return UnprocessedRows.size() >= MAX_IN_FLIGHT_LIMIT || PendingLeftRowsByKey.size() >= MAX_IN_FLIGHT_LIMIT || ResultRowsBySeqNo.size() >= MAX_IN_FLIGHT_LIMIT;
592+
std::optional<TString> IsOverloaded() final {
593+
if (UnprocessedRows.size() >= MAX_IN_FLIGHT_LIMIT ||
594+
PendingLeftRowsByKey.size() >= MAX_IN_FLIGHT_LIMIT ||
595+
ResultRowsBySeqNo.size() >= MAX_IN_FLIGHT_LIMIT)
596+
{
597+
TStringBuilder overloadDescriptor;
598+
overloadDescriptor << "unprocessed rows: " << UnprocessedRows.size()
599+
<< ", pending left: " << PendingLeftRowsByKey.size()
600+
<< ", result rows: " << ResultRowsBySeqNo.size();
601+
602+
return TString(overloadDescriptor);
603+
}
604+
605+
return std::nullopt;
590606
}
591607

592608
std::vector<THolder<TEvDataShard::TEvRead>> RebuildRequest(const ui64& prevReadId, ui64& newReadId) final {
@@ -778,6 +794,16 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
778794
return requests;
779795
}
780796

797+
bool HasPendingResults() final {
798+
auto nextSeqNo = KeepRowsOrder() ? ResultRowsBySeqNo.find(CurrentResultSeqNo) : ResultRowsBySeqNo.begin();
799+
800+
if (nextSeqNo != ResultRowsBySeqNo.end() && !nextSeqNo->second.Rows.empty()) {
801+
return true;
802+
}
803+
804+
return false;
805+
}
806+
781807
void AddResult(TShardReadResult result) final {
782808
const auto& record = result.ReadResult->Get()->Record;
783809
YQL_ENSURE(record.GetStatus().GetCode() == Ydb::StatusIds::SUCCESS);
@@ -880,7 +906,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
880906
batch.clear();
881907

882908
auto getNextResult = [&]() {
883-
if (!ShoulKeepRowsOrder()) {
909+
if (!KeepRowsOrder()) {
884910
return ResultRowsBySeqNo.begin();
885911
}
886912

@@ -1063,12 +1089,12 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
10631089
}
10641090
};
10651091

1066-
bool ShoulKeepRowsOrder() const {
1092+
bool KeepRowsOrder() const {
10671093
return Settings.KeepRowsOrder;
10681094
}
10691095

10701096
bool IsRowSeqNoValid(const ui64& seqNo) const {
1071-
if (!ShoulKeepRowsOrder()) {
1097+
if (!KeepRowsOrder()) {
10721098
return true;
10731099
}
10741100

ydb/core/kqp/runtime/kqp_stream_lookup_worker.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,9 @@ class TKqpStreamLookupWorker {
7878
virtual TReadResultStats ReplyResult(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, i64 freeSpace) = 0;
7979
virtual TReadResultStats ReadAllResult(std::function<void(TConstArrayRef<TCell>)> reader) = 0;
8080
virtual bool AllRowsProcessed() = 0;
81+
virtual bool HasPendingResults() = 0;
8182
virtual void ResetRowsProcessing(ui64 readId) = 0;
82-
virtual bool IsOverloaded() = 0;
83+
virtual std::optional<TString> IsOverloaded() = 0;
8384

8485
protected:
8586
const NMiniKQL::TTypeEnvironment& TypeEnv;

0 commit comments

Comments
 (0)