Skip to content

Commit 0509cfa

Browse files
authored
[Stable-25-3-1] More CTAS fixes (#26314)
2 parents 52f0bb1 + 8c7115d commit 0509cfa

File tree

18 files changed

+198
-85
lines changed

18 files changed

+198
-85
lines changed

ydb/core/grpc_services/rpc_export.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
294294
// Skip children that we don't want to export
295295
if (child.Name.StartsWith("~")
296296
|| child.Name.StartsWith(".sys")
297+
|| child.Name.StartsWith(".tmp")
297298
|| child.Name.StartsWith(".metadata")
298299
|| child.Name.StartsWith("export-"))
299300
{

ydb/core/kqp/common/simple/temp_tables.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ TKqpTempTablesState::FindInfo(const std::string_view& path, bool withSessionId)
1010
return TempTables.find(path);
1111
}
1212

13-
const auto temporaryStoragePrefix = CanonizePath(GetSessionDirPath(Database, SessionId)) + "/";
13+
const auto temporaryStoragePrefix = CanonizePath(GetSessionDirPath(Database, TempDirName)) + "/";
1414

1515
if (path.size() < temporaryStoragePrefix.size()) {
1616
return TempTables.end();
@@ -40,16 +40,16 @@ TString GetSessionDirsBasePath(const TString& database) {
4040
return CanonizePath(JoinPath({database, TmpDirectoryName, SessionsDirectoryName}));
4141
}
4242

43-
TString GetSessionDirPath(const TString& database, const TString& sessionId) {
44-
return CanonizePath(JoinPath({database, TmpDirectoryName, SessionsDirectoryName, sessionId}));
43+
TString GetSessionDirPath(const TString& database, const TString& tmpDirName) {
44+
return CanonizePath(JoinPath({database, TmpDirectoryName, SessionsDirectoryName, tmpDirName}));
4545
}
4646

47-
TString GetTempTablePath(const TString& database, const TString& sessionId, const TString tablePath) {
48-
return CanonizePath(JoinPath({database, TmpDirectoryName, SessionsDirectoryName, sessionId, tablePath}));
47+
TString GetTempTablePath(const TString& database, const TString& tmpDirName, const TString tablePath) {
48+
return CanonizePath(JoinPath({database, TmpDirectoryName, SessionsDirectoryName, tmpDirName, tablePath}));
4949
}
5050

51-
TString GetCreateTempTablePath(const TString& database, const TString& sessionId, const TString tablePath) {
52-
return CanonizePath(JoinPath({TmpDirectoryName, SessionsDirectoryName, sessionId, database, tablePath}));
51+
TString GetCreateTempTablePath(const TString& database, const TString& tmpDirName, const TString tablePath) {
52+
return CanonizePath(JoinPath({TmpDirectoryName, SessionsDirectoryName, tmpDirName, database, tablePath}));
5353
}
5454

5555
bool IsSessionsDirPath(const TStringBuf database, const TStringBuf path) {

ydb/core/kqp/common/simple/temp_tables.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ struct TKqpTempTablesState {
1717
TString WorkingDir;
1818
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
1919
};
20-
TString SessionId;
2120
TString Database;
21+
TString TempDirName;
2222
THashMap<TString, TTempTableInfo> TempTables;
23-
bool HasCreateTableAs = false;
23+
bool NeedCleaning = false;
2424

2525
using TConstPtr = std::shared_ptr<const TKqpTempTablesState>;
2626

@@ -31,9 +31,10 @@ struct TKqpTempTablesState {
3131
TString GetTmpDirPath(const TString& database);
3232
TString GetSessionDirName();
3333
TString GetSessionDirsBasePath(const TString& database);
34-
TString GetSessionDirPath(const TString& database, const TString& sessionId);
35-
TString GetTempTablePath(const TString& database, const TString& sessionId, const TString tablePath);
36-
TString GetCreateTempTablePath(const TString& database, const TString& sessionId, const TString tablePath);
34+
TString GetSessionDirPath(const TString& database, const TString& tmpDirName);
35+
TString GetTempTablePath(const TString& database, const TString& tmpDirName, const TString tablePath);
36+
TString GetCreateTempTablePath(const TString& database, const TString& tmpDirName
37+
, const TString tablePath);
3738

3839
bool IsSessionsDirPath(const TStringBuf database, const TStringBuf path);
3940
bool IsSessionsDirPath(const TStringBuf database, const TString& workingDir, const TString& name);

ydb/core/kqp/executer_actor/kqp_executer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ IActor* CreateKqpSchemeExecuter(
160160
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
161161
const TMaybe<TString>& requestType, const TString& database,
162162
TIntrusiveConstPtr<NACLib::TUserToken> userToken, const TString& clientAddress,
163-
bool temporary, bool isCreateTableAs, TString SessionId, TIntrusivePtr<TUserRequestContext> ctx,
163+
bool temporary, bool createTmpDir, bool isCreateTableAs, TString tempDirName, TIntrusivePtr<TUserRequestContext> ctx,
164164
const TActorId& kqpTempTablesAgentActor = TActorId());
165165

166166
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral(

ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
7676
TKqpSchemeExecuter(
7777
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe<TString>& requestType,
7878
const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken, const TString& clientAddress,
79-
bool temporary, bool isCreateTableAs, TString sessionId, TIntrusivePtr<TUserRequestContext> ctx,
79+
bool temporary, bool createTmpDir, bool isCreateTableAs, TString tempDirName, TIntrusivePtr<TUserRequestContext> ctx,
8080
const TActorId& kqpTempTablesAgentActor)
8181
: PhyTx(phyTx)
8282
, QueryType(queryType)
@@ -85,8 +85,9 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
8585
, UserToken(userToken)
8686
, ClientAddress(clientAddress)
8787
, Temporary(temporary)
88+
, CreateTmpDir(createTmpDir)
8889
, IsCreateTableAs(isCreateTableAs)
89-
, SessionId(sessionId)
90+
, TempDirName(tempDirName)
9091
, RequestContext(std::move(ctx))
9192
, RequestType(requestType)
9293
, KqpTempTablesAgentActor(kqpTempTablesAgentActor)
@@ -153,9 +154,10 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
153154
modifyScheme->SetWorkingDir(GetSessionDirsBasePath(Database));
154155
modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMkDir);
155156
modifyScheme->SetAllowCreateInTempDir(false);
157+
modifyScheme->SetFailOnExist(true);
156158

157159
auto* makeDir = modifyScheme->MutableMkDir();
158-
makeDir->SetName(SessionId);
160+
makeDir->SetName(TempDirName);
159161
ActorIdToProto(KqpTempTablesAgentActor, modifyScheme->MutableTempDirOwnerActorId());
160162

161163
if (UserToken) {
@@ -327,11 +329,12 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
327329
switch (schemeOp.GetOperationCase()) {
328330
case NKqpProto::TKqpSchemeOperation::kCreateTable: {
329331
auto modifyScheme = schemeOp.GetCreateTable();
332+
AFL_ENSURE(!IsCreateTableAs || Temporary);
330333
if (Temporary) {
331334
auto changePath = [this](NKikimrSchemeOp::TTableDescription* tableDesc) {
332335
const auto fullPath = JoinPath({tableDesc->GetPath(), tableDesc->GetName()});
333336
YQL_ENSURE(fullPath.size() > 1);
334-
tableDesc->SetName(GetCreateTempTablePath(Database, SessionId, fullPath));
337+
tableDesc->SetName(GetCreateTempTablePath(Database, TempDirName, fullPath));
335338
tableDesc->SetPath(Database);
336339
};
337340

@@ -346,7 +349,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
346349
}
347350
case NKikimrSchemeOp::ESchemeOpCreateColumnTable: {
348351
modifyScheme.MutableCreateColumnTable()->SetName(
349-
GetCreateTempTablePath(Database, SessionId, modifyScheme.GetCreateColumnTable().GetName()));
352+
GetCreateTempTablePath(Database, TempDirName, modifyScheme.GetCreateColumnTable().GetName()));
350353
break;
351354
}
352355
default:
@@ -367,6 +370,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
367370

368371
case NKqpProto::TKqpSchemeOperation::kAlterTable: {
369372
const auto& modifyScheme = schemeOp.GetAlterTable();
373+
AFL_ENSURE(!IsCreateTableAs || modifyScheme.GetOperationType() == NKikimrSchemeOp::ESchemeOpMoveTable);
370374
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
371375
break;
372376
}
@@ -741,7 +745,8 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
741745
} else if (IsCreateTableAs && schemeOp.GetOperationCase() == NKqpProto::TKqpSchemeOperation::kAlterTable) {
742746
FindWorkingDirForCTAS();
743747
} else {
744-
if (Temporary) {
748+
if (CreateTmpDir) {
749+
AFL_ENSURE(Temporary);
745750
CreateTmpDirectory();
746751
} else {
747752
MakeSchemeOperationRequest();
@@ -800,7 +805,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
800805
void Handle(TEvPrivate::TEvMakeSessionDirResult::TPtr& result) {
801806
if (!result->Get()->Result.Success()) {
802807
InternalError(TStringBuilder()
803-
<< "Error creating directory for session " << SessionId
808+
<< "Error creating directory for session " << TempDirName
804809
<< ": " << result->Get()->Result.Issues().ToString(true));
805810
}
806811
MakeSchemeOperationRequest();
@@ -1097,8 +1102,9 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
10971102
const TString ClientAddress;
10981103
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ResponseEv;
10991104
bool Temporary;
1105+
bool CreateTmpDir;
11001106
bool IsCreateTableAs;
1101-
TString SessionId;
1107+
TString TempDirName;
11021108
ui64 TxId = 0;
11031109
TActorId SchemePipeActorId_;
11041110
ui64 SchemeShardTabletId = 0;
@@ -1113,12 +1119,12 @@ IActor* CreateKqpSchemeExecuter(
11131119
TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
11141120
const TMaybe<TString>& requestType, const TString& database,
11151121
TIntrusiveConstPtr<NACLib::TUserToken> userToken, const TString& clientAddress,
1116-
bool temporary, bool isCreateTableAs,
1117-
TString sessionId, TIntrusivePtr<TUserRequestContext> ctx, const TActorId& kqpTempTablesAgentActor)
1122+
bool temporary, bool createTmpDir, bool isCreateTableAs,
1123+
TString tempDirName, TIntrusivePtr<TUserRequestContext> ctx, const TActorId& kqpTempTablesAgentActor)
11181124
{
11191125
return new TKqpSchemeExecuter(
11201126
phyTx, queryType, target, requestType, database, userToken, clientAddress,
1121-
temporary, isCreateTableAs, sessionId, std::move(ctx), kqpTempTablesAgentActor);
1127+
temporary, createTmpDir, isCreateTableAs, tempDirName, std::move(ctx), kqpTempTablesAgentActor);
11221128
}
11231129

11241130
} // namespace NKikimr::NKqp

ydb/core/kqp/gateway/kqp_ic_gateway.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,7 @@ class TKqpSchemeExecuterRequestHandler: public TActorBootstrapped<TKqpSchemeExec
615615
void Bootstrap() {
616616
auto ctx = MakeIntrusive<TUserRequestContext>();
617617
ctx->DatabaseId = DatabaseId;
618-
IActor* actor = CreateKqpSchemeExecuter(PhyTx, QueryType, SelfId(), RequestType, Database, UserToken, ClientAddress, false /* temporary */, false /* isCreateTableAs */, TString() /* sessionId */, ctx);
618+
IActor* actor = CreateKqpSchemeExecuter(PhyTx, QueryType, SelfId(), RequestType, Database, UserToken, ClientAddress, false /* temporary */, false /* createTmpDir */, false /* isCreateTableAs */, TString() /* sessionId */, ctx);
619619
Register(actor);
620620
Become(&TThis::WaitState);
621621
}

ydb/core/kqp/gateway/kqp_metadata_loader.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ NavigateEntryResult CreateNavigateEntry(const TString& path,
4545
auto tempTablesInfoIt = tempTablesState->FindInfo(currentPath, false);
4646
if (tempTablesInfoIt != tempTablesState->TempTables.end()) {
4747
queryName = currentPath;
48-
currentPath = GetTempTablePath(tempTablesState->Database, tempTablesState->SessionId, tempTablesInfoIt->first);
48+
currentPath = GetTempTablePath(tempTablesState->Database, tempTablesState->TempDirName, tempTablesInfoIt->first);
4949
}
5050
}
5151
entry.Path = SplitPath(currentPath);

ydb/core/kqp/host/kqp_host.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1130,7 +1130,6 @@ class TKqpHost : public IKqpHost {
11301130
SessionCtx->SetDatabaseId(Gateway->GetDatabaseId());
11311131
SessionCtx->SetCluster(cluster);
11321132
if (tempTablesState) {
1133-
SessionCtx->SetSessionId(tempTablesState->SessionId);
11341133
SessionCtx->SetTempTables(std::move(tempTablesState));
11351134
}
11361135

ydb/core/kqp/host/kqp_statement_rewrite.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,14 +262,14 @@ namespace {
262262
}
263263

264264
const TString tmpTableName = TStringBuilder()
265-
<< tableName
266-
<< "_cas_"
267-
<< TAppData::RandomProvider->GenUuid4().AsUuidString();
265+
<< tableName
266+
<< "_"
267+
<< TAppData::RandomProvider->GenUuid4().AsUuidString();
268268

269269
const TString createTableName = (TStringBuilder()
270270
<< CanonizePath(sessionCtx->GetDatabase())
271271
<< "/.tmp/sessions/"
272-
<< sessionCtx->GetSessionId()
272+
<< sessionCtx->GetTempTablesState()->TempDirName
273273
<< CanonizePath(tmpTableName));
274274

275275
create = exprCtx.ReplaceNode(std::move(create), *columns, exprCtx.NewList(pos, std::move(columnNodes)));

ydb/core/kqp/provider/yql_kikimr_provider.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ std::optional<TString> TKikimrTablesData::GetTempTablePath(const TStringBuf& tab
273273
auto tempTableInfoIt = TempTablesState->FindInfo(table, false);
274274

275275
if (tempTableInfoIt != TempTablesState->TempTables.end()) {
276-
return NKikimr::NKqp::GetTempTablePath(TempTablesState->Database, TempTablesState->SessionId, tempTableInfoIt->first);
276+
return NKikimr::NKqp::GetTempTablePath(TempTablesState->Database, TempTablesState->TempDirName, tempTableInfoIt->first);
277277
}
278278
return std::nullopt;
279279
}

0 commit comments

Comments
 (0)