Skip to content

Commit 167e2c2

Browse files
committed
Compatibility with table creation and modification
1 parent 835a4fc commit 167e2c2

File tree

4 files changed

+315
-22
lines changed

4 files changed

+315
-22
lines changed

src/Storages/ObjectStorage/StorageObjectStorage.cpp

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ StorageObjectStorage::StorageObjectStorage(
103103
std::optional<FormatSettings> format_settings_,
104104
LoadingStrictnessLevel mode,
105105
std::shared_ptr<DataLake::ICatalog> catalog_,
106-
bool if_not_exists_,
107-
bool is_datalake_query,
106+
bool /*if_not_exists_*/,
107+
bool /*is_datalake_query*/,
108108
bool distributed_processing_,
109109
ASTPtr partition_by_,
110110
bool is_table_function,
@@ -126,19 +126,6 @@ StorageObjectStorage::StorageObjectStorage(
126126
&& !configuration->isDataLakeConfiguration();
127127
const bool do_lazy_init = lazy_init && !need_resolve_columns_or_format && !need_resolve_sample_path;
128128

129-
if (!is_table_function && !columns_in_table_or_function_definition.empty() && !is_datalake_query && mode == LoadingStrictnessLevel::CREATE)
130-
{
131-
configuration->create(
132-
object_storage,
133-
context,
134-
columns_in_table_or_function_definition,
135-
partition_by_,
136-
if_not_exists_,
137-
catalog,
138-
storage_id
139-
);
140-
}
141-
142129
bool updated_configuration = false;
143130
try
144131
{

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 248 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,21 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
103103

104104
auto log_ = getLogger("StorageObjectStorageCluster");
105105

106+
if (!columns_in_table_or_function_definition.empty()
107+
&& !is_datalake_query
108+
&& mode_ == LoadingStrictnessLevel::CREATE)
109+
{
110+
configuration->create(
111+
object_storage,
112+
context_,
113+
columns_in_table_or_function_definition,
114+
partition_by,
115+
if_not_exists,
116+
catalog,
117+
table_id_
118+
);
119+
}
120+
106121
try
107122
{
108123
if (!do_lazy_init)
@@ -160,8 +175,8 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
160175
object_storage,
161176
context_,
162177
getStorageID(),
163-
getInMemoryMetadata().getColumns(),
164-
getInMemoryMetadata().getConstraints(),
178+
IStorageCluster::getInMemoryMetadata().getColumns(),
179+
IStorageCluster::getInMemoryMetadata().getConstraints(),
165180
comment_,
166181
format_settings_,
167182
mode_,
@@ -176,7 +191,7 @@ StorageObjectStorageCluster::StorageObjectStorageCluster(
176191
auto virtuals_ = getVirtualsPtr();
177192
if (virtuals_)
178193
pure_storage->setVirtuals(*virtuals_);
179-
pure_storage->setInMemoryMetadata(getInMemoryMetadata());
194+
pure_storage->setInMemoryMetadata(IStorageCluster::getInMemoryMetadata());
180195
}
181196

182197
std::string StorageObjectStorageCluster::getName() const
@@ -540,6 +555,35 @@ QueryProcessingStage::Enum StorageObjectStorageCluster::getQueryProcessingStage(
540555
return IStorageCluster::getQueryProcessingStage(context, to_stage, storage_snapshot, query_info);
541556
}
542557

558+
std::optional<QueryPipeline> StorageObjectStorageCluster::distributedWrite(
559+
const ASTInsertQuery & query,
560+
ContextPtr context)
561+
{
562+
if (getClusterName(context).empty())
563+
return pure_storage->distributedWrite(query, context);
564+
return IStorageCluster::distributedWrite(query, context);
565+
}
566+
567+
void StorageObjectStorageCluster::drop()
568+
{
569+
if (pure_storage)
570+
{
571+
pure_storage->drop();
572+
return;
573+
}
574+
IStorageCluster::drop();
575+
}
576+
577+
void StorageObjectStorageCluster::dropInnerTableIfAny(bool sync, ContextPtr context)
578+
{
579+
if (getClusterName(context).empty())
580+
{
581+
pure_storage->dropInnerTableIfAny(sync, context);
582+
return;
583+
}
584+
IStorageCluster::dropInnerTableIfAny(sync, context);
585+
}
586+
543587
void StorageObjectStorageCluster::truncate(
544588
const ASTPtr & query,
545589
const StorageMetadataPtr & metadata_snapshot,
@@ -556,14 +600,56 @@ void StorageObjectStorageCluster::truncate(
556600
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported by storage {}", getName());
557601
}
558602

603+
void StorageObjectStorageCluster::checkTableCanBeRenamed(const StorageID & new_name) const
604+
{
605+
if (pure_storage)
606+
{
607+
pure_storage->checkTableCanBeRenamed(new_name);
608+
return;
609+
}
610+
IStorageCluster::checkTableCanBeRenamed(new_name);
611+
}
612+
613+
void StorageObjectStorageCluster::rename(const String & new_path_to_table_data, const StorageID & new_table_id)
614+
{
615+
if (pure_storage)
616+
{
617+
pure_storage->rename(new_path_to_table_data, new_table_id);
618+
return;
619+
}
620+
IStorageCluster::rename(new_path_to_table_data, new_table_id);
621+
}
622+
623+
void StorageObjectStorageCluster::renameInMemory(const StorageID & new_table_id)
624+
{
625+
if (pure_storage)
626+
{
627+
pure_storage->renameInMemory(new_table_id);
628+
return;
629+
}
630+
IStorageCluster::renameInMemory(new_table_id);
631+
}
632+
633+
void StorageObjectStorageCluster::alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & alter_lock_holder)
634+
{
635+
if (getClusterName(context).empty())
636+
{
637+
pure_storage->alter(params, context, alter_lock_holder);
638+
setInMemoryMetadata(pure_storage->getInMemoryMetadata());
639+
return;
640+
}
641+
IStorageCluster::alter(params, context, alter_lock_holder);
642+
pure_storage->setInMemoryMetadata(IStorageCluster::getInMemoryMetadata());
643+
}
644+
559645
void StorageObjectStorageCluster::addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const
560646
{
561647
configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->getFormat(), context, /*with_structure=*/false);
562648
}
563649

564650
bool StorageObjectStorageCluster::updateExternalDynamicMetadataIfExists(ContextPtr context)
565651
{
566-
if (pure_storage)
652+
if (getClusterName(context).empty())
567653
return pure_storage->updateExternalDynamicMetadataIfExists(context);
568654
return IStorageCluster::updateExternalDynamicMetadataIfExists(context);
569655
}
@@ -577,7 +663,7 @@ StorageMetadataPtr StorageObjectStorageCluster::getInMemoryMetadataPtr() const
577663

578664
IDataLakeMetadata * StorageObjectStorageCluster::getExternalMetadata(ContextPtr query_context)
579665
{
580-
if (pure_storage)
666+
if (getClusterName(query_context).empty())
581667
return pure_storage->getExternalMetadata(query_context);
582668

583669
configuration->update(
@@ -589,4 +675,161 @@ IDataLakeMetadata * StorageObjectStorageCluster::getExternalMetadata(ContextPtr
589675
return configuration->getExternalMetadata();
590676
}
591677

678+
void StorageObjectStorageCluster::checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const
679+
{
680+
if (getClusterName(context).empty())
681+
{
682+
pure_storage->checkAlterIsPossible(commands, context);
683+
return;
684+
}
685+
IStorageCluster::checkAlterIsPossible(commands, context);
686+
}
687+
688+
void StorageObjectStorageCluster::checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const
689+
{
690+
if (pure_storage)
691+
{
692+
pure_storage->checkMutationIsPossible(commands, settings);
693+
return;
694+
}
695+
IStorageCluster::checkMutationIsPossible(commands, settings);
696+
}
697+
698+
Pipe StorageObjectStorageCluster::alterPartition(
699+
const StorageMetadataPtr & metadata_snapshot,
700+
const PartitionCommands & commands,
701+
ContextPtr context)
702+
{
703+
if (getClusterName(context).empty())
704+
return pure_storage->alterPartition(metadata_snapshot, commands, context);
705+
return IStorageCluster::alterPartition(metadata_snapshot, commands, context);
706+
}
707+
708+
void StorageObjectStorageCluster::checkAlterPartitionIsPossible(
709+
const PartitionCommands & commands,
710+
const StorageMetadataPtr & metadata_snapshot,
711+
const Settings & settings,
712+
ContextPtr context) const
713+
{
714+
if (getClusterName(context).empty())
715+
{
716+
pure_storage->checkAlterPartitionIsPossible(commands, metadata_snapshot, settings, context);
717+
return;
718+
}
719+
IStorageCluster::checkAlterPartitionIsPossible(commands, metadata_snapshot, settings, context);
720+
}
721+
722+
bool StorageObjectStorageCluster::optimize(
723+
const ASTPtr & query,
724+
const StorageMetadataPtr & metadata_snapshot,
725+
const ASTPtr & partition,
726+
bool final,
727+
bool deduplicate,
728+
const Names & deduplicate_by_columns,
729+
bool cleanup,
730+
ContextPtr context)
731+
{
732+
if (getClusterName(context).empty())
733+
return pure_storage->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, context);
734+
return IStorageCluster::optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, context);
735+
}
736+
737+
QueryPipeline StorageObjectStorageCluster::updateLightweight(const MutationCommands & commands, ContextPtr context)
738+
{
739+
if (getClusterName(context).empty())
740+
return pure_storage->updateLightweight(commands, context);
741+
return IStorageCluster::updateLightweight(commands, context);
742+
}
743+
744+
void StorageObjectStorageCluster::mutate(const MutationCommands & commands, ContextPtr context)
745+
{
746+
if (getClusterName(context).empty())
747+
{
748+
pure_storage->mutate(commands, context);
749+
return;
750+
}
751+
IStorageCluster::mutate(commands, context);
752+
}
753+
754+
CancellationCode StorageObjectStorageCluster::killMutation(const String & mutation_id)
755+
{
756+
if (pure_storage)
757+
return pure_storage->killMutation(mutation_id);
758+
return IStorageCluster::killMutation(mutation_id);
759+
}
760+
761+
void StorageObjectStorageCluster::waitForMutation(const String & mutation_id, bool wait_for_another_mutation)
762+
{
763+
if (pure_storage)
764+
{
765+
pure_storage->waitForMutation(mutation_id, wait_for_another_mutation);
766+
return;
767+
}
768+
IStorageCluster::waitForMutation(mutation_id, wait_for_another_mutation);
769+
}
770+
771+
void StorageObjectStorageCluster::setMutationCSN(const String & mutation_id, UInt64 csn)
772+
{
773+
if (pure_storage)
774+
{
775+
pure_storage->setMutationCSN(mutation_id, csn);
776+
return;
777+
}
778+
IStorageCluster::setMutationCSN(mutation_id, csn);
779+
}
780+
781+
CancellationCode StorageObjectStorageCluster::killPartMoveToShard(const UUID & task_uuid)
782+
{
783+
if (pure_storage)
784+
return pure_storage->killPartMoveToShard(task_uuid);
785+
return IStorageCluster::killPartMoveToShard(task_uuid);
786+
}
787+
788+
void StorageObjectStorageCluster::startup()
789+
{
790+
if (pure_storage)
791+
{
792+
pure_storage->startup();
793+
return;
794+
}
795+
IStorageCluster::startup();
796+
}
797+
798+
void StorageObjectStorageCluster::shutdown(bool is_drop)
799+
{
800+
if (pure_storage)
801+
{
802+
pure_storage->shutdown(is_drop);
803+
return;
804+
}
805+
IStorageCluster::shutdown(is_drop);
806+
}
807+
808+
void StorageObjectStorageCluster::flushAndPrepareForShutdown()
809+
{
810+
if (pure_storage)
811+
{
812+
pure_storage->flushAndPrepareForShutdown();
813+
return;
814+
}
815+
IStorageCluster::flushAndPrepareForShutdown();
816+
}
817+
818+
ActionLock StorageObjectStorageCluster::getActionLock(StorageActionBlockType action_type)
819+
{
820+
if (pure_storage)
821+
return pure_storage->getActionLock(action_type);
822+
return IStorageCluster::getActionLock(action_type);
823+
}
824+
825+
void StorageObjectStorageCluster::onActionLockRemove(StorageActionBlockType action_type)
826+
{
827+
if (pure_storage)
828+
{
829+
pure_storage->onActionLockRemove(action_type);
830+
return;
831+
}
832+
IStorageCluster::onActionLockRemove(action_type);
833+
}
834+
592835
}

0 commit comments

Comments
 (0)