Skip to content

Commit d4da5f3

Browse files
Enmkianton-ru
authored andcommitted
Merge pull request #866 from Altinity/feature/cache_locality_lock
Antalya 25.3: lock_object_storage_task_distribution_ms setting
1 parent 83bf85c commit d4da5f3

File tree

9 files changed

+215
-53
lines changed

9 files changed

+215
-53
lines changed

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6860,6 +6860,9 @@ Default number of tasks for parallel reading in distributed query. Tasks are spr
68606860
DECLARE(Bool, distributed_plan_optimize_exchanges, true, R"(
68616861
Removes unnecessary exchanges in distributed query plan. Disable it for debugging.
68626862
)", 0) \
6863+
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
6864+
In object storage distribution queries, do not distribute tasks to non-prefetched nodes while the prefetched node is active.
6865+
)", EXPERIMENTAL) \
68636866
DECLARE(String, distributed_plan_force_exchange_kind, "", R"(
68646867
Force specified kind of Exchange operators between distributed query stages.
68656868

src/Core/SettingsChangesHistory.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
6767
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
6868
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
6969
/// Note: please check if the key already exists to prevent duplicate entries.
70+
addSettingsChanges(settings_changes_history, "25.6.5.20000",
71+
{
72+
// Altinity Antalya modifications atop of 25.6
73+
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
74+
});
7075
addSettingsChanges(settings_changes_history, "25.6",
7176
{
7277
{"output_format_native_use_flattened_dynamic_and_json_serialization", false, false, "Add flattened Dynamic/JSON serializations to Native format"},

src/Disks/ObjectStorages/IObjectStorage.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
#include <Common/Exception.h>
99
#include <Common/ObjectStorageKeyGenerator.h>
1010

11+
#include <Poco/JSON/Object.h>
12+
#include <Poco/JSON/Parser.h>
13+
#include <Poco/JSON/JSONException.h>
14+
1115

1216
namespace DB
1317
{
@@ -97,4 +101,36 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings
97101
return write_settings;
98102
}
99103

104+
/// Tries to interpret a task string as a JSON command.
///
/// If `task` is a JSON object, the command is marked as parsed and the optional
/// "retry_after_us" field is read from it. Anything else (an ordinary object path,
/// a non-object JSON value, malformed JSON, an unreadable "retry_after_us" value)
/// leaves the command in its default "not parsed" state, so the caller can treat
/// the string as a path.
RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
{
    /// A command is always serialized as a JSON object, so a string whose first
    /// significant character is not '{' cannot be a command. Checking this first
    /// avoids running the parser (and throwing an exception) for every regular path.
    const auto first_significant = task.find_first_not_of(" \t\r\n");
    if (first_significant == std::string::npos || task[first_significant] != '{')
        return;

    try
    {
        Poco::JSON::Parser parser;
        auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
        if (!json)
            return;

        /// Read into a local first: members are updated only after the whole
        /// command has been read without errors.
        std::optional<Poco::Timestamp::TimeDiff> parsed_retry_after_us;
        if (json->has("retry_after_us"))
            parsed_retry_after_us = json->getValue<size_t>("retry_after_us");

        retry_after_us = parsed_retry_after_us;
        successfully_parsed = true;
    }
    catch (const Poco::Exception &)
    {
        /// Not a JSON command. Poco::JSON::JSONException derives from Poco::Exception;
        /// catching the base class also covers failed extraction/conversion of values.
        return;
    }
}
123+
124+
std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
125+
{
126+
Poco::JSON::Object json;
127+
if (retry_after_us.has_value())
128+
json.set("retry_after_us", retry_after_us.value());
129+
130+
std::ostringstream oss;
131+
oss.exceptions(std::ios::failbit);
132+
Poco::JSON::Stringifier::stringify(json, oss);
133+
return oss.str();
134+
}
135+
100136
}

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,37 @@ struct ObjectMetadata
8383

8484
struct RelativePathWithMetadata
8585
{
86+
class CommandInTaskResponse
87+
{
88+
public:
89+
CommandInTaskResponse() {}
90+
CommandInTaskResponse(const std::string & task);
91+
92+
bool is_parsed() const { return successfully_parsed; }
93+
void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
94+
95+
std::string to_string() const;
96+
97+
std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
98+
99+
private:
100+
bool successfully_parsed = false;
101+
std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
102+
};
103+
86104
String relative_path;
87105
std::optional<ObjectMetadata> metadata;
106+
CommandInTaskResponse command;
88107

89108
RelativePathWithMetadata() = default;
90109

91-
explicit RelativePathWithMetadata(String relative_path_, std::optional<ObjectMetadata> metadata_ = std::nullopt)
92-
: relative_path(std::move(relative_path_))
93-
, metadata(std::move(metadata_))
94-
{}
110+
explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
111+
: metadata(std::move(metadata_))
112+
, command(task_string)
113+
{
114+
if (!command.is_parsed())
115+
relative_path = task_string;
116+
}
95117

96118
virtual ~RelativePathWithMetadata() = default;
97119

@@ -100,6 +122,8 @@ struct RelativePathWithMetadata
100122
virtual bool isArchive() const { return false; }
101123
virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
102124
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
125+
126+
const CommandInTaskResponse & getCommand() const { return command; }
103127
};
104128

105129
struct ObjectKeyWithMetadata

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@ namespace DB
2323
namespace Setting
2424
{
2525
extern const SettingsBool use_hive_partitioning;
26+
extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
2627
}
2728

2829
namespace ErrorCodes
2930
{
3031
extern const int LOGICAL_ERROR;
3132
extern const int INCORRECT_DATA;
33+
extern const int INVALID_SETTING_VALUE;
3234
}
3335

3436
String StorageObjectStorageCluster::getPathSample(ContextPtr context)
@@ -234,7 +236,22 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
234236
}
235237
}
236238

