Skip to content

Commit 4bac44a

Browse files
committed
rename a few things
1 parent bb742af commit 4bac44a

8 files changed

+135
-108
lines changed

src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ add_headers_and_sources(dbms Storages/ObjectStorage/Azure)
130130
add_headers_and_sources(dbms Storages/ObjectStorage/S3)
131131
add_headers_and_sources(dbms Storages/ObjectStorage/HDFS)
132132
add_headers_and_sources(dbms Storages/ObjectStorage/Local)
133+
add_headers_and_sources(dbms Storages/ObjectStorage/MergeTree)
133134
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes)
134135
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/Iceberg)
135136
add_headers_and_sources(dbms Storages/ObjectStorage/DataLakes/DeltaLake)
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#include <Storages/ObjectStorage/MergeTree/StorageObjectStorageMergeTreePartImporterSink.h>
2+
3+
namespace DB
4+
{
5+
6+
StorageObjectStorageMergeTreePartImporterSink::StorageObjectStorageMergeTreePartImporterSink(
7+
const DataPartPtr & part_,
8+
const std::string & path_,
9+
const ObjectStoragePtr & object_storage_,
10+
const ConfigurationPtr & configuration_,
11+
const std::optional<FormatSettings> & format_settings_,
12+
const Block & sample_block_,
13+
const std::function<void(MergeTreePartImportStats)> & part_log_,
14+
const ContextPtr & context_)
15+
: SinkToStorage(sample_block_)
16+
, object_storage(object_storage_)
17+
, configuration(configuration_)
18+
, format_settings(format_settings_)
19+
, sample_block(sample_block_)
20+
, context(context_)
21+
, part_log(part_log_)
22+
{
23+
stats.part = part_;
24+
stats.file_path = path_;
25+
sink = std::make_shared<StorageObjectStorageSink>(
26+
stats.file_path,
27+
object_storage,
28+
configuration,
29+
format_settings,
30+
sample_block,
31+
context);
32+
}
33+
34+
String StorageObjectStorageMergeTreePartImporterSink::getName() const
35+
{
36+
return "StorageObjectStorageMergeTreePartImporterSink";
37+
}
38+
39+
void StorageObjectStorageMergeTreePartImporterSink::consume(Chunk & chunk)
40+
{
41+
sink->consume(chunk);
42+
43+
stats.read_bytes += chunk.bytes();
44+
stats.read_rows += chunk.getNumRows();
45+
}
46+
47+
void StorageObjectStorageMergeTreePartImporterSink::onFinish()
48+
{
49+
sink->onFinish();
50+
if (const auto object_metadata = object_storage->tryGetObjectMetadata(stats.file_path))
51+
{
52+
stats.bytes_on_disk = object_metadata->size_bytes;
53+
}
54+
part_log(stats);
55+
}
56+
57+
void StorageObjectStorageMergeTreePartImporterSink::onException(std::exception_ptr exception)
58+
{
59+
sink->onException(exception);
60+
part_log(stats);
61+
}
62+
63+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#pragma once
2+
3+
#include <Interpreters/Context.h>
4+
#include <Storages/ObjectStorage/StorageObjectStorageSink.h>
5+
#include "Core/Settings.h"
6+
#include "Disks/ObjectStorages/IObjectStorage.h"
7+
#include "Disks/ObjectStorages/StoredObject.h"
8+
#include "Formats/FormatFactory.h"
9+
#include "IO/CompressionMethod.h"
10+
#include "Processors/Formats/IOutputFormat.h"
11+
#include "Storages/MergeTree/IMergeTreeDataPart.h"
12+
13+
namespace DB
14+
{
15+
16+
struct MergeTreePartImportStats
17+
{
18+
ExecutionStatus status;
19+
std::size_t bytes_on_disk = 0;
20+
std::size_t read_rows = 0;
21+
std::size_t read_bytes = 0;
22+
std::string file_path = "";
23+
DataPartPtr part = nullptr;
24+
};
25+
26+
/*
27+
* Wrapper around `StorageObjectsStorageSink` that takes care of accounting & metrics for partition export
28+
*/
29+
class StorageObjectStorageMergeTreePartImporterSink : public SinkToStorage
30+
{
31+
public:
32+
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
33+
34+
StorageObjectStorageMergeTreePartImporterSink(
35+
const DataPartPtr & part_,
36+
const std::string & path_,
37+
const ObjectStoragePtr & object_storage_,
38+
const ConfigurationPtr & configuration_,
39+
const std::optional<FormatSettings> & format_settings_,
40+
const Block & sample_block_,
41+
const std::function<void(MergeTreePartImportStats)> & part_log_,
42+
const ContextPtr & context_);
43+
44+
String getName() const override;
45+
46+
void consume(Chunk & chunk) override;
47+
48+
void onFinish() override;
49+
50+
void onException(std::exception_ptr exception) override;
51+
52+
private:
53+
std::shared_ptr<StorageObjectStorageSink> sink;
54+
ObjectStoragePtr object_storage;
55+
ConfigurationPtr configuration;
56+
std::optional<FormatSettings> format_settings;
57+
Block sample_block;
58+
ContextPtr context;
59+
std::function<void(MergeTreePartImportStats)> part_log;
60+
61+
MergeTreePartImportStats stats;
62+
};
63+
64+
}

src/Storages/ObjectStorage/MergeTree/StorageObjectStorageSinkMTPartImportDecorator.cpp

Whitespace-only changes.

src/Storages/ObjectStorage/MergeTree/StorageObjectStorageSinkMTPartImportDecorator.h

Lines changed: 0 additions & 102 deletions
This file was deleted.

src/Storages/ObjectStorage/StorageObjectStorage.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
#include <Core/ColumnWithTypeAndName.h>
22
#include <Storages/MergeTree/IMergeTreeDataPart.h>
33
#include <Storages/ObjectStorage/StorageObjectStorage.h>
4-
#include "MergeTree/StorageObjectStorageSinkMTPartImportDecorator.h"
4+
#include "MergeTree/StorageObjectStorageMergeTreePartImporterSink.h"
55

6-
#include <Common/logger_useful.h>
76
#include <Core/Settings.h>
87
#include <Formats/FormatFactory.h>
9-
#include <Parsers/ASTInsertQuery.h>
108
#include <Formats/ReadSchemaUtils.h>
11-
#include <QueryPipeline/QueryPipelineBuilder.h>
129
#include <Interpreters/Context.h>
10+
#include <Parsers/ASTInsertQuery.h>
11+
#include <QueryPipeline/QueryPipelineBuilder.h>
12+
#include <Common/logger_useful.h>
1313

1414
#include <Processors/Sources/NullSource.h>
1515
#include <Processors/QueryPlan/QueryPlan.h>
@@ -663,7 +663,7 @@ void StorageObjectStorage::importMergeTreePartition(
663663
auto builder = plan_for_part.buildQueryPipeline(optimization_settings, pipeline_settings);
664664
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
665665

666-
auto sink = std::make_shared<StorageObjectStorageSinkMTPartImportDecorator>(
666+
auto sink = std::make_shared<StorageObjectStorageMergeTreePartImporterSink>(
667667
data_part,
668668
file_path,
669669
object_storage,

src/Storages/ObjectStorage/StorageObjectStorageSink.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ namespace DB
88
{
99
class StorageObjectStorageSink : public SinkToStorage
1010
{
11+
friend class StorageObjectStorageMergeTreePartImporterSink;
1112
public:
1213
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
1314

src/Storages/StorageMergeTree.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
#include <Common/escapeForFileName.h>
4949
#include "Core/BackgroundSchedulePool.h"
5050
#include "Core/Names.h"
51-
#include "ObjectStorage/MergeTree/StorageObjectStorageSinkMTPartImportDecorator.h"
51+
#include "ObjectStorage/MergeTree/StorageObjectStorageMergeTreePartImporterSink.h"
5252

5353
namespace DB
5454
{

0 commit comments

Comments
 (0)