@@ -631,123 +631,10 @@ void StorageObjectStorage::importMergeTreePart(
631631
632632 pipeline.complete (sink);
633633
634- pipeline.setNumThreads (local_context->getSettingsRef ()[Setting::max_threads]);
635-
636634 CompletedPipelineExecutor exec (pipeline);
637635 exec.execute ();
638636}
639637
640- void StorageObjectStorage::importMergeTreePartition (
641- const MergeTreeData & merge_tree_data,
642- const std::vector<DataPartPtr> & data_parts,
643- ContextPtr local_context,
644- std::function<void (MergeTreePartImportStats)> part_log)
645- {
646- if (data_parts.empty ())
647- return ;
648-
649- std::vector<QueryPlanPtr> part_plans;
650- part_plans.reserve (data_parts.size ());
651-
652- auto metadata_snapshot = merge_tree_data.getInMemoryMetadataPtr ();
653- Names columns_to_read = metadata_snapshot->getColumns ().getNamesOfPhysical ();
654- StorageSnapshotPtr storage_snapshot = merge_tree_data.getStorageSnapshot (metadata_snapshot, local_context);
655-
656- QueryPlan plan;
657-
658- // / using the mutations type for now
659- MergeTreeSequentialSourceType read_type = MergeTreeSequentialSourceType::Mutation;
660-
661- bool apply_deleted_mask = true ;
662- bool read_with_direct_io = false ;
663- bool prefetch = false ;
664-
665- QueryPipeline export_pipeline;
666-
667- std::vector<ExportsList::EntryPtr> export_list_entries;
668-
669- for (const auto & data_part : data_parts)
670- {
671- const auto partition_columns = configuration->partition_strategy ->getPartitionColumns ();
672-
673- auto block_with_partition_values = data_part->partition .getBlockWithPartitionValues (partition_columns);
674-
675- const auto column_with_partition_key = configuration->partition_strategy ->computePartitionKey (block_with_partition_values);
676-
677- const auto file_path = configuration->file_path_generator ->getWritingPath (column_with_partition_key->getDataAt (0 ).toString ());
678-
679- export_list_entries.emplace_back (local_context->getGlobalContext ()->getExportsList ().insert (
680- merge_tree_data.getStorageID (),
681- getStorageID (),
682- data_part->name ,
683- file_path
684- ));
685-
686- MergeTreeData::IMutationsSnapshot::Params params
687- {
688- .metadata_version = metadata_snapshot->getMetadataVersion (),
689- .min_part_metadata_version = data_part->getMetadataVersion (),
690- };
691-
692- auto mutations_snapshot = merge_tree_data.getMutationsSnapshot (params);
693-
694- auto alter_conversions = MergeTreeData::getAlterConversionsForPart (
695- data_part,
696- mutations_snapshot,
697- local_context);
698-
699- QueryPlan plan_for_part;
700-
701- createReadFromPartStep (
702- read_type,
703- plan_for_part,
704- merge_tree_data,
705- storage_snapshot,
706- RangesInDataPart (data_part),
707- alter_conversions,
708- nullptr ,
709- columns_to_read,
710- nullptr ,
711- apply_deleted_mask,
712- std::nullopt ,
713- read_with_direct_io,
714- prefetch,
715- local_context,
716- getLogger (" ExportPartition" ));
717-
718- QueryPlanOptimizationSettings optimization_settings (local_context);
719- auto pipeline_settings = BuildQueryPipelineSettings (local_context);
720- auto builder = plan_for_part.buildQueryPipeline (optimization_settings, pipeline_settings);
721- auto pipeline = QueryPipelineBuilder::getPipeline (std::move (*builder));
722-
723- auto sink = std::make_shared<StorageObjectStorageMergeTreePartImporterSink>(
724- data_part,
725- file_path,
726- object_storage,
727- configuration,
728- format_settings,
729- metadata_snapshot->getSampleBlock (),
730- part_log,
731- local_context
732- );
733-
734- pipeline.complete (sink);
735- export_pipeline.addCompletedPipeline (std::move (pipeline));
736- }
737-
738- if (!export_pipeline.completed ())
739- {
740- throw Exception (ErrorCodes::LOGICAL_ERROR, " Root pipeline is not completed" );
741- }
742-
743- export_pipeline.setNumThreads (local_context->getSettingsRef ()[Setting::max_threads]);
744-
745- CompletedPipelineExecutor exec (export_pipeline);
746- exec.execute ();
747-
748- // NOTE: Do not write commit file here. The caller manages commit via JSON manifest.
749- }
750-
751638void StorageObjectStorage::commitExportPartitionTransaction (const String & transaction_id, const String & partition_id, const Strings & exported_paths, ContextPtr local_context)
752639{
753640 const String commit_object = configuration->getRawPath ().path + " /commit_" + partition_id + " _" + transaction_id;
0 commit comments