237-
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
239+
uint64_t lock_object_storage_task_distribution_ms = local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms];
240+
241+
/// Check value to avoid negative result after conversion in microseconds.
242+
/// Poco::Timestamp::TimeDiff is signed int 64.
243+
static const uint64_t lock_object_storage_task_distribution_ms_max = 0x0020000000000000ULL;
244+
if (lock_object_storage_task_distribution_ms > lock_object_storage_task_distribution_ms_max)
245+
throw Exception(ErrorCodes::INVALID_SETTING_VALUE,
246+
"Value lock_object_storage_task_distribution_ms is too big: {}, allowed maximum is {}",
247+
lock_object_storage_task_distribution_ms,
248+
lock_object_storage_task_distribution_ms_max
249+
);
250+
251+
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
252+
iterator,
253+
ids_of_hosts,
254+
lock_object_storage_task_distribution_ms);
238255

239256
auto callback = std::make_shared<TaskIterator>(
240257
[task_distributor](size_t number_of_current_replica) mutable -> String

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <QueryPipeline/QueryPipelineBuilder.h>
2020
#include <Storages/Cache/SchemaCache.h>
2121
#include <Storages/ObjectStorage/StorageObjectStorage.h>
22+
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
2223
#include <Storages/ObjectStorage/DataLakes/DeltaLake/ObjectInfoWithPartitionColumns.h>
2324
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
2425
#include <Storages/VirtualColumnUtils.h>
@@ -432,11 +433,31 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
432433
ObjectInfoPtr object_info;
433434
auto query_settings = configuration->getQuerySettings(context_);
434435

436+
bool not_a_path = false;
437+
435438
do
436439
{
440+
not_a_path = false;
437441
object_info = file_iterator->next(processor);
438442

439-
if (!object_info || object_info->getPath().empty())
443+
if (!object_info)
444+
return {};
445+
446+
if (object_info->getCommand().is_parsed())
447+
{
448+
auto retry_after_us = object_info->getCommand().get_retry_after_us();
449+
if (retry_after_us.has_value())
450+
{
451+
not_a_path = true;
452+
/// TODO: Make asynchronous waiting without sleep in thread
453+
/// Now this sleep is on executor node in worker thread
454+
/// Does not block query initiator
455+
sleepForMicroseconds(std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value()));
456+
continue;
457+
}
458+
}
459+
460+
if (object_info->getPath().empty())
440461
return {};
441462

442463
if (!object_info->metadata)
@@ -455,7 +476,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
455476
object_info->metadata = object_storage->getObjectMetadata(path);
456477
}
457478
}
458-
while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0);
479+
while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0));
459480

460481
QueryPipelineBuilder builder;
461482
std::shared_ptr<ISource> source;

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@ namespace ErrorCodes
1313

1414
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
1515
std::shared_ptr<IObjectIterator> iterator_,
16-
std::vector<std::string> ids_of_nodes_)
16+
std::vector<std::string> ids_of_nodes_,
17+
uint64_t lock_object_storage_task_distribution_ms_)
1718
: iterator(std::move(iterator_))
1819
, connection_to_files(ids_of_nodes_.size())
1920
, ids_of_nodes(ids_of_nodes_)
21+
, lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000)
2022
, iterator_exhausted(false)
2123
{
2224
}
@@ -32,6 +34,8 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
3234
number_of_current_replica,
3335
connection_to_files.size() - 1);
3436

