Skip to content

Commit 7ad155a

Browse files
committed
YQ-4664 used AS threads in topic sdk IO operations (#25668)
1 parent 143da4c commit 7ad155a

File tree

6 files changed

+146
-49
lines changed

6 files changed

+146
-49
lines changed

ydb/core/fq/libs/row_dispatcher/topic_session.cpp

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,11 @@
55
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
66
#include <ydb/core/fq/libs/row_dispatcher/format_handler/format_handler.h>
77
#include <ydb/core/protos/config.pb.h>
8-
98
#include <ydb/library/actors/core/actor_bootstrapped.h>
109
#include <ydb/library/actors/core/hfunc.h>
1110
#include <ydb/library/yql/dq/actors/dq.h>
12-
11+
#include <ydb/library/yql/providers/pq/common/pq_events_processor.h>
1312
#include <ydb/public/sdk/cpp/adapters/issue/issue.h>
14-
1513
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
1614

1715
#include <util/generic/queue.h>
@@ -62,6 +60,7 @@ struct TEvPrivate {
6260
EvCreateSession,
6361
EvSendStatistic,
6462
EvReconnectSession,
63+
EvExecuteTopicEvent,
6564
EvEnd
6665
};
6766
static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
@@ -71,13 +70,16 @@ struct TEvPrivate {
7170
struct TEvCreateSession : public TEventLocal<TEvCreateSession, EvCreateSession> {};
7271
struct TEvSendStatistic : public TEventLocal<TEvSendStatistic, EvSendStatistic> {};
7372
struct TEvReconnectSession : public TEventLocal<TEvReconnectSession, EvReconnectSession> {};
73+
struct TEvExecuteTopicEvent : public NYql::TTopicEventBase<TEvExecuteTopicEvent, EvExecuteTopicEvent> {
74+
using TTopicEventBase::TTopicEventBase;
75+
};
7476
};
7577

7678
constexpr ui64 SendStatisticPeriodSec = 2;
7779
constexpr ui64 MaxHandledEventsCount = 1000;
7880
constexpr ui64 MaxHandledEventsSize = 1000000;
7981

80-
class TTopicSession : public TActorBootstrapped<TTopicSession> {
82+
class TTopicSession : public TActorBootstrapped<TTopicSession>, NYql::TTopicEventProcessor<TEvPrivate::TEvExecuteTopicEvent> {
8183
private:
8284
using TBase = TActorBootstrapped<TTopicSession>;
8385

@@ -315,7 +317,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
315317
[[maybe_unused]] static constexpr char ActorName[] = "FQ_ROW_DISPATCHER_SESSION";
316318

317319
private:
318-
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings(bool useSsl) const;
320+
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings(bool useSsl);
319321
NYql::ITopicClient& GetTopicClient(bool useSsl);
320322
NYdb::NTopic::TReadSessionSettings GetReadSessionSettings(const TString& consumerName) const;
321323
void CreateTopicSession();
@@ -351,6 +353,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
351353
private:
352354

353355
STRICT_STFUNC_EXC(StateFunc,
356+
hFunc(TEvPrivate::TEvExecuteTopicEvent, HandleTopicEvent);
354357
hFunc(NFq::TEvPrivate::TEvPqEventsReady, Handle);
355358
hFunc(NFq::TEvPrivate::TEvCreateSession, Handle);
356359
hFunc(NFq::TEvPrivate::TEvSendStatistic, Handle);
@@ -364,6 +367,7 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
364367

365368
STRICT_STFUNC_EXC(ErrorState,
366369
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
370+
hFunc(TEvPrivate::TEvExecuteTopicEvent, HandleTopicEvent);
367371
IgnoreFunc(NFq::TEvPrivate::TEvPqEventsReady);
368372
IgnoreFunc(NFq::TEvPrivate::TEvCreateSession);
369373
IgnoreFunc(TEvRowDispatcher::TEvGetNextBatch);
@@ -446,12 +450,16 @@ void TTopicSession::SubscribeOnNextEvent() {
446450
});
447451
}
448452

449-
NYdb::NTopic::TTopicClientSettings TTopicSession::GetTopicClientSettings(bool useSsl) const {
450-
return PqGateway->GetTopicClientSettings()
451-
.Database(Database)
453+
NYdb::NTopic::TTopicClientSettings TTopicSession::GetTopicClientSettings(bool useSsl) {
454+
auto opts = PqGateway->GetTopicClientSettings();
455+
SetupTopicClientSettings(ActorContext().ActorSystem(), SelfId(), opts);
456+
457+
opts.Database(Database)
452458
.DiscoveryEndpoint(Endpoint)
453459
.SslCredentials(NYdb::TSslCredentials(useSsl))
454460
.CredentialsProviderFactory(CredentialsProviderFactory);
461+
462+
return opts;
455463
}
456464

457465
NYql::ITopicClient& TTopicSession::GetTopicClient(bool useSsl) {
@@ -983,7 +991,7 @@ void TTopicSession::RefreshParsers() {
983991
}
984992
}
985993

986-
} // anonymous namespace
994+
} // anonymous namespace
987995

988996
////////////////////////////////////////////////////////////////////////////////
989997

@@ -1006,4 +1014,4 @@ std::unique_ptr<IActor> NewTopicSession(
10061014
return std::unique_ptr<IActor>(new TTopicSession(readGroup, topicPath, endpoint, database, config, functionRegistry, rowDispatcherActorId, compileServiceActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, countersRoot, pqGateway, maxBufferSize));
10071015
}
10081016

1009-
} // namespace NFq
1017+
} // namespace NFq

