Skip to content

Commit 84c50dc

Browse files
authored
merge to stable-25-3 YQ-4312 streaming in ydb fixes and features (#25898)
2 parents f701ab5 + 7ad155a commit 84c50dc

File tree

21 files changed

+861
-383
lines changed

21 files changed

+861
-383
lines changed

ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ class TTopicFormatHandler : public NActors::TActor<TTopicFormatHandler>, public
437437

438438
if (const auto clientOffset = client->GetNextMessageOffset()) {
439439
if (Parser && CurrentOffset && *CurrentOffset > *clientOffset) {
440-
LOG_ROW_DISPATCHER_DEBUG("Parser was flushed due to new historical offset " << *clientOffset << "(previous parser offset: " << *CurrentOffset << ")");
440+
LOG_ROW_DISPATCHER_DEBUG("Parser was flushed due to new historical offset " << *clientOffset << " (previous parser offset: " << *CurrentOffset << ")");
441441
Parser->Refresh(true);
442442
}
443443
}

ydb/core/fq/libs/row_dispatcher/leader_election.cpp

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,13 @@
99
#include <ydb/library/actors/core/actor_bootstrapped.h>
1010
#include <ydb/library/actors/core/hfunc.h>
1111
#include <ydb/library/actors/protos/actors.pb.h>
12+
#include <ydb/library/logger/actor.h>
1213

1314
#include <ydb/core/base/path.h>
1415
#include <ydb/core/protos/config.pb.h>
1516

17+
#include <memory>
18+
1619
namespace NFq {
1720

1821
using namespace NActors;
@@ -83,7 +86,11 @@ struct TLeaderElectionMetrics {
8386
::NMonitoring::TDynamicCounters::TCounterPtr LeaderChanged;
8487
};
8588

86-
class TLeaderElection: public TActorBootstrapped<TLeaderElection> {
89+
// Holds a shared, atomically updated actor-system pointer that starts out null.
// TLeaderElection lists this struct as a base: Bootstrap() stores the live actor
// system into the pointer, and GetYdbDriverConfig() hands the same shared pointer
// to the SDK log backend.
// NOTE(review): presumably kept in a separate base so it is constructed before
// TLeaderElection's own members — confirm.
struct TActorSystemPtrMixin {
90+
NKikimr::TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr = std::make_shared<NKikimr::TDeferredActorLogBackend::TAtomicActorSystemPtr>(nullptr);
91+
};
92+
93+
class TLeaderElection: public TActorBootstrapped<TLeaderElection>, public TActorSystemPtrMixin {
8794

8895
enum class EState {
8996
Init,
@@ -93,8 +100,8 @@ class TLeaderElection: public TActorBootstrapped<TLeaderElection> {
93100
Started
94101
};
95102
NKikimrConfig::TSharedReadingConfig::TCoordinatorConfig Config;
96-
const NKikimr::TYdbCredentialsProviderFactory& CredentialsProviderFactory;
97-
NYdb::TDriver Driver;
103+
NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
104+
std::unique_ptr<NYdb::TDriver> Driver;
98105
TYdbConnectionPtr YdbConnection;
99106
TString TablePathPrefix;
100107
const TString TenantId;
@@ -165,20 +172,19 @@ class TLeaderElection: public TActorBootstrapped<TLeaderElection> {
165172
void ProcessState();
166173
void ResetState();
167174
void SetTimeout();
175+
NYdb::TDriverConfig GetYdbDriverConfig() const;
168176
};
169177

170178
TLeaderElection::TLeaderElection(
171179
NActors::TActorId parentId,
172180
NActors::TActorId coordinatorId,
173181
const NKikimrConfig::TSharedReadingConfig::TCoordinatorConfig& config,
174182
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
175-
NYdb::TDriver driver,
183+
NYdb::TDriver /*driver*/,
176184
const TString& tenant,
177185
const ::NMonitoring::TDynamicCounterPtr& counters)
178186
: Config(config)
179187
, CredentialsProviderFactory(credentialsProviderFactory)
180-
, Driver(driver)
181-
, YdbConnection(config.GetLocalMode() ? nullptr : NewYdbConnection(config.GetDatabase(), credentialsProviderFactory, Driver))
182188
, TablePathPrefix(JoinPath(config.GetDatabase().GetDatabase(), config.GetCoordinationNodePath()))
183189
, TenantId(JoinSeq(":", NKikimr::SplitPath(tenant)))
184190
, CoordinationNodePath(JoinPath(TablePathPrefix, TenantId))
@@ -218,13 +224,19 @@ TYdbSdkRetryPolicy::TPtr MakeSchemaRetryPolicy() {
218224

219225
// Actor bootstrap: switches to StateFunc, publishes the actor system through
// ActorSystemPtr, sets the log prefix, and then either short-circuits (local mode)
// or creates the YDB driver/connection and starts the state machine.
void TLeaderElection::Bootstrap() {
220226
Become(&TLeaderElection::StateFunc);
227+
// The pointer must still be null here; a non-null value aborts the process.
Y_ABORT_UNLESS(!ActorSystemPtr->load(std::memory_order_relaxed), "Double ActorSystemPtr init");
228+
ActorSystemPtr->store(TActivationContext::ActorSystem(), std::memory_order_relaxed);
229+
221230
LogPrefix = "TLeaderElection " + SelfId().ToString() + " ";
222231
LOG_ROW_DISPATCHER_DEBUG("Successfully bootstrapped, local coordinator id " << CoordinatorId.ToString()
223232
<< ", tenant id " << TenantId << ", local mode " << Config.GetLocalMode() << ", coordination node path " << CoordinationNodePath);
224233
// Local mode: report the local coordinator to the parent and return before any
// driver or connection is created.
if (Config.GetLocalMode()) {
225234
TActivationContext::ActorSystem()->Send(ParentId, new NFq::TEvRowDispatcher::TEvCoordinatorChanged(CoordinatorId, 0));
226235
return;
227236
}
237+
238+
// Non-local mode: build the driver from GetYdbDriverConfig(), open the YDB
// connection on top of it, then call ProcessState().
Driver = std::make_unique<NYdb::TDriver>(GetYdbDriverConfig());
239+
YdbConnection = NewYdbConnection(Config.GetDatabase(), CredentialsProviderFactory, *Driver);
228240
ProcessState();
229241
}
230242

@@ -469,6 +481,13 @@ void TLeaderElection::HandleException(const std::exception& e) {
469481
ResetState();
470482
}
471483

484+
// Builds the driver config used by Bootstrap(): discovery mode is set to Async and
// SDK logging goes to a TDeferredActorLogBackend constructed from ActorSystemPtr
// and the YDB_SDK service id.
NYdb::TDriverConfig TLeaderElection::GetYdbDriverConfig() const {
485+
NYdb::TDriverConfig cfg;
486+
cfg.SetDiscoveryMode(NYdb::EDiscoveryMode::Async);
487+
cfg.SetLog(std::make_unique<NKikimr::TDeferredActorLogBackend>(ActorSystemPtr, NKikimrServices::EServiceKikimr::YDB_SDK));
488+
return cfg;
489+
}
490+
472491
} // anonymous namespace
473492

474493
////////////////////////////////////////////////////////////////////////////////

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,11 @@
55
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
66
#include <ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h>
77
#include <ydb/core/protos/config.pb.h>
8-
98
#include <ydb/library/actors/core/actor_bootstrapped.h>
109
#include <ydb/library/actors/core/hfunc.h>
1110
#include <ydb/library/yql/dq/actors/dq.h>
12-
11+
#include <ydb/library/yql/providers/pq/common/pq_events_processor.h>
1312
#include <ydb/public/sdk/cpp/adapters/issue/issue.h>
14-
1513
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
1614

1715
#include <util/generic/queue.h>
@@ -62,6 +60,7 @@ struct TEvPrivate {
6260
EvCreateSession,
6361
EvSendStatistic,
6462
EvReconnectSession,
63+
EvExecuteTopicEvent,
6564
EvEnd
6665
};
6766
static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
@@ -71,13 +70,16 @@ struct TEvPrivate {
7170
struct TEvCreateSession : public TEventLocal<TEvCreateSession, EvCreateSession> {};
7271
struct TEvSendStatistic : public TEventLocal<TEvSendStatistic, EvSendStatistic> {};
7372
struct TEvReconnectSession : public TEventLocal<TEvReconnectSession, EvReconnectSession> {};
73+
// Private event with id EvExecuteTopicEvent, derived from NYql::TTopicEventBase and
// reusing its constructors. TTopicSession routes it to HandleTopicEvent in both
// StateFunc and ErrorState.
struct TEvExecuteTopicEvent : public NYql::TTopicEventBase<TEvExecuteTopicEvent, EvExecuteTopicEvent> {
74+
using TTopicEventBase::TTopicEventBase;
75+
};
7476
};
7577

7678
constexpr ui64 SendStatisticPeriodSec = 2;
7779
constexpr ui64 MaxHandledEventsCount = 1000;
7880
constexpr ui64 MaxHandledEventsSize = 1000000;
7981

80-
class TTopicSession : public TActorBootstrapped<TTopicSession> {
82+
class TTopicSession : public TActorBootstrapped<TTopicSession>, NYql::TTopicEventProcessor<TEvPrivate::TEvExecuteTopicEvent> {
8183
private:
8284
using TBase = TActorBootstrapped<TTopicSession>;
8385

@@ -315,7 +317,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
315317
[[maybe_unused]] static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_SESSION";
316318

317319
private:
318-
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings(bool useSsl) const;
320+
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings(bool useSsl);
319321
NYql::ITopicClient& GetTopicClient(bool useSsl);
320322
NYdb::NTopic::TReadSessionSettings GetReadSessionSettings(const TString& consumerName) const;
321323
void CreateTopicSession();
@@ -351,6 +353,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
351353
private:
352354

353355
STRICT_STFUNC_EXC(StateFunc,
356+
hFunc(TEvPrivate::TEvExecuteTopicEvent, HandleTopicEvent);
354357
hFunc(NFq::TEvPrivate::TEvPqEventsReady, Handle);
355358
hFunc(NFq::TEvPrivate::TEvCreateSession, Handle);
356359
hFunc(NFq::TEvPrivate::TEvSendStatistic, Handle);
@@ -364,6 +367,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
364367

365368
STRICT_STFUNC_EXC(ErrorState,
366369
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
370+
hFunc(TEvPrivate::TEvExecuteTopicEvent, HandleTopicEvent);
367371
IgnoreFunc(NFq::TEvPrivate::TEvPqEventsReady);
368372
IgnoreFunc(NFq::TEvPrivate::TEvCreateSession);
369373
IgnoreFunc(TEvRowDispatcher::TEvGetNextBatch);
@@ -427,6 +431,7 @@ void TTopicSession::PassAway() {
427431

428432
void TTopicSession::SubscribeOnNextEvent() {
429433
if (!ReadSession || IsWaitingEvents) {
434+
LOG_ROW_DISPATCHER_TRACE("Skip SubscribeOnNextEvent, has ReadSession: " << (ReadSession ? "true" : "false") << ", IsWaitingEvents: " << IsWaitingEvents);
430435
return;
431436
}
432437

@@ -445,12 +450,16 @@ void TTopicSession::SubscribeOnNextEvent() {
445450
});
446451
}
447452

448-
NYdb::NTopic::TTopicClientSettings TTopicSession::GetTopicClientSettings(bool useSsl) const {
449-
return PqGateway->GetTopicClientSettings()
450-
.Database(Database)
453+
NYdb::NTopic::TTopicClientSettings TTopicSession::GetTopicClientSettings(bool useSsl) {
454+
auto opts = PqGateway->GetTopicClientSettings();
455+
SetupTopicClientSettings(ActorContext().ActorSystem(), SelfId(), opts);
456+
457+
opts.Database(Database)
451458
.DiscoveryEndpoint(Endpoint)
452459
.SslCredentials(NYdb::TSslCredentials(useSsl))
453460
.CredentialsProviderFactory(CredentialsProviderFactory);
461+
462+
return opts;
454463
}
455464

456465
NYql::ITopicClient& TTopicSession::GetTopicClient(bool useSsl) {
@@ -982,7 +991,7 @@ void TTopicSession::RefreshParsers() {
982991
}
983992
}
984993

985-
} // anonymous namespace
994+
} // anonymous namespace
986995

987996
////////////////////////////////////////////////////////////////////////////////
988997

@@ -1005,4 +1014,4 @@ std::unique_ptr<IActor> NewTopicSession(
10051014
return std::unique_ptr<IActor>(new TTopicSession(readGroup, topicPath, endpoint, database, config, functionRegistry, rowDispatcherActorId, compileServiceActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, countersRoot, pqGateway, maxBufferSize));
10061015
}
10071016

1008-
} // namespace NFq
1017+
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ constexpr ui64 TimeoutBeforeStartSessionSec = 3;
3030
constexpr ui64 GrabTimeoutSec = 4 * TimeoutBeforeStartSessionSec;
3131
static_assert(GrabTimeoutSec <= WAIT_TIMEOUT.Seconds());
3232

33-
template<bool MockTopicSession>
33+
template <bool MockTopicSession>
3434
class TFixture : public NTests::TBaseFixture {
3535
public:
3636
using TBase = NTests::TBaseFixture;
@@ -69,7 +69,7 @@ class TFixture : public NTests::TBaseFixture {
6969
CompileNotifier = Runtime.AllocateEdgeActor();
7070
const auto compileServiceActorId = Runtime.Register(CreatePurecalcCompileServiceMock(CompileNotifier));
7171

72-
if (MockTopicSession) {
72+
if constexpr (MockTopicSession) {
7373
PqGatewayNotifier = Runtime.AllocateEdgeActor();
7474
MockPqGateway = CreateMockPqGateway({.Runtime = &Runtime, .Notifier = PqGatewayNotifier});
7575
}
@@ -115,9 +115,10 @@ class TFixture : public NTests::TBaseFixture {
115115
UNIT_ASSERT_C(ping, "Compilation is not performed for predicate: " << predicate);
116116
}
117117

118-
if (MockTopicSession) {
118+
if constexpr (MockTopicSession) {
119119
Runtime.GrabEdgeEvent<TEvMockPqEvents::TEvCreateSession>(PqGatewayNotifier, TDuration::Seconds(GrabTimeoutSec));
120-
MockPqGateway->AddEvent(TopicPath, NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent(nullptr, 0, 0), 0);
120+
MockReadSession = MockPqGateway->ExtractReadSession(TopicPath);
121+
MockReadSession->AddStartSessionEvent();
121122
}
122123
}
123124

@@ -223,40 +224,18 @@ class TFixture : public NTests::TBaseFixture {
223224
return TRow().AddUint64(100 * index).AddString(TStringBuilder() << "value" << index);
224225
}
225226

226-
using TMessageInformation = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessageInformation;
227-
using TMessage = NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage;
228-
229-
TMessageInformation MakeNextMessageInformation(size_t offset, size_t uncompressedSize) {
230-
auto now = TInstant::Now();
231-
TMessageInformation msgInfo(
232-
offset,
233-
"ProducerId",
234-
0,
235-
now,
236-
now,
237-
MakeIntrusive<NYdb::NTopic::TWriteSessionMeta>(),
238-
MakeIntrusive<NYdb::NTopic::TMessageMeta>(),
239-
uncompressedSize,
240-
"messageGroupId"
241-
);
242-
return msgInfo;
243-
}
244-
245227
void PQWrite(
246228
const std::vector<TString>& sequence,
247229
ui64 firstMessageOffset = 0) {
248-
if (!MockTopicSession) {
230+
if constexpr (!MockTopicSession) {
249231
NYql::NDq::PQWrite(sequence, TopicPath, GetDefaultPqEndpoint());
250232
} else {
251233
ui64 offset = firstMessageOffset;
252-
TVector<TMessage> msgs;
253-
size_t size = 0;
234+
TVector<IMockPqReadSession::TMessage> msgs;
254235
for (const auto& s : sequence) {
255-
TMessage msg(s, nullptr, MakeNextMessageInformation(offset++, s.size()), CreatePartitionSession());
256-
msgs.emplace_back(msg);
257-
size += s.size();
236+
msgs.push_back({.Offset = offset++, .Data = s});
258237
}
259-
MockPqGateway->AddEvent(TopicPath, NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent(msgs, {}, CreatePartitionSession()), size);
238+
MockReadSession->AddDataReceivedEvent(msgs);
260239
}
261240
}
262241

@@ -280,6 +259,7 @@ class TFixture : public NTests::TBaseFixture {
280259
ui32 PartitionId = 0;
281260
NKikimrConfig::TSharedReadingConfig Config;
282261
TIntrusivePtr<IMockPqGateway> MockPqGateway;
262+
IMockPqReadSession::TPtr MockReadSession;
283263

284264
const TString Json1 = "{\"dt\":100,\"value\":\"value1\"}";
285265
const TString Json2 = "{\"dt\":200,\"value\":\"value2\"}";
@@ -628,7 +608,8 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) {
628608

629609
StopSession(ReadActorId2, source);
630610
Runtime.GrabEdgeEvent<TEvMockPqEvents::TEvCreateSession>(PqGatewayNotifier, TDuration::Seconds(GrabTimeoutSec));
631-
MockPqGateway->AddEvent(TopicPath, NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent(nullptr, 0, 0), 0);
611+
MockReadSession = MockPqGateway->ExtractReadSession(TopicPath);
612+
MockReadSession->AddStartSessionEvent();
632613

633614
std::vector<TString> data3 = { Json4 };
634615
PQWrite(data3, 4);

ydb/core/fq/libs/row_dispatcher/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ PEERDIR(
2929
ydb/library/yql/dq/actors/common
3030
ydb/library/yql/dq/actors/compute
3131
ydb/library/yql/dq/proto
32+
ydb/library/yql/providers/pq/common
3233
ydb/library/yql/providers/pq/provider
3334

3435
ydb/public/sdk/cpp/adapters/issue

ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ namespace {
122122
return allTypes.contains(type);
123123
}
124124

125+
// Base-class Cleanup() has an empty body; the overrides shown in this change reset
// the factory's HTTP and PQ gateways.
void IKqpFederatedQuerySetupFactory::Cleanup() {
126+
}
127+
125128
// TKqpFederatedQuerySetupFactoryDefault contains network clients and service actors necessary
126129
// for federated queries. HTTP Gateway (required by S3 provider) is run by default even without
127130
// explicit configuration. Token Accessor and Connector Client are run only if config is provided.
@@ -234,6 +237,11 @@ namespace {
234237
return result;
235238
}
236239

240+
// Releases this factory's HttpGateway and PqGateway references.
// NOTE(review): the two members use different reset spellings (reset() vs Reset()),
// presumably because they are different smart-pointer types — confirm.
void TKqpFederatedQuerySetupFactoryDefault::Cleanup() {
241+
HttpGateway.reset();
242+
PqGateway.Reset();
243+
}
244+
237245
IKqpFederatedQuerySetupFactory::TPtr MakeKqpFederatedQuerySetupFactory(
238246
NActors::TActorSystemSetup* setup,
239247
const NKikimr::TAppData* appData,

ydb/core/kqp/federated_query/kqp_federated_query_helpers.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ namespace NKikimr::NKqp {
6262

6363
struct IKqpFederatedQuerySetupFactory {
6464
using TPtr = std::shared_ptr<IKqpFederatedQuerySetupFactory>;
65+
virtual void Cleanup();
6566
virtual std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem* actorSystem) = 0;
6667
virtual ~IKqpFederatedQuerySetupFactory() = default;
6768
};
@@ -82,6 +83,8 @@ namespace NKikimr::NKqp {
8283

8384
std::optional<TKqpFederatedQuerySetup> Make(NActors::TActorSystem* actorSystem) override;
8485

86+
void Cleanup() override;
87+
8588
private:
8689
NYql::THttpGatewayConfig HttpGatewayConfig;
8790
NYql::IHTTPGateway::TPtr HttpGateway;
@@ -153,6 +156,11 @@ namespace NKikimr::NKqp {
153156
DqTaskTransformFactory, PqGatewayConfig, PqGateway, ActorSystemPtr, Driver};
154157
}
155158

159+
// Override of Cleanup(): releases this factory's HttpGateway and PqGateway references.
void Cleanup() override {
160+
HttpGateway.reset();
161+
PqGateway.Reset();
162+
}
163+
156164
private:
157165
NYql::IHTTPGateway::TPtr HttpGateway;
158166
NYql::NConnector::IClient::TPtr ConnectorClient;

ydb/core/kqp/ut/common/kqp_ut_common.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,11 +165,13 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
165165
appConfig.MutableColumnShardConfig()->SetDisabledOnSchemeShard(false);
166166
appConfig.MutableTableServiceConfig()->SetEnableRowsDuplicationCheck(true);
167167
if (settings.EnableStorageProxy) {
168-
appConfig.MutableQueryServiceConfig()->MutableCheckpointsConfig()->SetEnabled(true);
169-
appConfig.MutableQueryServiceConfig()->MutableCheckpointsConfig()->SetCheckpointingPeriodMillis(200);
170-
appConfig.MutableQueryServiceConfig()->MutableCheckpointsConfig()->SetMaxInflight(1);
171-
appConfig.MutableQueryServiceConfig()->MutableCheckpointsConfig()->MutableExternalStorage()->SetEndpoint(GetEnv("YDB_ENDPOINT"));
172-
appConfig.MutableQueryServiceConfig()->MutableCheckpointsConfig()->MutableExternalStorage()->SetDatabase(GetEnv("YDB_DATABASE"));
168+
auto& checkpoints = *appConfig.MutableQueryServiceConfig()->MutableCheckpointsConfig();
169+
checkpoints.SetEnabled(true);
170+
checkpoints.SetCheckpointingPeriodMillis(settings.CheckpointPeriod.MilliSeconds());
171+
checkpoints.SetMaxInflight(1);
172+
checkpoints.MutableExternalStorage()->SetEndpoint(GetEnv("YDB_ENDPOINT"));
173+
checkpoints.MutableExternalStorage()->SetDatabase(GetEnv("YDB_DATABASE"));
174+
checkpoints.MutableCheckpointGarbageConfig()->SetEnabled(true);
173175
}
174176
if (!appConfig.GetQueryServiceConfig().HasAllExternalDataSourcesAreAvailable()) {
175177
appConfig.MutableQueryServiceConfig()->SetAllExternalDataSourcesAreAvailable(true);

0 commit comments

Comments
 (0)