Commit f7d45ce

Merge pull request ClickHouse#77671 from azat/faster-object-queues
Improve performance of S3Queue/AzureQueue by allowing INSERTs of data in parallel
2 parents: 111e630 + c11d6c3

10 files changed (+381, -59 lines)

docs/en/engines/table-engines/integrations/azure-queue.md

Lines changed: 1 addition & 1 deletion
@@ -82,7 +82,7 @@ For more information about virtual columns see [here](../../../engines/table-eng
 
 ## Introspection {#introspection}
 
-Enable logging for the table via the table setting `enable_logging_to_s3queue_log=1`.
+Enable logging for the table via the table setting `enable_logging_to_queue_log=1`.
 
 Introspection capabilities are the same as the [S3Queue table engine](/engines/table-engines/integrations/s3queue#introspection) with several distinct differences:
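
For illustration, a minimal sketch of the renamed setting in use; the table name, connection string, container, and the queried log columns are assumptions (the log table follows the `system.(s3/azure_)queue_log` naming used by the settings changed below):

```sql
-- Hypothetical AzureQueue table with queue-log recording enabled.
CREATE TABLE azure_queue_example (name String, value UInt32)
ENGINE = AzureQueue('<connection_string>', 'example-container', '*.csv', 'CSVWithNames')
SETTINGS mode = 'unordered',
         enable_logging_to_queue_log = 1;

-- Per-file processing history then accumulates in the queue log.
SELECT file_name, status, exception
FROM system.azure_queue_log
ORDER BY event_time DESC
LIMIT 10;
```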

docs/en/engines/table-engines/integrations/s3queue.md

Lines changed: 25 additions & 5 deletions
@@ -24,15 +24,24 @@ CREATE TABLE s3_queue_engine_table (name String, value UInt32)
     [after_processing = 'keep',]
     [keeper_path = '',]
     [loading_retries = 0,]
-    [processing_threads_num = 1,]
-    [enable_logging_to_s3queue_log = 0,]
+    [processing_threads_num = 16,]
+    [parallel_inserts = false,]
+    [enable_logging_to_queue_log = true,]
+    [last_processed_path = "",]
+    [tracked_files_limit = 1000,]
+    [tracked_file_ttl_sec = 0,]
     [polling_min_timeout_ms = 1000,]
     [polling_max_timeout_ms = 10000,]
     [polling_backoff_ms = 0,]
-    [tracked_file_ttl_sec = 0,]
-    [tracked_files_limit = 1000,]
     [cleanup_interval_min_ms = 10000,]
     [cleanup_interval_max_ms = 30000,]
+    [buckets = 0,]
+    [list_objects_batch_size = 1000,]
+    [enable_hash_ring_filtering = 0,]
+    [max_processed_files_before_commit = 100,]
+    [max_processed_rows_before_commit = 0,]
+    [max_processed_bytes_before_commit = 0,]
+    [max_processing_time_sec_before_commit = 0,]
 ```
 
 :::warning

@@ -118,7 +127,18 @@ Default value: `0`.
 
 Number of threads to perform processing. Applies only for `Unordered` mode.
 
-Default value: `1`.
+Default value: the number of available CPUs, or 16.
+
+### s3queue_parallel_inserts {#parallel_inserts}
+
+By default, `processing_threads_num` produces a single `INSERT`, so multiple threads are used only to download and parse files.
+This limits parallelism, so for better throughput set `parallel_inserts = true`; data will then be inserted in parallel (but keep in mind that this results in a higher number of generated data parts for the MergeTree family).
+
+:::note
+`INSERT`s will be spawned with respect to the `max_process*_before_commit` settings.
+:::
+
+Default value: `false`.
 
 ### s3queue_enable_logging_to_s3queue_log {#enable_logging_to_s3queue_log}
 
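
To make the new `parallel_inserts` setting concrete, here is a sketch of a typical pipeline; the bucket URL, table, and view names are hypothetical. With `parallel_inserts = true`, processing threads no longer funnel into a single `INSERT` but flush their own batches downstream:

```sql
-- Queue table that downloads, parses, and now also inserts in parallel.
CREATE TABLE s3_queue_parallel (name String, value UInt32)
ENGINE = S3Queue('https://example-bucket.s3.amazonaws.com/data/*.csv', 'CSV')
SETTINGS mode = 'unordered',
         processing_threads_num = 16,
         parallel_inserts = true;  -- batches bounded by max_process*_before_commit

-- Destination table; expect more data parts to be generated with parallel_inserts.
CREATE TABLE s3_data (name String, value UInt32)
ENGINE = MergeTree ORDER BY name;

-- A materialized view drains the queue into the destination table.
CREATE MATERIALIZED VIEW s3_data_mv TO s3_data AS
SELECT name, value FROM s3_queue_parallel;
```

With the default `parallel_inserts = false`, the same threads feed one `INSERT`, which keeps the part count low at the cost of throughput.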

src/Storages/ObjectStorageQueue/ObjectStorageQueueSettings.cpp

Lines changed: 7 additions & 6 deletions
@@ -26,7 +26,8 @@ namespace ErrorCodes
     DECLARE(ObjectStorageQueueAction, after_processing, ObjectStorageQueueAction::KEEP, "Delete or keep file in after successful processing", 0) \
     DECLARE(String, keeper_path, "", "Zookeeper node path", 0) \
     DECLARE(UInt64, loading_retries, 10, "Retry loading up to specified number of times", 0) \
-    DECLARE(UInt64, processing_threads_num, 1, "Number of processing threads", 0) \
+    DECLARE(UInt64, processing_threads_num, 1, "Number of processing threads (default number of available CPUs or 16)", 0) \
+    DECLARE(Bool, parallel_inserts, false, "By default processing_threads_num will produce one INSERT, which limits parallel execution, so enable this setting for better scalability (note: this creates not one INSERT per file, but one per max_process*_before_commit)", 0) \
     DECLARE(UInt32, enable_logging_to_queue_log, 1, "Enable logging to system table system.(s3/azure_)queue_log", 0) \
     DECLARE(String, last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \
     DECLARE(UInt64, tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \

@@ -37,12 +38,12 @@ namespace ErrorCodes
     DECLARE(UInt32, cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
     DECLARE(UInt32, cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
     DECLARE(UInt64, buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \
-    DECLARE(UInt64, list_objects_batch_size, 1000, "Size of a list batcn in object storage", 0) \
+    DECLARE(UInt64, list_objects_batch_size, 1000, "Size of a list batch in object storage", 0) \
     DECLARE(Bool, enable_hash_ring_filtering, 0, "Enable filtering files among replicas according to hash ring for Unordered mode", 0) \
-    DECLARE(UInt64, max_processed_files_before_commit, 100, "Number of files which can be processed before being committed to keeper", 0) \
-    DECLARE(UInt64, max_processed_rows_before_commit, 0, "Number of rows which can be processed before being committed to keeper", 0) \
-    DECLARE(UInt64, max_processed_bytes_before_commit, 0, "Number of bytes which can be processed before being committed to keeper", 0) \
-    DECLARE(UInt64, max_processing_time_sec_before_commit, 0, "Timeout in seconds after which to commit files committed to keeper", 0) \
+    DECLARE(UInt64, max_processed_files_before_commit, 100, "Number of files which can be processed before being committed to keeper (with parallel_inserts=true, works on a per-thread basis)", 0) \
+    DECLARE(UInt64, max_processed_rows_before_commit, 0, "Number of rows which can be processed before being committed to keeper (with parallel_inserts=true, works on a per-thread basis)", 0) \
+    DECLARE(UInt64, max_processed_bytes_before_commit, 0, "Number of bytes which can be processed before being committed to keeper (with parallel_inserts=true, works on a per-thread basis)", 0) \
+    DECLARE(UInt64, max_processing_time_sec_before_commit, 0, "Timeout in seconds after which processed files are committed to keeper (with parallel_inserts=true, works on a per-thread basis)", 0) \
 
 #define LIST_OF_OBJECT_STORAGE_QUEUE_SETTINGS(M, ALIAS) \
     OBJECT_STORAGE_QUEUE_RELATED_SETTINGS(M, ALIAS) \
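
As a sketch of how the per-thread commit thresholds read in practice (the names and values below are illustrative, not taken from this PR): each processing thread commits its batch to Keeper as soon as any one threshold is reached.

```sql
CREATE TABLE s3_queue_batched (name String, value UInt32)
ENGINE = S3Queue('https://example-bucket.s3.amazonaws.com/batch/*.tsv', 'TSV')
SETTINGS mode = 'unordered',
         parallel_inserts = true,
         -- With parallel_inserts = true, each limit applies per thread:
         max_processed_files_before_commit = 100,       -- at most 100 files per batch
         max_processed_rows_before_commit = 1000000,    -- or 1,000,000 rows
         max_processed_bytes_before_commit = 104857600, -- or 100 MiB
         max_processing_time_sec_before_commit = 60;    -- or 60 seconds
```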

src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp

Lines changed: 8 additions & 6 deletions
@@ -129,16 +129,18 @@ ObjectStorageQueueSource::FileIterator::FileIterator(
     }
 }
 
-bool ObjectStorageQueueSource::FileIterator::isFinished() const
+bool ObjectStorageQueueSource::FileIterator::isFinished()
 {
-    LOG_TEST(log, "Iterator finished: {}, objects to retry: {}", iterator_finished.load(), objects_to_retry.size());
-    return iterator_finished
-        && std::all_of(listed_keys_cache.begin(), listed_keys_cache.end(), [](const auto & v) { return v.second.keys.empty(); })
-        && objects_to_retry.empty();
+    std::lock_guard lock(mutex);
+    LOG_TEST(log, "Iterator finished: {}, objects to retry: {}", iterator_finished.load(), objects_to_retry.size());
+    return iterator_finished
+        && std::all_of(listed_keys_cache.begin(), listed_keys_cache.end(), [](const auto & v) { return v.second.keys.empty(); })
+        && objects_to_retry.empty();
 }
 
 size_t ObjectStorageQueueSource::FileIterator::estimatedKeysCount()
 {
+    std::lock_guard lock(next_mutex);
     /// Copied from StorageObjectStorageSource::estimateKeysCount().
     if (object_infos.empty() && !is_finished && object_storage_iterator->isValid())
         return std::numeric_limits<size_t>::max();

@@ -456,6 +458,7 @@ void ObjectStorageQueueSource::FileIterator::returnForRetry(ObjectInfoPtr object
 
 void ObjectStorageQueueSource::FileIterator::releaseFinishedBuckets()
 {
+    std::lock_guard lock(mutex);
     for (const auto & [processor, holders] : bucket_holders)
     {
         LOG_TEST(log, "Releasing {} bucket holders for processor {}", holders.size(), processor);

@@ -870,7 +873,6 @@ Chunk ObjectStorageQueueSource::generateImpl()
 
     if (commit_settings.max_processed_files_before_commit)
     {
-        std::lock_guard lock(progress->processed_files_mutex);
         if (progress->processed_files.load(std::memory_order_relaxed) >= commit_settings.max_processed_files_before_commit)
         {
             LOG_TRACE(log, "Number of max processed files before commit reached "

src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.h

Lines changed: 7 additions & 8 deletions
@@ -7,6 +7,7 @@
 #include <Storages/ObjectStorage/StorageObjectStorageSource.h>
 #include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h>
 #include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
+#include <base/defines.h>
 #include <Common/ZooKeeper/ZooKeeper.h>
 
 

@@ -52,7 +53,7 @@ class ObjectStorageQueueSource : public ISource, WithContext
             bool file_deletion_on_processed_enabled_,
             std::atomic<bool> & shutdown_called_);
 
-        bool isFinished() const;
+        bool isFinished();
 
         ObjectInfoPtr next(size_t processor) override;
 

@@ -86,7 +87,7 @@ class ObjectStorageQueueSource : public ISource, WithContext
         ExpressionActionsPtr filter_expr;
         bool recursive{false};
 
-        Source::ObjectInfos object_infos;
+        Source::ObjectInfos object_infos TSA_GUARDED_BY(next_mutex);
         std::vector<FileMetadataPtr> file_metadatas;
         bool is_finished = false;
         std::mutex next_mutex;

@@ -106,24 +107,24 @@ class ObjectStorageQueueSource : public ISource, WithContext
             std::optional<Processor> processor;
         };
         /// A cache of keys which were iterated via glob_iterator, but not taken for processing.
-        std::unordered_map<Bucket, ListedKeys> listed_keys_cache;
+        std::unordered_map<Bucket, ListedKeys> listed_keys_cache TSA_GUARDED_BY(mutex);
 
         /// We store a vector of holders, because we cannot release them until processed files are committed.
-        std::unordered_map<size_t, std::vector<BucketHolderPtr>> bucket_holders;
+        std::unordered_map<size_t, std::vector<BucketHolderPtr>> bucket_holders TSA_GUARDED_BY(mutex);
 
         /// Is glob_iterator finished?
         std::atomic_bool iterator_finished = false;
 
         /// Only for processing without buckets.
-        std::deque<std::pair<ObjectInfoPtr, FileMetadataPtr>> objects_to_retry;
+        std::deque<std::pair<ObjectInfoPtr, FileMetadataPtr>> objects_to_retry TSA_GUARDED_BY(mutex);
 
         struct NextKeyFromBucket
         {
             ObjectInfoPtr object_info;
             FileMetadataPtr file_metadata;
             ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info;
         };
-        NextKeyFromBucket getNextKeyFromAcquiredBucket(size_t processor);
+        NextKeyFromBucket getNextKeyFromAcquiredBucket(size_t processor) TSA_REQUIRES(mutex);
         bool hasKeysForProcessor(const Processor & processor) const;
     };
 

@@ -141,8 +142,6 @@ class ObjectStorageQueueSource : public ISource, WithContext
         std::atomic<size_t> processed_rows = 0;
         std::atomic<size_t> processed_bytes = 0;
         Stopwatch elapsed_time{CLOCK_MONOTONIC_COARSE};
-
-        std::mutex processed_files_mutex;
     };
     using ProcessingProgressPtr = std::shared_ptr<ProcessingProgress>;

src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h

Lines changed: 3 additions & 0 deletions
@@ -28,6 +28,7 @@ struct ObjectStorageQueueTableMetadata
     std::atomic<ObjectStorageQueueAction> after_processing;
     std::atomic<UInt64> loading_retries;
     std::atomic<UInt64> processing_threads_num;
+    std::atomic<bool> parallel_inserts;
     std::atomic<UInt64> tracked_files_limit;
     std::atomic<UInt64> tracked_files_ttl_sec;
     std::atomic<UInt64> buckets;

@@ -47,6 +48,7 @@ struct ObjectStorageQueueTableMetadata
         , after_processing(other.after_processing.load())
         , loading_retries(other.loading_retries.load())
         , processing_threads_num(other.processing_threads_num.load())
+        , parallel_inserts(other.parallel_inserts.load())
         , tracked_files_limit(other.tracked_files_limit.load())
         , tracked_files_ttl_sec(other.tracked_files_ttl_sec.load())
         , buckets(other.buckets.load())

@@ -89,6 +91,7 @@ struct ObjectStorageQueueTableMetadata
         "after_processing",
         "loading_retries",
         "processing_threads_num",
+        "parallel_inserts",
         "tracked_files_limit",
         "tracked_file_ttl_sec",
         "tracked_files_ttl_sec",