ydb/core/fq/libs/row_dispatcher/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ PEERDIR(
2929
ydb/library/yql/dq/actors/common
3030
ydb/library/yql/dq/actors/compute
3131
ydb/library/yql/dq/proto
32+
ydb/library/yql/providers/pq/common
3233
ydb/library/yql/providers/pq/provider
3334

3435
ydb/public/sdk/cpp/adapters/issue

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,44 @@
11
#include "dq_pq_read_actor.h"
2+
#include "dq_pq_meta_extractor.h"
3+
#include "dq_pq_rd_read_actor.h"
4+
#include "dq_pq_read_actor_base.h"
25
#include "probes.h"
36

7+
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
8+
#include <ydb/library/actors/core/actor.h>
9+
#include <ydb/library/actors/core/event_local.h>
10+
#include <ydb/library/actors/core/events.h>
11+
#include <ydb/library/actors/core/hfunc.h>
12+
#include <ydb/library/actors/core/log.h>
13+
#include <ydb/library/actors/log_backend/actor_log_backend.h>
414
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
515
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
616
#include <ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h>
717
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
818
#include <ydb/library/yql/dq/common/dq_common.h>
919
#include <ydb/library/yql/dq/actors/compute/dq_checkpoints_states.h>
10-
11-
#include <yql/essentials/minikql/comp_nodes/mkql_saveload.h>
12-
#include <yql/essentials/minikql/mkql_alloc.h>
13-
#include <yql/essentials/minikql/mkql_string_util.h>
14-
#include <ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.h>
15-
#include <ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.h>
16-
#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor_base.h>
20+
#include <ydb/library/yql/providers/pq/common/pq_events_processor.h>
1721
#include <ydb/library/yql/providers/pq/common/pq_meta_fields.h>
1822
#include <ydb/library/yql/providers/pq/common/pq_partition_key.h>
1923
#include <ydb/library/yql/providers/pq/proto/dq_io_state.pb.h>
20-
#include <yql/essentials/utils/log/log.h>
21-
#include <yql/essentials/utils/yql_panic.h>
22-
24+
#include <ydb/public/sdk/cpp/adapters/issue/issue.h>
2325
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/federated_topic/federated_topic.h>
2426
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
2527
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/credentials/credentials.h>
2628

27-
#include <ydb/library/actors/core/actor.h>
28-
#include <ydb/library/actors/core/event_local.h>
29-
#include <ydb/library/actors/core/events.h>
30-
#include <ydb/library/actors/core/hfunc.h>
31-
#include <ydb/library/actors/core/log.h>
32-
#include <ydb/library/actors/log_backend/actor_log_backend.h>
33-
#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
34-
35-
#include <ydb/core/fq/libs/row_dispatcher/events/data_plane.h>
29+
#include <yql/essentials/minikql/comp_nodes/mkql_saveload.h>
30+
#include <yql/essentials/minikql/mkql_alloc.h>
31+
#include <yql/essentials/minikql/mkql_string_util.h>
32+
#include <yql/essentials/utils/log/log.h>
33+
#include <yql/essentials/utils/yql_panic.h>
3634

37-
#include <ydb/public/sdk/cpp/adapters/issue/issue.h>
35+
#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
3836

3937
#include <util/generic/algorithm.h>
4038
#include <util/generic/hash.h>
4139
#include <util/generic/utility.h>
4240
#include <util/string/join.h>
4341

44-
4542
#include <queue>
4643
#include <variant>
4744

@@ -81,6 +78,7 @@ struct TEvPrivate {
8178
EvReconnectSession,
8279
EvReceivedClusters,
8380
EvDescribeTopicResult,
81+
EvExecuteTopicEvent,
8482

8583
EvEnd
8684
};
@@ -116,10 +114,14 @@ struct TEvPrivate {
116114
ui32 PartitionsCount;
117115
TMaybe<NYdb::TStatus> Status;
118116
};
117+
struct TEvExecuteTopicEvent : public TTopicEventBase<TEvExecuteTopicEvent, EvExecuteTopicEvent> {
118+
using TTopicEventBase::TTopicEventBase;
119+
};
119120
};
120121

