Skip to content

Commit a4f3769

Browse files
committed
Tune sink settings
commit_hash:d3e3a331c231bc70efd908be97d7e011c85635de
1 parent 7b50230 commit a4f3769

File tree

2 files changed

+35
-9
lines changed

2 files changed

+35
-9
lines changed

yt/yql/providers/yt/provider/yql_yt_ytflow_integration.cpp

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "yql_yt_ytflow_integration.h"
2+
#include "yql_yt_provider.h"
23
#include "yql_yt_table.h"
34

45
#include <yql/essentials/core/yql_expr_type_annotation.h>
@@ -78,11 +79,18 @@ class TYtYtflowIntegration: public IYtflowIntegration {
7879

7980
auto cluster = TString(maybeWriteTable.Cast().DataSink().Cluster().Value());
8081
auto tableName = TString(TYtTableInfo::GetTableLabel(maybeWriteTable.Cast().Table()));
81-
auto epoch = TEpochInfo::Parse(maybeWriteTable.Cast().Table().CommitEpoch().Ref());
82+
auto commitEpoch = TEpochInfo::Parse(maybeWriteTable.Cast().Table().CommitEpoch().Ref());
8283

83-
auto tableDesc = State_->TablesData->GetTable(cluster, tableName, epoch);
84+
auto tableDesc = State_->TablesData->GetTable(
85+
cluster, tableName, 0);
8486

85-
if (!tableDesc.Meta->IsDynamic) {
87+
auto commitTableDesc = State_->TablesData->GetTable(
88+
cluster, tableName, commitEpoch);
89+
90+
if (!tableDesc.Meta->IsDynamic
91+
&& tableDesc.Meta->DoesExist
92+
&& !(commitTableDesc.Intents & TYtTableIntent::Override)
93+
) {
8694
AddMessage(ctx, "write to static table");
8795
return false;
8896
}
@@ -145,14 +153,30 @@ class TYtYtflowIntegration: public IYtflowIntegration {
145153
auto maybeWriteTable = TMaybeNode<TYtWriteTable>(&sink);
146154
YQL_ENSURE(maybeWriteTable);
147155

148-
auto table = maybeWriteTable.Cast().Table().Cast<TYtTable>();
156+
NYtflow::NProto::TQYTSinkMessage sinkSettings;
149157

150-
auto* rowType = TYqlRowSpecInfo(table.RowSpec()).GetType();
158+
{
159+
auto table = maybeWriteTable.Cast().Table().Cast<TYtTable>();
151160

152-
NYtflow::NProto::TQYTSinkMessage sinkSettings;
153-
sinkSettings.SetCluster(table.Cluster().StringValue());
154-
sinkSettings.SetPath(table.Name().StringValue());
155-
sinkSettings.SetRowType(NCommon::WriteTypeToYson(rowType));
161+
sinkSettings.SetCluster(table.Cluster().StringValue());
162+
sinkSettings.SetPath(table.Name().StringValue());
163+
164+
auto* rowType = maybeWriteTable.Cast().Content().Ref().GetTypeAnn()
165+
->Cast<TListExprType>()->GetItemType();
166+
167+
sinkSettings.SetRowType(NCommon::WriteTypeToYson(rowType));
168+
}
169+
170+
{
171+
auto cluster = TString(maybeWriteTable.Cast().DataSink().Cluster().Value());
172+
auto tableName = TString(TYtTableInfo::GetTableLabel(maybeWriteTable.Cast().Table()));
173+
174+
auto tableDesc = State_->TablesData->GetTable(
175+
cluster, tableName, 0);
176+
177+
sinkSettings.SetDoesExist(tableDesc.Meta->DoesExist);
178+
sinkSettings.SetTruncate(tableDesc.Intents & TYtTableIntent::Override);
179+
}
156180

157181
settings.PackFrom(sinkSettings);
158182
}

yt/yql/providers/ytflow/integration/proto/yt.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,6 @@ message TQYTSinkMessage {
1111
optional string Cluster = 1;
1212
optional string Path = 2;
1313
optional bytes RowType = 3;
14+
optional bool DoesExist = 5;
15+
optional bool Truncate = 6;
1416
}

0 commit comments

Comments
 (0)