Skip to content

Commit 94ee8fc

Browse files
authored
merge to stable-25-3 YQ-4312 features and fixes for streaming 2 (#26409)
2 parents 24e134a + 3acee58 commit 94ee8fc

File tree

58 files changed

+1072
-458
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1072
-458
lines changed

ydb/core/fq/libs/init/init.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,8 @@ void Init(
304304
pqGatewayFactory ? pqGatewayFactory->CreatePqGateway() : CreatePqNativeGateway(pqServices),
305305
yqSharedResources->UserSpaceYdbDriver,
306306
appData->Mon,
307-
appData->Counters);
307+
appData->Counters,
308+
MakeNodesManagerId());
308309
actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release());
309310
}
310311

ydb/core/fq/libs/row_dispatcher/coordinator.cpp

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,15 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
189189
TCoordinatorMetrics Metrics;
190190
THashSet<TActorId> InterconnectSessions;
191191
ui64 NodesCount = 0;
192+
NActors::TActorId NodesManagerId;
192193

193194
public:
194195
TActorCoordinator(
195196
NActors::TActorId localRowDispatcherId,
196197
const NKikimrConfig::TSharedReadingConfig::TCoordinatorConfig& config,
197198
const TString& tenant,
198-
const ::NMonitoring::TDynamicCounterPtr& counters);
199+
const ::NMonitoring::TDynamicCounterPtr& counters,
200+
NActors::TActorId nodesManagerId);
199201

200202
void Bootstrap();
201203

@@ -210,6 +212,7 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
210212
void Handle(TEvPrivate::TEvPrintState::TPtr&);
211213
void Handle(TEvPrivate::TEvListNodes::TPtr&);
212214
void Handle(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult::TPtr&);
215+
void Handle(NFq::TEvNodesManager::TEvGetNodesResponse::TPtr&);
213216

214217
STRICT_STFUNC(
215218
StateFunc, {
@@ -222,6 +225,7 @@ class TActorCoordinator : public TActorBootstrapped<TActorCoordinator> {
222225
hFunc(TEvPrivate::TEvPrintState, Handle);
223226
hFunc(TEvPrivate::TEvListNodes, Handle);
224227
hFunc(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult, Handle);
228+
hFunc(NFq::TEvNodesManager::TEvGetNodesResponse, Handle);
225229
})
226230

227231
private:
@@ -243,12 +247,14 @@ TActorCoordinator::TActorCoordinator(
243247
NActors::TActorId localRowDispatcherId,
244248
const NKikimrConfig::TSharedReadingConfig::TCoordinatorConfig& config,
245249
const TString& tenant,
246-
const ::NMonitoring::TDynamicCounterPtr& counters)
250+
const ::NMonitoring::TDynamicCounterPtr& counters,
251+
NActors::TActorId nodesManagerId)
247252
: Config(config)
248253
, LocalRowDispatcherId(localRowDispatcherId)
249254
, LogPrefix("Coordinator: ")
250255
, Tenant(tenant)
251256
, Metrics(counters)
257+
, NodesManagerId(nodesManagerId)
252258
{
253259
AddRowDispatcher(localRowDispatcherId, true);
254260
}
@@ -257,8 +263,8 @@ void TActorCoordinator::Bootstrap() {
257263
Become(&TActorCoordinator::StateFunc);
258264
Send(LocalRowDispatcherId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe());
259265
ScheduleNodeInfoRequest();
260-
Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState());
261-
LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped coordinator, id " << SelfId());
266+
// Schedule(TDuration::Seconds(PrintStatePeriodSec), new TEvPrivate::TEvPrintState()); // Logs (InternalState) is too big
267+
LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped coordinator, id " << SelfId() << ", NodesManagerId " << NodesManagerId);
262268
auto nodeGroup = Metrics.Counters->GetSubgroup("node", ToString(SelfId().NodeId()));
263269
Metrics.IsActive = nodeGroup->GetCounter("IsActive");
264270
}
@@ -361,6 +367,12 @@ void TActorCoordinator::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected:
361367
void TActorCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
362368
LOG_ROW_DISPATCHER_DEBUG("TEvUndelivered, ev: " << ev->Get()->ToString());
363369