121-
} // namespace
122-
class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq::NInternal::TDqPqReadActorBase {
122+
} // anonymous namespace
123+
124+
class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq::NInternal::TDqPqReadActorBase, TTopicEventProcessor<TEvPrivate::TEvExecuteTopicEvent> {
123125
static constexpr bool StaticDiscovery = true;
124126
struct TMetrics {
125127
TMetrics(
@@ -232,8 +234,10 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
232234
return opts;
233235
}
234236

235-
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings(TClusterState& state) const {
237+
NYdb::NTopic::TTopicClientSettings GetTopicClientSettings(TClusterState& state) {
236238
NYdb::NTopic::TTopicClientSettings opts = PqGateway->GetTopicClientSettings();
239+
SetupTopicClientSettings(ActorContext().ActorSystem(), SelfId(), opts);
240+
237241
opts.Database(SourceParams.GetDatabase())
238242
.DiscoveryEndpoint(SourceParams.GetEndpoint())
239243
.SslCredentials(NYdb::TSslCredentials(SourceParams.GetUseSsl()))
@@ -330,6 +334,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
330334
hFunc(TEvPrivate::TEvReconnectSession, Handle);
331335
hFunc(TEvPrivate::TEvReceivedClusters, Handle);
332336
hFunc(TEvPrivate::TEvDescribeTopicResult, Handle);
337+
hFunc(TEvPrivate::TEvExecuteTopicEvent, HandleTopicEvent);
333338
)
334339

335340
void Handle(TEvPrivate::TEvSourceDataReady::TPtr& ev) {

ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,27 @@
11
#include "dq_pq_write_actor.h"
22
#include "probes.h"
33

4+
#include <ydb/library/actors/core/actor.h>
5+
#include <ydb/library/actors/core/event_local.h>
6+
#include <ydb/library/actors/core/events.h>
7+
#include <ydb/library/actors/core/hfunc.h>
8+
#include <ydb/library/actors/core/log.h>
9+
#include <ydb/library/yql/dq/actors/compute/dq_checkpoints_states.h>
410
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
511
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
612
#include <ydb/library/yql/dq/common/dq_common.h>
7-
#include <ydb/library/yql/dq/actors/compute/dq_checkpoints_states.h>
13+
#include <ydb/library/yql/providers/pq/common/pq_events_processor.h>
14+
#include <ydb/library/yql/providers/pq/proto/dq_io_state.pb.h>
15+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
16+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/federated_topic/federated_topic.h>
17+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/credentials/credentials.h>
818

9-
#include <yql/essentials/utils/log/log.h>
1019
#include <yql/essentials/minikql/comp_nodes/mkql_saveload.h>
1120
#include <yql/essentials/minikql/mkql_alloc.h>
1221
#include <yql/essentials/minikql/mkql_string_util.h>
13-
#include <ydb/library/yql/providers/pq/proto/dq_io_state.pb.h>
22+
#include <yql/essentials/utils/log/log.h>
1423
#include <yql/essentials/utils/yql_panic.h>
1524

16-
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
17-
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/federated_topic/federated_topic.h>
18-
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/credentials/credentials.h>
19-
20-
#include <ydb/library/actors/core/actor.h>
21-
#include <ydb/library/actors/core/event_local.h>
22-
#include <ydb/library/actors/core/events.h>
23-
#include <ydb/library/actors/core/hfunc.h>
24-
#include <ydb/library/actors/core/log.h>
2525
#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
2626

2727
#include <util/generic/algorithm.h>
@@ -78,6 +78,7 @@ struct TEvPrivate {
7878
EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
7979

8080
EvPqEventsReady = EvBegin,
81+
EvExecuteTopicEvent,
8182

8283
EvEnd
8384
};
@@ -87,15 +88,19 @@ struct TEvPrivate {
8788
// Events
8889

8990
struct TEvPqEventsReady : public TEventLocal<TEvPqEventsReady, EvPqEventsReady> {};
91+
92+
struct TEvExecuteTopicEvent : public TTopicEventBase<TEvExecuteTopicEvent, EvExecuteTopicEvent> {
93+
using TTopicEventBase::TTopicEventBase;
94+
};
9095
};
9196

9297
TString MakeStringForLog(const NDqProto::TCheckpoint& checkpoint) {
9398
return TStringBuilder() << "[Checkpoint " << checkpoint.GetGeneration() << "." << checkpoint.GetId() << "] ";
9499
}
95100

96-
} // namespace
101+
} // anonymous namespace
97102

98-
class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqComputeActorAsyncOutput {
103+
class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqComputeActorAsyncOutput, TTopicEventProcessor<TEvPrivate::TEvExecuteTopicEvent> {
99104
struct TMetrics {
100105
TMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters)
101106
: TxId(std::visit([](auto arg) { return ToString(arg); }, txId))
@@ -282,6 +287,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
282287
private:
283288
STRICT_STFUNC(StateFunc,
284289
hFunc(TEvPrivate::TEvPqEventsReady, Handle);
290+
hFunc(TEvPrivate::TEvExecuteTopicEvent, HandleTopicEvent);
285291
)
286292

287293
void Handle(TEvPrivate::TEvPqEventsReady::TPtr&) {
@@ -329,8 +335,10 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
329335
return *FederatedTopicClient;
330336
}
331337

332-
NYdb::NFederatedTopic::TFederatedTopicClientSettings GetFederatedTopicClientSettings() const {
338+
NYdb::NFederatedTopic::TFederatedTopicClientSettings GetFederatedTopicClientSettings() {
333339
NYdb::NFederatedTopic::TFederatedTopicClientSettings opts = PqGateway->GetFederatedTopicClientSettings();
340+
SetupTopicClientSettings(ActorContext().ActorSystem(), SelfId(), opts);
341+
334342
opts.Database(SinkParams.GetDatabase())
335343
.DiscoveryEndpoint(SinkParams.GetEndpoint())
336344
.SslCredentials(NYdb::TSslCredentials(SinkParams.GetUseSsl()))
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
#pragma once
2+
3+
#include <ydb/library/actors/core/actorid.h>
4+
#include <ydb/library/actors/core/actorsystem.h>
5+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
6+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/executor.h>
7+
8+
namespace NYql {
9+
10+
template <typename TEv, ui32 TEventType>
11+
class TTopicEventBase : public NActors::TEventLocal<TEv, TEventType> {
12+
public:
13+
explicit TTopicEventBase(NYdb::NTopic::IExecutor::TFunction&& f)
14+
: Function(std::move(f))
15+
{}
16+
17+
void Execute() {
18+
Function();
19+
}
20+
21+
private:
22+
NYdb::NTopic::IExecutor::TFunction Function;
23+
};
24+
25+
template <typename TTopicEvent>
26+
class TTopicEventProcessor {
27+
class TEventProxy final : public NYdb::NTopic::IExecutor {
28+
public:
29+
TEventProxy(NActors::TActorSystem* actorSystem, const NActors::TActorId& executerId)
30+
: ActorSystem(actorSystem)
31+
, ExecuterId(executerId)
32+
{
33+
Y_ENSURE(actorSystem);
34+
}
35+
36+
bool IsAsync() const final {
37+
return true;
38+
}
39+
40+
void Post(TFunction&& f) final {
41+
ActorSystem->Send(ExecuterId, new TTopicEvent(std::move(f)));
42+
}
43+
44+
private:
45+
void DoStart() final {
46+
}
47+
48+
private:
49+
NActors::TActorSystem* ActorSystem = nullptr;
50+
const NActors::TActorId ExecuterId;
51+
};
52+
53+
public:
54+
template <typename TSettings>
55+
void SetupTopicClientSettings(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, TSettings& settings) {
56+
if (!ExecuterProxy) {
57+
ExecuterProxy = MakeIntrusive<TEventProxy>(actorSystem, selfId);
58+
}
59+
60+
settings.DefaultHandlersExecutor(ExecuterProxy);
61+
settings.DefaultCompressionExecutor(ExecuterProxy);
62+
}
63+
64+
protected:
65+
void HandleTopicEvent(TTopicEvent::TPtr& event) {
66+
event->Get()->Execute();
67+
}
68+
69+
private:
70+
NYdb::NTopic::IExecutor::TPtr ExecuterProxy;
71+
};
72+
73+
} // namespace NYql

ydb/library/yql/providers/pq/common/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ SRCS(
77
)
88

99
PEERDIR(
10+
ydb/library/actors/core
11+
ydb/public/sdk/cpp/src/client/topic
1012
yql/essentials/public/types
1113
)
1214

0 commit comments

Comments
 (0)