Skip to content

Commit 019be8b

Browse files
committed
fix conflicts
1 parent b00a140 commit 019be8b

File tree

6 files changed

+107
-54
lines changed

6 files changed

+107
-54
lines changed

src/Storages/ObjectStorage/HDFS/Configuration.cpp

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,6 @@ ObjectStoragePtr StorageHDFSConfiguration::createObjectStorage( /// NOLINT
5959
url, std::move(hdfs_settings), context->getConfigRef(), /* lazy_initialize */true);
6060
}
6161

62-
std::string StorageHDFSConfiguration::getPathWithoutGlobs() const
63-
{
64-
/// Unlike s3 and azure, which are object storages,
65-
/// hdfs is a filesystem, so it cannot list files by partial prefix,
66-
/// only by directory.
67-
auto first_glob_pos = path.find_first_of("*?{");
68-
auto end_of_path_without_globs = path.substr(0, first_glob_pos).rfind('/');
69-
if (end_of_path_without_globs == std::string::npos || end_of_path_without_globs == 0)
70-
return "/";
71-
return path.substr(0, end_of_path_without_globs);
72-
}
73-
7462
StorageObjectStorage::QuerySettings StorageHDFSConfiguration::getQuerySettings(const ContextPtr & context) const
7563
{
7664
const auto & settings = context->getSettingsRef();

src/Storages/ObjectStorage/StorageObjectStorage.cpp

Lines changed: 63 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,20 @@ bool StorageObjectStorage::Configuration::update( ///NOLINT
280280
return true;
281281
}
282282

283+
void StorageObjectStorage::Configuration::initPartitionStrategy(ASTPtr partition_by, const ColumnsDescription & columns, ContextPtr context)
284+
{
285+
286+
partition_strategy = PartitionStrategyFactory::get(
287+
partition_strategy_type,
288+
partition_by,
289+
columns.getOrdinary(),
290+
context,
291+
format,
292+
getRawPath().withGlobs(),
293+
getRawPath().withPartitionWildcard(),
294+
partition_columns_in_data_file);
295+
}
296+
283297
IDataLakeMetadata * StorageObjectStorage::getExternalMetadata(ContextPtr query_context)
284298
{
285299
configuration->update(
@@ -454,7 +468,7 @@ class ReadFromObjectStorageStep : public SourceStepWithFilter
454468
auto context = getContext();
455469
iterator_wrapper = StorageObjectStorageSource::createFileIterator(
456470
configuration, configuration->getQuerySettings(context), object_storage, distributed_processing,
457-
context, predicate, filter_actions_dag, virtual_columns, nullptr, context->getFileProgressCallback());
471+
context, predicate, filter_actions_dag, virtual_columns, info.hive_partition_columns_to_read_from_file_path, nullptr, context->getFileProgressCallback());
458472
}
459473
};
460474
}
@@ -464,9 +478,10 @@ ReadFromFormatInfo StorageObjectStorage::Configuration::prepareReadingFromFormat
464478
const Strings & requested_columns,
465479
const StorageSnapshotPtr & storage_snapshot,
466480
bool supports_subset_of_columns,
467-
ContextPtr local_context)
481+
ContextPtr local_context,
482+
const PrepareReadingFromFormatHiveParams & hive_params)
468483
{
469-
return DB::prepareReadingFromFormat(requested_columns, storage_snapshot, local_context, supports_subset_of_columns);
484+
return DB::prepareReadingFromFormat(requested_columns, storage_snapshot, local_context, supports_subset_of_columns, hive_params);
470485
}
471486

472487
std::optional<ColumnsDescription> StorageObjectStorage::Configuration::tryGetTableStructureFromMetadata() const
@@ -752,7 +767,7 @@ void StorageObjectStorage::Configuration::initialize(
752767
{
753768
configuration_to_initialize.format
754769
= FormatFactory::instance()
755-
.tryGetFormatFromFileName(configuration_to_initialize.isArchive() ? configuration_to_initialize.getPathInArchive() : configuration_to_initialize.getPath())
770+
.tryGetFormatFromFileName(configuration_to_initialize.isArchive() ? configuration_to_initialize.getPathInArchive() : configuration_to_initialize.getRawPath().path)
756771
.value_or("auto");
757772
}
758773
}
@@ -767,33 +782,65 @@ void StorageObjectStorage::Configuration::check(ContextPtr) const
767782
FormatFactory::instance().checkFormatName(format);
768783
}
769784

