forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathStorageObjectStorageCluster.h
More file actions
203 lines (150 loc) · 7.17 KB
/
StorageObjectStorageCluster.h
File metadata and controls
203 lines (150 loc) · 7.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
#pragma once
#include <Storages/IStorageCluster.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Interpreters/Context_fwd.h>
namespace DB
{
/// Cluster-aware storage over an object store.
///
/// Derives from `IStorageCluster` and additionally keeps a non-clustered
/// `StorageObjectStorage` (`pure_storage`) to fall back on when the "pure"
/// (non-distributed) realisation is needed.
///
/// Every member function except `setClusterNameInSettings` is only declared here;
/// the definitions live outside this header.
class StorageObjectStorageCluster : public IStorageCluster
{
public:
    StorageObjectStorageCluster(
        const String & cluster_name_,
        StorageObjectStorageConfigurationPtr configuration_,
        ObjectStoragePtr object_storage_,
        const StorageID & table_id_,
        const ColumnsDescription & columns_in_table_or_function_definition,
        const ConstraintsDescription & constraints_,
        const ASTPtr & partition_by,
        ContextPtr context_,
        const String & comment_,
        std::optional<FormatSettings> format_settings_,
        LoadingStrictnessLevel mode_,
        std::shared_ptr<DataLake::ICatalog> catalog,
        bool if_not_exists,
        bool is_datalake_query,
        bool is_table_function,
        bool lazy_init);

    std::string getName() const override;

    RemoteQueryExecutor::Extension getTaskIteratorExtension(
        const ActionsDAG::Node * predicate,
        const ActionsDAG * filter,
        const ContextPtr & context,
        ClusterPtr cluster) const override;

    String getPathSample(ContextPtr context);

    std::optional<UInt64> totalRows(ContextPtr query_context) const override;
    std::optional<UInt64> totalBytes(ContextPtr query_context) const override;

    /// Stores the given flag in `cluster_name_in_settings`.
    void setClusterNameInSettings(bool cluster_name_in_settings_) { cluster_name_in_settings = cluster_name_in_settings_; }

    String getClusterName(ContextPtr context) const override;

    QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;

    std::optional<QueryPipeline> distributedWrite(
        const ASTInsertQuery & query,
        ContextPtr context) override;

    /// Overrides of the base storage interface (DDL, metadata, mutations, lifecycle, capabilities).
    /// NOTE(review): presumably most of these delegate to `pure_storage` - confirm in the implementation file.
    void drop() override;

    void dropInnerTableIfAny(bool sync, ContextPtr context) override;

    void truncate(
        const ASTPtr & query,
        const StorageMetadataPtr & metadata_snapshot,
        ContextPtr local_context,
        TableExclusiveLockHolder &) override;

    void checkTableCanBeRenamed(const StorageID & new_name) const override;

    void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;

    void renameInMemory(const StorageID & new_table_id) override;

    void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & alter_lock_holder) override;

    void addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const override;

    IDataLakeMetadata * getExternalMetadata(ContextPtr query_context);

    bool updateExternalDynamicMetadataIfExists(ContextPtr context) override;

    StorageMetadataPtr getInMemoryMetadataPtr() const override;

    void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override;

    void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override;

    Pipe alterPartition(
        const StorageMetadataPtr & metadata_snapshot,
        const PartitionCommands & commands,
        ContextPtr context) override;

    void checkAlterPartitionIsPossible(
        const PartitionCommands & commands,
        const StorageMetadataPtr & metadata_snapshot,
        const Settings & settings,
        ContextPtr context) const override;

    bool optimize(
        const ASTPtr & query,
        const StorageMetadataPtr & metadata_snapshot,
        const ASTPtr & partition,
        bool final,
        bool deduplicate,
        const Names & deduplicate_by_columns,
        bool cleanup,
        ContextPtr context) override;

