7
7
namespace NYql {
8
8
9
9
namespace {
10
- class TLoader : std::enable_shared_from_this<TLoader> {
11
- public:
12
- TLoader ()
13
- : Alloc_(__LOCATION__)
14
- , Env_(Alloc_)
15
- {
16
- Alloc_.Release ();
17
- }
10
+ class TLoader : std::enable_shared_from_this<TLoader> {
11
+ public:
12
+ TLoader ()
13
+ : Alloc_(__LOCATION__)
14
+ , Env_(Alloc_)
15
+ {
16
+ Alloc_.Release ();
17
+ }
18
18
19
- void Init (const TString& serialized,
20
- const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
21
- const NKikimr::NMiniKQL::TComputationNodeFactory& nodeFactory,
22
- TLangVersion langver) {
23
- TGuard<NKikimr::NMiniKQL::TScopedAlloc> allocGuard (Alloc_);
24
- Pgm_ = NKikimr::NMiniKQL::DeserializeRuntimeNode (serialized, Env_);
25
- auto pgmTop = AS_CALLABLE (" BlockAsTuple" , Pgm_);
26
- MKQL_ENSURE (pgmTop->GetInputsCount () == 2 , " Expected tuple of 2 items" );
27
- auto argsNode = pgmTop->GetInput (0 );
28
- MKQL_ENSURE (!argsNode.IsImmediate () && argsNode.GetNode ()->GetType ()->IsCallable (), " Expected callable" );
29
- auto argsCallable = static_cast <NKikimr::NMiniKQL::TCallable*>(argsNode.GetNode ());
19
+ void Init (const TString& serialized,
20
+ const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
21
+ const NKikimr::NMiniKQL::TComputationNodeFactory& nodeFactory,
22
+ TLangVersion langver) {
23
+ TGuard<NKikimr::NMiniKQL::TScopedAlloc> allocGuard (Alloc_);
24
+ Pgm_ = NKikimr::NMiniKQL::DeserializeRuntimeNode (serialized, Env_);
25
+ auto pgmTop = AS_CALLABLE (" BlockAsTuple" , Pgm_);
26
+ MKQL_ENSURE (pgmTop->GetInputsCount () == 2 , " Expected tuple of 2 items" );
27
+ auto argsNode = pgmTop->GetInput (0 );
28
+ MKQL_ENSURE (!argsNode.IsImmediate () && argsNode.GetNode ()->GetType ()->IsCallable (), " Expected callable" );
29
+ auto argsCallable = static_cast <NKikimr::NMiniKQL::TCallable*>(argsNode.GetNode ());
30
30
31
- Explorer_.Walk (Pgm_.GetNode (), Env_);
32
- NKikimr::NMiniKQL::TComputationPatternOpts opts (Alloc_.Ref (), Env_, nodeFactory,
33
- &functionRegistry, NUdf::EValidateMode::None, NUdf::EValidatePolicy::Exception, " OFF" ,
34
- NKikimr::NMiniKQL::EGraphPerProcess::Multi, nullptr , nullptr , nullptr , nullptr , langver);
35
- std::vector<NKikimr::NMiniKQL::TNode*> entryPoints;
36
- if (argsCallable->GetType ()->GetName () == " BlockAsTuple" ) {
37
- for (ui32 i = 0 ; i < argsCallable->GetInputsCount (); ++i) {
38
- entryPoints.emplace_back (argsCallable->GetInput (i).GetNode ());
39
- }
31
+ Explorer_.Walk (Pgm_.GetNode (), Env_);
32
+ NKikimr::NMiniKQL::TComputationPatternOpts opts (Alloc_.Ref (), Env_, nodeFactory,
33
+ &functionRegistry, NUdf::EValidateMode::None, NUdf::EValidatePolicy::Exception, " OFF" ,
34
+ NKikimr::NMiniKQL::EGraphPerProcess::Multi, nullptr , nullptr , nullptr , nullptr , langver);
35
+ std::vector<NKikimr::NMiniKQL::TNode*> entryPoints;
36
+ if (argsCallable->GetType ()->GetName () == " BlockAsTuple" ) {
37
+ for (ui32 i = 0 ; i < argsCallable->GetInputsCount (); ++i) {
38
+ entryPoints.emplace_back (argsCallable->GetInput (i).GetNode ());
40
39
}
40
+ }
41
41
42
- Alloc_.Ref ().UseRefLocking = true ;
43
- Pattern_ = NKikimr::NMiniKQL::MakeComputationPattern (Explorer_, Pgm_, entryPoints, opts);
44
- RandomProvider_ = CreateDefaultRandomProvider ();
45
- TimeProvider_ = CreateDefaultTimeProvider ();
42
+ Alloc_.Ref ().UseRefLocking = true ;
43
+ Pattern_ = NKikimr::NMiniKQL::MakeComputationPattern (Explorer_, Pgm_, entryPoints, opts);
44
+ RandomProvider_ = CreateDefaultRandomProvider ();
45
+ TimeProvider_ = CreateDefaultTimeProvider ();
46
46
47
- Graph_ = Pattern_->Clone (opts.ToComputationOptions (*RandomProvider_, *TimeProvider_));
48
- NKikimr::NMiniKQL::TBindTerminator terminator (Graph_->GetTerminator ());
49
- Topology_ = Graph_->GetKernelsTopology ();
50
- MKQL_ENSURE (Topology_->Items .size () >= 3 , " Expected at least 3 kernels" );
51
- }
47
+ Graph_ = Pattern_->Clone (opts.ToComputationOptions (*RandomProvider_, *TimeProvider_));
48
+ NKikimr::NMiniKQL::TBindTerminator terminator (Graph_->GetTerminator ());
49
+ Topology_ = Graph_->GetKernelsTopology ();
50
+ MKQL_ENSURE (Topology_->Items .size () >= 3 , " Expected at least 3 kernels" );
51
+ }
52
52
53
- ~TLoader () {
54
- Alloc_.Acquire ();
55
- }
53
+ ~TLoader () {
54
+ Alloc_.Acquire ();
55
+ }
56
56
57
- ui32 GetKernelsCount () const {
58
- return Topology_->Items .size () - 3 ;
59
- }
57
+ ui32 GetKernelsCount () const {
58
+ return Topology_->Items .size () - 3 ;
59
+ }
60
60
61
- const arrow::compute::ScalarKernel* GetKernel (ui32 index) const {
62
- MKQL_ENSURE (index < Topology_->Items .size () - 3 , " Bad kernel index" );
63
- return &Topology_->Items [index].Node ->GetArrowKernel ();
64
- }
61
+ const arrow::compute::ScalarKernel* GetKernel (ui32 index) const {
62
+ MKQL_ENSURE (index < Topology_->Items .size () - 3 , " Bad kernel index" );
63
+ return &Topology_->Items [index].Node ->GetArrowKernel ();
64
+ }
65
65
66
- private:
67
- NKikimr::NMiniKQL::TScopedAlloc Alloc_;
68
- NKikimr::NMiniKQL::TTypeEnvironment Env_;
69
- NKikimr::NMiniKQL::TRuntimeNode Pgm_;
70
- NKikimr::NMiniKQL::TExploringNodeVisitor Explorer_;
71
- NKikimr::NMiniKQL::IComputationPattern::TPtr Pattern_;
72
- TIntrusivePtr<IRandomProvider> RandomProvider_;
73
- TIntrusivePtr<ITimeProvider> TimeProvider_;
74
- THolder<NKikimr::NMiniKQL::IComputationGraph> Graph_;
75
- const NKikimr::NMiniKQL::TArrowKernelsTopology* Topology_;
76
- };
77
- }
66
+ private:
67
+ NKikimr::NMiniKQL::TScopedAlloc Alloc_;
68
+ NKikimr::NMiniKQL::TTypeEnvironment Env_;
69
+ NKikimr::NMiniKQL::TRuntimeNode Pgm_;
70
+ NKikimr::NMiniKQL::TExploringNodeVisitor Explorer_;
71
+ NKikimr::NMiniKQL::IComputationPattern::TPtr Pattern_;
72
+ TIntrusivePtr<IRandomProvider> RandomProvider_;
73
+ TIntrusivePtr<ITimeProvider> TimeProvider_;
74
+ THolder<NKikimr::NMiniKQL::IComputationGraph> Graph_;
75
+ const NKikimr::NMiniKQL::TArrowKernelsTopology* Topology_;
76
+ };
77
+ } // namespace
78
78
79
79
std::vector<std::shared_ptr<const arrow::compute::ScalarKernel>> LoadKernels (const TString& serialized,
80
- const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
81
- const NKikimr::NMiniKQL::TComputationNodeFactory& nodeFactory,
82
- TLangVersion langver) {
80
+ const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
81
+ const NKikimr::NMiniKQL::TComputationNodeFactory& nodeFactory,
82
+ TLangVersion langver) {
83
83
auto loader = std::make_shared<TLoader>();
84
84
loader->Init (serialized, functionRegistry, nodeFactory, langver);
85
85
std::vector<std::shared_ptr<const arrow::compute::ScalarKernel>> ret (loader->GetKernelsCount ());
@@ -91,4 +91,4 @@ std::vector<std::shared_ptr<const arrow::compute::ScalarKernel>> LoadKernels(con
91
91
return ret;
92
92
}
93
93
94
- }
94
+ } // namespace NYql
0 commit comments