@@ -56,6 +56,17 @@ namespace NFq {
5656
5757using namespace NKikimr ;
5858
59+ NYdb::NTopic::TTopicClientSettings GetCommonTopicClientSettings (const NFq::NConfig::TCommonConfig& config) {
60+ NYdb::NTopic::TTopicClientSettings settings;
61+ if (config.GetTopicClientHandlersExecutorThreadsNum ()) {
62+ settings.DefaultHandlersExecutor (NYdb::NTopic::CreateThreadPoolExecutor (config.GetTopicClientHandlersExecutorThreadsNum ()));
63+ }
64+ if (config.GetTopicClientCompressionExecutorThreadsNum ()) {
65+ settings.DefaultCompressionExecutor (NYdb::NTopic::CreateThreadPoolExecutor (config.GetTopicClientCompressionExecutorThreadsNum ()));
66+ }
67+ return settings;
68+ }
69+
5970void Init (
6071 const NFq::NConfig::TConfig& protoConfig,
6172 ui32 nodeId,
@@ -67,7 +78,7 @@ void Init(
6778 const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory,
6879 ui32 icPort,
6980 const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories,
70- NYql::IPqGateway ::TPtr defaultPqGateway
81+ NYql::IPqGatewayFactory ::TPtr pqGatewayFactory
7182 )
7283{
7384 Y_ABORT_UNLESS (iyqSharedResources, " No YQ shared resources created" );
@@ -190,22 +201,26 @@ void Init(
190201 credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory (tokenAccessorConfig.GetEndpoint (), tokenAccessorConfig.GetUseSsl (), caContent, tokenAccessorConfig.GetConnectionPoolSize ());
191202 }
192203
204+ auto commonTopicClientSettings = GetCommonTopicClientSettings (protoConfig.GetCommon ());
205+
193206 if (protoConfig.GetRowDispatcher ().GetEnabled ()) {
194207 NYql::TPqGatewayServices pqServices (
195208 yqSharedResources->UserSpaceYdbDriver ,
196209 nullptr ,
197210 nullptr ,
198211 std::make_shared<NYql::TPqGatewayConfig>(),
199- nullptr );
200-
212+ nullptr ,
213+ nullptr ,
214+ commonTopicClientSettings
215+ );
201216 auto rowDispatcher = NFq::NewRowDispatcherService (
202217 protoConfig.GetRowDispatcher (),
203218 NKikimr::CreateYdbCredentialsProviderFactory,
204219 yqSharedResources,
205220 credentialsFactory,
206221 tenant,
207222 yqCounters->GetSubgroup (" subsystem" , " row_dispatcher" ),
208- defaultPqGateway ? defaultPqGateway : CreatePqNativeGateway (pqServices),
223+ pqGatewayFactory ? pqGatewayFactory-> CreatePqGateway () : CreatePqNativeGateway (pqServices),
209224 appData->Mon ,
210225 appData->Counters );
211226 actorRegistrator (NFq::RowDispatcherServiceActorId (), rowDispatcher.release ());
@@ -224,9 +239,11 @@ void Init(
224239 pqCmConnections,
225240 credentialsFactory,
226241 std::make_shared<NYql::TPqGatewayConfig>(protoConfig.GetGateways ().GetPq ()),
227- appData->FunctionRegistry
242+ appData->FunctionRegistry ,
243+ nullptr ,
244+ commonTopicClientSettings
228245 );
229- auto pqGateway = defaultPqGateway ? defaultPqGateway : NYql::CreatePqNativeGateway (std::move (pqServices));
246+ auto pqGateway = pqGatewayFactory ? pqGatewayFactory-> CreatePqGateway () : NYql::CreatePqNativeGateway (std::move (pqServices));
230247 RegisterDqPqReadActorFactory (*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver , credentialsFactory, pqGateway,
231248 yqCounters->GetSubgroup (" subsystem" , " DqSourceTracker" ), protoConfig.GetCommon ().GetPqReconnectPeriod ());
232249
@@ -330,6 +347,15 @@ void Init(
330347 }
331348
332349 if (protoConfig.GetPendingFetcher ().GetEnabled ()) {
350+ NYql::TPqGatewayServices pqServices (
351+ yqSharedResources->UserSpaceYdbDriver ,
352+ pqCmConnections,
353+ credentialsFactory,
354+ std::make_shared<NYql::TPqGatewayConfig>(protoConfig.GetGateways ().GetPq ()),
355+ appData->FunctionRegistry ,
356+ nullptr ,
357+ commonTopicClientSettings
358+ );
333359 auto fetcher = CreatePendingFetcher (
334360 yqSharedResources,
335361 NKikimr::CreateYdbCredentialsProviderFactory,
@@ -347,7 +373,7 @@ void Init(
347373 tenant,
348374 appData->Mon ,
349375 s3ActorsFactory,
350- defaultPqGateway
376+ pqGatewayFactory ? pqGatewayFactory : NYql::CreatePqNativeGatewayFactory (pqServices)
351377 );
352378
353379 actorRegistrator (MakePendingFetcherId (nodeId), fetcher);
0 commit comments