    QueryPipeline updateLightweight(const MutationCommands & commands, ContextPtr context) override;

    void mutate(const MutationCommands & commands, ContextPtr context) override;

    CancellationCode killMutation(const String & mutation_id) override;

    void waitForMutation(const String & mutation_id, bool wait_for_another_mutation) override;

    void setMutationCSN(const String & mutation_id, UInt64 csn) override;

    CancellationCode killPartMoveToShard(const UUID & task_uuid) override;

    void startup() override;

    void shutdown(bool is_drop = false) override;

    void flushAndPrepareForShutdown() override;

    ActionLock getActionLock(StorageActionBlockType action_type) override;

    void onActionLockRemove(StorageActionBlockType action_type) override;

    bool supportsImport() const override;

    SinkToStoragePtr import(
        const std::string & /* file_name */,
        Block & /* block_with_partition_values */,
        std::string & /* destination_file_path */,
        bool /* overwrite_if_exists */,
        ContextPtr /* context */) override;

    bool prefersLargeBlocks() const override;

    bool supportsPartitionBy() const override;

    bool supportsSubcolumns() const override;

    bool supportsDynamicSubcolumns() const override;

    bool supportsTrivialCountOptimization(const StorageSnapshotPtr &, ContextPtr) const override;

    /// Things required for PREWHERE.
    bool supportsPrewhere() const override;
    bool canMoveConditionsToPrewhere() const override;
    std::optional<NameSet> supportedPrewhereColumns() const override;
    ColumnSizeByName getColumnSizes() const override;

    bool parallelizeOutputAfterReading(ContextPtr context) const override;

    bool supportsDelete() const override;

private:
    void updateQueryToSendIfNeeded(
        ASTPtr & query,
        const StorageSnapshotPtr & storage_snapshot,
        const ContextPtr & context) override;

    /// Read / write paths that fall back on the non-clustered ("pure") realisation.
    void readFallBackToPure(
        QueryPlan & query_plan,
        const Names & column_names,
        const StorageSnapshotPtr & storage_snapshot,
        SelectQueryInfo & query_info,
        ContextPtr context,
        QueryProcessingStage::Enum processed_stage,
        size_t max_block_size,
        size_t num_streams) override;

    SinkToStoragePtr writeFallBackToPure(
        const ASTPtr & query,
        const StorageMetadataPtr & metadata_snapshot,
        ContextPtr context,
        bool async_insert) override;

    void updateConfigurationIfNeeded(ContextPtr context) override;

    /*
    In case the table was created with `object_storage_cluster` setting,
    modify the AST query object so that it uses the table function implementation
    by mapping the engine name to table function name and setting `object_storage_cluster`.
    For table like
    CREATE TABLE table ENGINE=S3(...) SETTINGS object_storage_cluster='cluster'
    converts request
    SELECT * FROM table
    to
    SELECT * FROM s3(...) SETTINGS object_storage_cluster='cluster'
    to make distributed request over cluster 'cluster'.
    */
    void updateQueryForDistributedEngineIfNeeded(ASTPtr & query, ContextPtr context);

    const String engine_name;
    StorageObjectStorageConfigurationPtr configuration;
    const ObjectStoragePtr object_storage;
    NamesAndTypesList hive_partition_columns_to_read_from_file_path;

    /// Written by setClusterNameInSettings().
    /// Default-initialised so the flag never holds an indeterminate value,
    /// whatever the out-of-line constructor does.
    bool cluster_name_in_settings = false;

    /// Non-clustered storage to fall back on pure realisation if needed.
    std::shared_ptr<StorageObjectStorage> pure_storage;

    /// NOTE(review): presumably tells updateConfigurationIfNeeded() whether the configuration
    /// must be refreshed on reads/writes - confirm against the implementation file.
    /// Default-initialised only to avoid an indeterminate value; the constructor is expected
    /// to assign the real value.
    bool update_configuration_on_read_write = false;
};
}