770-
bool StorageObjectStorage::Configuration::withPartitionWildcard() const
785+
StorageObjectStorage::Configuration::Path StorageObjectStorage::Configuration::getPathForRead() const
786+
{
787+
auto raw_path = getRawPath();
788+
789+
if (!partition_strategy)
790+
{
791+
return raw_path;
792+
}
793+
794+
return Path {partition_strategy->getPathForRead(raw_path.path)};
795+
}
796+
797+
StorageObjectStorage::Configuration::Path StorageObjectStorage::Configuration::getPathForWrite(const std::string & partition_id) const
798+
{
799+
auto raw_path = getRawPath();
800+
801+
if (!partition_strategy)
802+
{
803+
return raw_path;
804+
}
805+
806+
return Path {partition_strategy->getPathForWrite(raw_path.path, partition_id)};
807+
}
808+
809+
bool StorageObjectStorage::Configuration::Path::withPartitionWildcard() const
771810
{
772811
static const String PARTITION_ID_WILDCARD = "{_partition_id}";
773-
return getPath().find(PARTITION_ID_WILDCARD) != String::npos
774-
|| getNamespace().find(PARTITION_ID_WILDCARD) != String::npos;
812+
return path.find(PARTITION_ID_WILDCARD) != String::npos;
775813
}
776814

777-
bool StorageObjectStorage::Configuration::withGlobsIgnorePartitionWildcard() const
815+
bool StorageObjectStorage::Configuration::Path::withGlobsIgnorePartitionWildcard() const
778816
{
779817
if (!withPartitionWildcard())
780818
return withGlobs();
781-
return PartitionedSink::replaceWildcards(getPath(), "").find_first_of("*?{") != std::string::npos;
819+
return PartitionedSink::replaceWildcards(path, "").find_first_of("*?{") != std::string::npos;
782820
}
783821

784-
bool StorageObjectStorage::Configuration::isPathWithGlobs() const
822+
bool StorageObjectStorage::Configuration::Path::withGlobs() const
785823
{
786-
return getPath().find_first_of("*?{") != std::string::npos;
824+
return path.find_first_of("*?{") != std::string::npos;
787825
}
788826

789-
bool StorageObjectStorage::Configuration::isNamespaceWithGlobs() const
827+
std::string StorageObjectStorage::Configuration::Path::getWithoutGlobs() const
790828
{
791-
return getNamespace().find_first_of("*?{") != std::string::npos;
829+
if (allow_partial_prefix)
830+
{
831+
return path.substr(0, path.find_first_of("*?{"));
832+
}
833+
834+
auto first_glob_pos = path.find_first_of("*?{");
835+
auto end_of_path_without_globs = path.substr(0, first_glob_pos).rfind('/');
836+
if (end_of_path_without_globs == std::string::npos || end_of_path_without_globs == 0)
837+
return "/";
838+
return path.substr(0, end_of_path_without_globs);
792839
}
793840

794-
std::string StorageObjectStorage::Configuration::getPathWithoutGlobs() const
841+
bool StorageObjectStorage::Configuration::isNamespaceWithGlobs() const
795842
{
796-
return getPath().substr(0, getPath().find_first_of("*?{"));
843+
return getNamespace().find_first_of("*?{") != std::string::npos;
797844
}
798845

799846
bool StorageObjectStorage::Configuration::isPathInArchiveWithGlobs() const
@@ -803,7 +850,7 @@ bool StorageObjectStorage::Configuration::isPathInArchiveWithGlobs() const
803850

804851
std::string StorageObjectStorage::Configuration::getPathInArchive() const
805852
{
806-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Path {} is not archive", getPath());
853+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Path {} is not archive", getRawPath().path);
807854
}
808855

809856
void StorageObjectStorage::Configuration::assertInitialized() const

src/Storages/ObjectStorage/StorageObjectStorage.h

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,24 @@ class StorageObjectStorage::Configuration
187187
Configuration() = default;
188188
virtual ~Configuration() = default;
189189

190-
using Path = std::string;
190+
struct Path
191+
{
192+
Path() = default;
193+
/// A partial prefix is a prefix that does not represent an actual object (directory or file), usually a string that does not end with a slash character.
194+
/// Example: `table_root/year=20`. AWS S3 supports partial prefixes, but HDFS does not.
195+
Path(const std::string & path_, bool allow_partial_prefix_ = true) : path(path_), allow_partial_prefix(allow_partial_prefix_) {} /// NOLINT(google-explicit-constructor)
196+
197+
std::string path;
198+
199+
bool withPartitionWildcard() const;
200+
bool withGlobsIgnorePartitionWildcard() const;
201+
bool withGlobs() const;
202+
std::string getWithoutGlobs() const;
203+
204+
private:
205+
bool allow_partial_prefix;
206+
};
207+
191208
using Paths = std::vector<Path>;
192209

193210
/// Initialize configuration from either AST or NamedCollection.
@@ -206,10 +223,20 @@ class StorageObjectStorage::Configuration
206223
/// buckets in S3. If object storage doesn't have any namespaces, return an empty string.
207224
virtual std::string getNamespaceType() const { return "namespace"; }
208225

209-
virtual Path getFullPath() const { return ""; }
210-
virtual Path getPath() const = 0;
211-
virtual void setPath(const Path & path) = 0;
226+
// Path provided by the user in the query
227+
virtual Path getRawPath() const = 0;
228+
// Path used for reading; it is usually a globbed path like `table_root/**.parquet`
229+
Path getPathForRead() const;
230+
// Path used for writing; it should not be globbed and might contain a partition key
231+
Path getPathForWrite(const std::string & partition_id = "") const;
232+
233+
virtual void setRawPath(const Path & path) = 0;
212234