370+
if (ev->Sender == NodesManagerId) {
371+
LOG_ROW_DISPATCHER_INFO("TEvUndelivered, from nodes manager, reason: " << ev->Get()->Reason);
372+
NActors::TActivationContext::Schedule(NodesManagerRetryPeriod, new IEventHandle(NodesManagerId, SelfId(), new NFq::TEvNodesManager::TEvGetNodesRequest(), IEventHandle::FlagTrackDelivery));
373+
return;
374+
}
375+
364376
for (auto& [actorId, info] : RowDispatchers) {
365377
if (ev->Sender != actorId) {
366378
continue;
@@ -515,7 +527,7 @@ void TActorCoordinator::Handle(TEvPrivate::TEvPrintState::TPtr&) {
515527
}
516528

517529
void TActorCoordinator::Handle(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult::TPtr& ev) {
518-
if (!ev->Get()->Success) {
530+
if (!ev->Get()->Success) {
519531
LOG_ROW_DISPATCHER_ERROR("Failed to get TEvLookupResult, try later...");
520532
ScheduleNodeInfoRequest();
521533
return;
@@ -526,7 +538,13 @@ void TActorCoordinator::Handle(NKikimr::TEvTenantNodeEnumerator::TEvLookupResult
526538
}
527539

528540
void TActorCoordinator::Handle(TEvPrivate::TEvListNodes::TPtr&) {
529-
Register(NKikimr::CreateTenantNodeEnumerationLookup(SelfId(), Tenant));
541+
if (NodesManagerId) {
542+
LOG_ROW_DISPATCHER_DEBUG("Send TEvGetNodesRequest to NodesManager");
543+
Send(NodesManagerId, new NFq::TEvNodesManager::TEvGetNodesRequest(), IEventHandle::FlagTrackDelivery);
544+
} else {
545+
LOG_ROW_DISPATCHER_DEBUG("Send NodeEnumerationLookup request");
546+
Register(NKikimr::CreateTenantNodeEnumerationLookup(SelfId(), Tenant));
547+
}
530548
}
531549

532550
bool TActorCoordinator::IsReady() const {
@@ -550,6 +568,15 @@ void TActorCoordinator::ScheduleNodeInfoRequest() const {
550568
Schedule(NodesManagerRetryPeriod, new TEvPrivate::TEvListNodes());
551569
}
552570

571+
void TActorCoordinator::Handle(NFq::TEvNodesManager::TEvGetNodesResponse::TPtr& ev) {
572+
NodesCount = ev->Get()->NodeIds.size();
573+
LOG_ROW_DISPATCHER_INFO("Updated node info, node count: " << NodesCount);
574+
if (!NodesCount) {
575+
ScheduleNodeInfoRequest();
576+
}
577+
UpdatePendingReadActors();
578+
}
579+
553580
} // namespace
554581

555582
////////////////////////////////////////////////////////////////////////////////
@@ -558,9 +585,10 @@ std::unique_ptr<NActors::IActor> NewCoordinator(
558585
NActors::TActorId rowDispatcherId,
559586
const NKikimrConfig::TSharedReadingConfig::TCoordinatorConfig& config,
560587
const TString& tenant,
561-
const ::NMonitoring::TDynamicCounterPtr& counters)
588+
const ::NMonitoring::TDynamicCounterPtr& counters,
589+
NActors::TActorId nodesManagerId)
562590
{
563-
return std::unique_ptr<NActors::IActor>(new TActorCoordinator(rowDispatcherId, config, tenant, counters));
591+
return std::unique_ptr<NActors::IActor>(new TActorCoordinator(rowDispatcherId, config, tenant, counters, nodesManagerId));
564592
}
565593

566594
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/coordinator.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ std::unique_ptr<NActors::IActor> NewCoordinator(
1515
NActors::TActorId rowDispatcherId,
1616
const NKikimrConfig::TSharedReadingConfig_TCoordinatorConfig& config,
1717
const TString& tenant,
18-
const ::NMonitoring::TDynamicCounterPtr& counters);
18+
const ::NMonitoring::TDynamicCounterPtr& counters,
19+
NActors::TActorId nodesManagerId);
1920

2021
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class TTopicFilters : public ITopicFilters {
3030
, Counters_(std::move(counters))
3131
{}
3232

33-
void ProcessData(const TVector<ui64>& columnIndex, const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) override {
33+
void ProcessData(const TVector<ui64>& columnIndex, const TVector<ui64>& offsets, const TVector<std::span<NYql::NUdf::TUnboxedValue>>& values, ui64 numberRows) override {
3434
LOG_ROW_DISPATCHER_TRACE("ProcessData for " << RunHandlers_.size() << " clients, number rows: " << numberRows);
3535

3636
if (!numberRows) {
@@ -220,18 +220,18 @@ class TTopicFilters : public ITopicFilters {
220220
RunHandlers_.erase(iter);
221221
}
222222

223-
void PushToRunner(IProgramRunHandler::TPtr programRunHandler, const TVector<ui64>& /* offsets */, const TVector<ui64>& columnIndex, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) {
223+
void PushToRunner(IProgramRunHandler::TPtr programRunHandler, const TVector<ui64>& /* offsets */, const TVector<ui64>& columnIndex, const TVector<std::span<NYql::NUdf::TUnboxedValue>>& values, ui64 numberRows) {
224224
const auto consumer = programRunHandler->GetConsumer();
225225
const auto& columnIds = consumer->GetColumnIds();
226226

227-
TVector<const TVector<NYql::NUdf::TUnboxedValue>*> result;
227+
TVector<std::span<NYql::NUdf::TUnboxedValue>> result;
228228
result.reserve(columnIds.size());
229229
for (ui64 columnId : columnIds) {
230230
Y_ENSURE(columnId < columnIndex.size(), "Unexpected column id " << columnId << ", it is larger than index array size " << columnIndex.size());
231231
const ui64 index = columnIndex[columnId];
232232

233233
Y_ENSURE(index < values.size(), "Unexpected column index " << index << ", it is larger than values array size " << values.size());
234-
if (const auto value = values[index]) {
234+
if (const auto value = values[index]; !value.empty()) {
235235
result.emplace_back(value);
236236
} else {
237237
LOG_ROW_DISPATCHER_TRACE("Ignore processing for " << consumer->GetClientId() << ", client got parsing error for column " << columnId);

ydb/core/fq/libs/row_dispatcher/format_handler/filters/filters_set.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class ITopicFilters : public TThrRefBase, public TNonCopyable {
1515

1616
public:
1717
// columnIndex - mapping from stable column id to index in values array
18-
virtual void ProcessData(const TVector<ui64>& columnIndex, const TVector<ui64>& offsets, const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) = 0;
18+
virtual void ProcessData(const TVector<ui64>& columnIndex, const TVector<ui64>& offsets, const TVector<std::span<NYql::NUdf::TUnboxedValue>>& values, ui64 numberRows) = 0;
1919
virtual void OnCompileResponse(TEvRowDispatcher::TEvPurecalcCompileResponse::TPtr& ev) = 0;
2020

2121
virtual TStatus AddPrograms(IProcessedDataConsumer::TPtr consumer, std::unordered_map<TString, IProgramHolder::TPtr> programHolders) = 0;

ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ NYT::TNode MakeWatermarkOutputSchema() {
7575
}
7676

7777
struct TInputType {
78-
const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& Values;
78+
const TVector<std::span<NYql::NUdf::TUnboxedValue>>& Values;
7979
ui64 NumberRows;
8080
};
8181

@@ -152,8 +152,9 @@ class TInputConsumer : public NYql::NPureCalc::IConsumer<TInputType> {
152152

153153
items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(rowId);
154154

155-
for (ui64 fieldId = 0; const auto column : input.Values) {
156-
items[FieldsPositions[fieldId++]] = column->at(rowId);
155+
for (ui64 fieldId = 0; const auto& column : input.Values) {
156+
Y_DEBUG_ABORT_UNLESS(column.size() > rowId);
157+
items[FieldsPositions[fieldId++]] = column[rowId];
157158
}
158159

159160
Worker->Push(std::move(result));
@@ -415,7 +416,7 @@ class TProgramRunHandler final : public IProgramRunHandler, public TNonCopyable
415416
ActiveFilters_->Dec();
416417
}
417418

418-
void ProcessData(const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) const override {
419+
void ProcessData(const TVector<std::span<NYql::NUdf::TUnboxedValue>>& values, ui64 numberRows) const override {
419420
LOG_ROW_DISPATCHER_TRACE("ProcessData for " << numberRows << " rows");
420421

421422
if (!ProgramHolder_) {

ydb/core/fq/libs/row_dispatcher/format_handler/filters/purecalc_filter.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ class IProgramRunHandler : public TThrRefBase {
7373
return ProgramHolder_;
7474
}
7575

76-
virtual void ProcessData(const TVector<const TVector<NYql::NUdf::TUnboxedValue>*>& values, ui64 numberRows) const = 0;
76+
virtual void ProcessData(const TVector<std::span<NYql::NUdf::TUnboxedValue>>& values, ui64 numberRows) const = 0;
7777

7878
protected:
7979
TString Name_;

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
7979
void OnParsedData(ui64 numberRows) override {
8080
LOG_ROW_DISPATCHER_TRACE("Got parsed data, number rows: " << numberRows);
8181

82-
Self.ParsedData.assign(ParerSchema.size(), nullptr);
82+
Self.ParsedData.assign(ParerSchema.size(), std::span<NYql::NUdf::TUnboxedValue>());
8383
for (size_t i = 0; i < ParerSchema.size(); ++i) {
8484
auto columnStatus = Self.Parser->GetParsedColumn(i);
8585
if (Y_LIKELY(columnStatus.IsSuccess())) {
@@ -261,8 +261,11 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
261261
};
262262

263263
for (size_t i = 0; const ui64 columnId : ColumnsIds) {
264+
auto& parsedData = Self.ParsedData[Self.ParserSchemaIndex[columnId]];
265+
Y_DEBUG_ABORT_UNLESS(parsedData.size() > rowId);
266+
264267
// All data was locked in parser, so copy is safe
265-
FilteredRow[i++] = Self.ParsedData[Self.ParserSchemaIndex[columnId]]->at(rowId);
268+
FilteredRow[i++] = parsedData[rowId];
266269
}
267270
DataPacker->AddWideItem(FilteredRow.data(), FilteredRow.size());
268271

@@ -653,7 +656,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
653656

654657
// Parsed data
655658
const TVector<ui64>* Offsets;
656-
TVector<const TVector<NYql::NUdf::TUnboxedValue>*> ParsedData;
659+
TVector<std::span<NYql::NUdf::TUnboxedValue>> ParsedData;
657660
bool RefreshScheduled = false;
658661

659662
// Metrics

0 commit comments

Comments
 (0)