37+
saveLastNodeActivity(number_of_current_replica);
38+
3539
// 1. Check pre-queued files first
3640
if (auto file = getPreQueuedFile(number_of_current_replica))
3741
return file;
@@ -156,7 +160,7 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile
156160
// Queue file for its assigned replica
157161
{
158162
std::lock_guard lock(mutex);
159-
unprocessed_files.insert(file_path);
163+
unprocessed_files[file_path] = number_of_current_replica;
160164
connection_to_files[file_replica_idx].push_back(file_path);
161165
}
162166
}
@@ -166,25 +170,64 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile
166170

167171
std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica)
168172
{
173+
/// Limit time of node activity to keep task in queue
174+
Poco::Timestamp activity_limit;
175+
Poco::Timestamp oldest_activity;
176+
if (lock_object_storage_task_distribution_us > 0)
177+
activity_limit -= lock_object_storage_task_distribution_us;
178+
169179
std::lock_guard lock(mutex);
170180

171181
if (!unprocessed_files.empty())
172182
{
173183
auto it = unprocessed_files.begin();
174-
String next_file = *it;
175-
unprocessed_files.erase(it);
184+
185+
while (it != unprocessed_files.end())
186+
{
187+
auto last_activity = last_node_activity.find(it->second);
188+
if (lock_object_storage_task_distribution_us <= 0
189+
|| last_activity == last_node_activity.end()
190+
|| activity_limit > last_activity->second)
191+
{
192+
String next_file = it->first;
193+
unprocessed_files.erase(it);
194+
195+
LOG_TRACE(
196+
log,
197+
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
198+
next_file,
199+
number_of_current_replica
200+
);
201+
202+
return next_file;
203+
}
204+
205+
oldest_activity = std::min(oldest_activity, last_activity->second);
206+
++it;
207+
}
176208

177209
LOG_TRACE(
178210
log,
179-
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
180-
next_file,
181-
number_of_current_replica
211+
"No unprocessed file for replica {}, need to retry after {} us",
212+
number_of_current_replica,
213+
oldest_activity - activity_limit
182214
);
183215

184-
return next_file;
216+
/// All unprocessed files are owned by alive replicas with recent activity
217+
/// Need to retry after (oldest_activity - activity_limit) microseconds
218+
RelativePathWithMetadata::CommandInTaskResponse response;
219+
response.set_retry_after_us(oldest_activity - activity_limit);
220+
return response.to_string();
185221
}
186222

187223
return std::nullopt;
188224
}
189225

226+
void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t number_of_current_replica)
227+
{
228+
Poco::Timestamp now;
229+
std::lock_guard lock(mutex);
230+
last_node_activity[number_of_current_replica] = now;
231+
}
232+
190233
}

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
#include <Interpreters/Cluster.h>
66
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
77
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
8+
9+
#include <Poco/Timestamp.h>
10+
811
#include <unordered_set>
12+
#include <unordered_map>
913
#include <vector>
1014
#include <mutex>
1115
#include <memory>
@@ -18,7 +22,8 @@ class StorageObjectStorageStableTaskDistributor
1822
public:
1923
StorageObjectStorageStableTaskDistributor(
2024
std::shared_ptr<IObjectIterator> iterator_,
21-
std::vector<std::string> ids_of_nodes_);
25+
std::vector<std::string> ids_of_nodes_,
26+
uint64_t lock_object_storage_task_distribution_ms_);
2227

2328
std::optional<String> getNextTask(size_t number_of_current_replica);
2429

@@ -28,12 +33,17 @@ class StorageObjectStorageStableTaskDistributor
2833
std::optional<String> getMatchingFileFromIterator(size_t number_of_current_replica);
2934
std::optional<String> getAnyUnprocessedFile(size_t number_of_current_replica);
3035

36+
void saveLastNodeActivity(size_t number_of_current_replica);
37+
3138
std::shared_ptr<IObjectIterator> iterator;
3239

3340
std::vector<std::vector<String>> connection_to_files;
34-
std::unordered_set<String> unprocessed_files;
41+
/// Map of unprocessed files in format filename => number of prefetched replica
42+
std::unordered_map<String, size_t> unprocessed_files;
3543

3644
std::vector<std::string> ids_of_nodes;
45+
std::unordered_map<size_t, Poco::Timestamp> last_node_activity;
46+
Poco::Timestamp::TimeDiff lock_object_storage_task_distribution_us;
3747

3848
std::mutex mutex;
3949
bool iterator_exhausted = false;

0 commit comments

Comments
 (0)