33
44#include < util/datetime/base.h>
55
6+ #include < ydb/core/blob_depot/mon_main.h>
67#include < ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h>
78#include < ydb/tests/tools/fqrun/src/fq_runner.h>
89#include < ydb/tests/tools/kqprun/runlib/application.h>
910#include < ydb/tests/tools/kqprun/runlib/utils.h>
1011
12+ #ifdef PROFILE_MEMORY_ALLOCATIONS
13+ #include < library/cpp/lfalloc/alloc_profiler/profiler.h>
14+ #endif
15+
1116using namespace NKikimrRun ;
1217
1318namespace NFqRun {
@@ -156,6 +161,21 @@ class TMain : public TMainBase {
156161 }
157162 });
158163
164+ options.AddLongOption (" as-cfg" , " File with actor system config (TActorSystemConfig), use '-' for default" )
165+ .RequiredArgument (" file" )
166+ .DefaultValue (" ./configuration/as_config.conf" )
167+ .Handler1 ([this ](const NLastGetopt::TOptsParser* option) {
168+ const TString file (option->CurValOrDef ());
169+ if (file == " -" ) {
170+ return ;
171+ }
172+
173+ RunnerOptions.FqSettings .ActorSystemConfig = NKikimrConfig::TActorSystemConfig ();
174+ if (!google::protobuf::TextFormat::ParseFromString (LoadFile (file), &(*RunnerOptions.FqSettings .ActorSystemConfig ))) {
175+ ythrow yexception () << " Bad format of actor system configuration" ;
176+ }
177+ });
178+
159179 options.AddLongOption (" emulate-s3" , " Enable readings by s3 provider from files, `bucket` value in connection - path to folder with files" )
160180 .NoArgument ()
161181 .SetFlag (&RunnerOptions.FqSettings .EmulateS3 );
@@ -165,17 +185,27 @@ class TMain : public TMainBase {
165185 .Handler1 ([this ](const NLastGetopt::TOptsParser* option) {
166186 TStringBuf topicName, others;
167187 TStringBuf (option->CurVal ()).Split (' @' , topicName, others);
188+
168189 TStringBuf path, partitionCountStr;
169190 TStringBuf (others).Split (' :' , path, partitionCountStr);
170191 size_t partitionCount = !partitionCountStr.empty () ? FromString<size_t >(partitionCountStr) : 1 ;
192+ if (!partitionCount) {
193+ ythrow yexception () << " Topic partition count should be at least one" ;
194+ }
171195 if (topicName.empty () || path.empty ()) {
172- ythrow yexception () << " Incorrect PQ file mapping, expected form topic@path[:partitions_count]" << Endl ;
196+ ythrow yexception () << " Incorrect PQ file mapping, expected form topic@path[:partitions_count]" ;
173197 }
174198 if (!PqFilesMapping.emplace (topicName, NYql::TDummyTopic (" pq" , TString (topicName), TString (path), partitionCount)).second ) {
175199 ythrow yexception () << " Got duplicated topic name: " << topicName;
176200 }
177201 });
178202
203+ options.AddLongOption (" cancel-on-file-finish" , " Cancel emulate YDS topics when topic file finished" )
204+ .RequiredArgument (" topic" )
205+ .Handler1 ([this ](const NLastGetopt::TOptsParser* option) {
206+ TopicsSettings[option->CurVal ()].CancelOnFileFinish = true ;
207+ });
208+
179209 // Outputs
180210
181211 options.AddLongOption (" result-file" , " File with query results (use '-' to write in stdout)" )
@@ -210,6 +240,7 @@ class TMain : public TMainBase {
210240 ExecutionOptions.Validate (RunnerOptions);
211241
212242 RunnerOptions.FqSettings .YqlToken = GetEnv (YQL_TOKEN_VARIABLE);
243+ RunnerOptions.FqSettings .FunctionRegistry = CreateFunctionRegistry ().Get ();
213244
214245 auto & gatewayConfig = *RunnerOptions.FqSettings .FqConfig .mutable_gateways ();
215246 FillTokens (gatewayConfig.mutable_pq ());
@@ -224,14 +255,39 @@ class TMain : public TMainBase {
224255
225256 if (!PqFilesMapping.empty ()) {
226257 auto fileGateway = MakeIntrusive<NYql::TDummyPqGateway>();
227- for (const auto & [_, topic] : PqFilesMapping) {
258+ for (auto [_, topic] : PqFilesMapping) {
259+ if (const auto it = TopicsSettings.find (topic.TopicName ); it != TopicsSettings.end ()) {
260+ topic.CancelOnFileFinish = it->second .CancelOnFileFinish ;
261+ TopicsSettings.erase (it);
262+ }
228263 fileGateway->AddDummyTopic (topic);
229264 }
230265 RunnerOptions.FqSettings .PqGateway = std::move (fileGateway);
231266 }
267+ if (!TopicsSettings.empty ()) {
268+ ythrow yexception () << " Found topic settings for not existing topic: '" << TopicsSettings.begin ()->first << " '" ;
269+ }
270+
271+ #ifdef PROFILE_MEMORY_ALLOCATIONS
272+ if (RunnerOptions.FqSettings .VerboseLevel >= EVerbose::Info) {
273+ Cout << CoutColors.Cyan () << " Starting profile memory allocations" << CoutColors.Default () << Endl;
274+ }
275+ NAllocProfiler::StartAllocationSampling (true );
276+ #else
277+ if (ProfileAllocationsOutput) {
278+ ythrow yexception () << " Profile memory allocations disabled, please rebuild fqrun with flag `-D PROFILE_MEMORY_ALLOCATIONS`" ;
279+ }
280+ #endif
232281
233282 RunScript (ExecutionOptions, RunnerOptions);
234283
284+ #ifdef PROFILE_MEMORY_ALLOCATIONS
285+ if (RunnerOptions.FqSettings .VerboseLevel >= EVerbose::Info) {
286+ Cout << CoutColors.Cyan () << " Finishing profile memory allocations" << CoutColors.Default () << Endl;
287+ }
288+ FinishProfileMemoryAllocations ();
289+ #endif
290+
235291 return 0 ;
236292 }
237293
@@ -249,6 +305,11 @@ class TMain : public TMainBase {
249305 TExecutionOptions ExecutionOptions;
250306 TRunnerOptions RunnerOptions;
251307 std::unordered_map<TString, NYql::TDummyTopic> PqFilesMapping;
308+
309+ struct TTopicSettings {
310+ bool CancelOnFileFinish = false ;
311+ };
312+ std::unordered_map<TString, TTopicSettings> TopicsSettings;
252313};
253314
254315} // anonymous namespace
0 commit comments