235+
/*
236+
* When using `s3_create_new_file_on_insert`, each new file path generated will be appended to the path list.
237+
* This list is used to determine the next file name and the set of files that shall be read from remote storage.
238+
* This is not ideal; there are much better ways to implement reads and writes. It should eventually be removed.
239+
*/
213240
virtual const Paths & getPaths() const = 0;
214241
virtual void setPaths(const Paths & paths) = 0;
215242

@@ -222,12 +249,7 @@ class StorageObjectStorage::Configuration
222249
virtual void addStructureAndFormatToArgsIfNeeded(
223250
ASTs & args, const String & structure_, const String & format_, ContextPtr context, bool with_structure) = 0;
224251

225-
bool withPartitionWildcard() const;
226-
bool withGlobs() const { return isPathWithGlobs() || isNamespaceWithGlobs(); }
227-
bool withGlobsIgnorePartitionWildcard() const;
228-
bool isPathWithGlobs() const;
229252
bool isNamespaceWithGlobs() const;
230-
virtual std::string getPathWithoutGlobs() const;
231253

232254
virtual bool isArchive() const { return false; }
233255
bool isPathInArchiveWithGlobs() const;
@@ -259,7 +281,10 @@ class StorageObjectStorage::Configuration
259281
const Strings & requested_columns,
260282
const StorageSnapshotPtr & storage_snapshot,
261283
bool supports_subset_of_columns,
262-
ContextPtr local_context);
284+
ContextPtr local_context,
285+
const PrepareReadingFromFormatHiveParams & hive_params);
286+
287+
void initPartitionStrategy(ASTPtr partition_by, const ColumnsDescription & columns, ContextPtr context);
263288

264289
virtual std::optional<ColumnsDescription> tryGetTableStructureFromMetadata() const;
265290

@@ -290,6 +315,11 @@ class StorageObjectStorage::Configuration
290315
String format = "auto";
291316
String compression_method = "auto";
292317
String structure = "auto";
318+
PartitionStrategyFactory::StrategyType partition_strategy_type = PartitionStrategyFactory::StrategyType::WILDCARD;
319+
/// Whether partition column values are contained in the actual data.
320+
/// An alternative is hive partitioning, where they are contained in the file path.
321+
bool partition_columns_in_data_file = true;
322+
std::shared_ptr<IPartitionStrategy> partition_strategy;
293323

294324
protected:
295325
virtual void fromNamedCollection(const NamedCollection & collection, ContextPtr context) = 0;

src/Storages/ObjectStorage/StorageObjectStorageSink.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ namespace
3838
PartitionedSink::validatePartitionKey(str, true);
3939
}
4040

41-
void validateNamespace(const String & str, StorageObjectStorageConfigurationPtr configuration)
41+
void validateNamespace(const String & str, StorageObjectStorage::ConfigurationPtr configuration)
4242
{
4343
configuration->validateNamespace(str);
4444

@@ -167,7 +167,7 @@ SinkPtr PartitionedStorageObjectStorageSink::createSinkForPartition(const String
167167
object_storage,
168168
configuration,
169169
format_settings,
170-
std::make_shared<Block>(partition_strategy->getFormatHeader()),
170+
partition_strategy->getFormatHeader(),
171171
context
172172
);
173173
}

src/Storages/PartitionedSink.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ PartitionedSink::PartitionedSink(
3131
: SinkToStorage(sample_block_)
3232
, partition_strategy(partition_strategy_)
3333
, context(context_)
34-
, source_header(source_header_)
34+
, sample_block(sample_block_)
3535
{
3636
}
3737

src/Storages/PartitionedSink.h

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,10 @@ class PartitionedSink : public SinkToStorage
1919
public:
2020
static constexpr auto PARTITION_ID_WILDCARD = "{_partition_id}";
2121

22-
<<<<<<< HEAD
23-
PartitionedSink(const ASTPtr & partition_by, ContextPtr context_, const Block & sample_block_);
24-
=======
2522
PartitionedSink(
2623
std::shared_ptr<IPartitionStrategy> partition_strategy_,
2724
ContextPtr context_,
28-
SharedHeader source_header_);
29-
>>>>>>> 790e1be4c1e (Add Hive-style S3 partitioned reads/writes)
25+
const Block & sample_block_);
3026

3127
~PartitionedSink() override;
3228

@@ -49,15 +45,7 @@ class PartitionedSink : public SinkToStorage
4945

5046
private:
5147
ContextPtr context;
52-
<<<<<<< HEAD
5348
Block sample_block;
54-
55-
ExpressionActionsPtr partition_by_expr;
56-
String partition_by_column_name;
57-
=======
58-
SharedHeader source_header;
59-
>>>>>>> 790e1be4c1e (Add Hive-style S3 partitioned reads/writes)
60-
6149
absl::flat_hash_map<StringRef, SinkPtr> partition_id_to_sink;
6250
HashMapWithSavedHash<StringRef, size_t> partition_id_to_chunk_index;
6351
IColumn::Selector chunk_row_index_to_partition_index;

0 commit comments

Comments
 (0)