Skip to content

Commit dc82819

Browse files
authored
Merge pull request #866 from Altinity/feature/cache_locality_lock
Antalya 25.3: lock_object_storage_task_distribution_ms setting
2 parents 3635c44 + d1f4b4c commit dc82819

File tree

9 files changed

+215
-53
lines changed

9 files changed

+215
-53
lines changed

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6141,6 +6141,9 @@ Cache the list of objects returned by list objects calls in object storage
61416141
)", EXPERIMENTAL) \
61426142
DECLARE(Bool, object_storage_remote_initiator, false, R"(
61436143
Execute request to object storage as remote on one of object_storage_cluster nodes.
6144+
)", EXPERIMENTAL) \
6145+
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
6146+
In object storage distribution queries do not distibute tasks on non-prefetched nodes until prefetched node is active.
61446147
)", EXPERIMENTAL) \
61456148
\
61466149

src/Core/SettingsChangesHistory.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
6666
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
6767
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
6868
/// Note: please check if the key already exists to prevent duplicate entries.
69+
addSettingsChanges(settings_changes_history, "25.3.3.20000",
70+
{
71+
// Altinity Antalya modifications atop of 25.3
72+
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
73+
});
6974
addSettingsChanges(settings_changes_history, "25.2.1.20000",
7075
{
7176
// Altinity Antalya modifications atop of 25.2

src/Disks/ObjectStorages/IObjectStorage.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
#include <Common/Exception.h>
99
#include <Common/ObjectStorageKeyGenerator.h>
1010

11+
#include <Poco/JSON/Object.h>
12+
#include <Poco/JSON/Parser.h>
13+
#include <Poco/JSON/JSONException.h>
14+
1115

1216
namespace DB
1317
{
@@ -107,4 +111,36 @@ void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage)
107111
}
108112
}
109113

114+
RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
115+
{
116+
Poco::JSON::Parser parser;
117+
try
118+
{
119+
auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
120+
if (!json)
121+
return;
122+
123+
successfully_parsed = true;
124+
125+
if (json->has("retry_after_us"))
126+
retry_after_us = json->getValue<size_t>("retry_after_us");
127+
}
128+
catch (const Poco::JSON::JSONException &)
129+
{ /// Not a JSON
130+
return;
131+
}
132+
}
133+
134+
std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
135+
{
136+
Poco::JSON::Object json;
137+
if (retry_after_us.has_value())
138+
json.set("retry_after_us", retry_after_us.value());
139+
140+
std::ostringstream oss;
141+
oss.exceptions(std::ios::failbit);
142+
Poco::JSON::Stringifier::stringify(json, oss);
143+
return oss.str();
144+
}
145+
110146
}

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,37 @@ struct ObjectMetadata
6666

6767
struct RelativePathWithMetadata
6868
{
69+
class CommandInTaskResponse
70+
{
71+
public:
72+
CommandInTaskResponse() {}
73+
CommandInTaskResponse(const std::string & task);
74+
75+
bool is_parsed() const { return successfully_parsed; }
76+
void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
77+
78+
std::string to_string() const;
79+
80+
std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
81+
82+
private:
83+
bool successfully_parsed = false;
84+
std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
85+
};
86+
6987
String relative_path;
7088
std::optional<ObjectMetadata> metadata;
89+
CommandInTaskResponse command;
7190

7291
RelativePathWithMetadata() = default;
7392

74-
explicit RelativePathWithMetadata(String relative_path_, std::optional<ObjectMetadata> metadata_ = std::nullopt)
75-
: relative_path(std::move(relative_path_))
76-
, metadata(std::move(metadata_))
77-
{}
93+
explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
94+
: metadata(std::move(metadata_))
95+
, command(task_string)
96+
{
97+
if (!command.is_parsed())
98+
relative_path = task_string;
99+
}
78100

79101
virtual ~RelativePathWithMetadata() = default;
80102

@@ -85,6 +107,8 @@ struct RelativePathWithMetadata
85107
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
86108

87109
void loadMetadata(ObjectStoragePtr object_storage);
110+
111+
const CommandInTaskResponse & getCommand() const { return command; }
88112
};
89113

90114
struct ObjectKeyWithMetadata

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@ namespace Setting
2727
{
2828
extern const SettingsBool use_hive_partitioning;
2929
extern const SettingsString object_storage_cluster;
30+
extern const SettingsUInt64 lock_object_storage_task_distribution_ms;
3031
}
3132

3233
namespace ErrorCodes
3334
{
3435
extern const int LOGICAL_ERROR;
3536
extern const int UNKNOWN_FUNCTION;
3637
extern const int NOT_IMPLEMENTED;
38+
extern const int INVALID_SETTING_VALUE;
3739
}
3840

3941

@@ -386,7 +388,22 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
386388
}
387389
}
388390

389-
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
391+
uint64_t lock_object_storage_task_distribution_ms = local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms];
392+
393+
/// Check value to avoid negative result after conversion in microseconds.
394+
/// Poco::Timestamp::TimeDiff is signed int 64.
395+
static const uint64_t lock_object_storage_task_distribution_ms_max = 0x0020000000000000ULL;
396+
if (lock_object_storage_task_distribution_ms > lock_object_storage_task_distribution_ms_max)
397+
throw Exception(ErrorCodes::INVALID_SETTING_VALUE,
398+
"Value lock_object_storage_task_distribution_ms is too big: {}, allowed maximum is {}",
399+
lock_object_storage_task_distribution_ms,
400+
lock_object_storage_task_distribution_ms_max
401+
);
402+
403+
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
404+
iterator,
405+
ids_of_hosts,
406+
lock_object_storage_task_distribution_ms);
390407

391408
auto callback = std::make_shared<TaskIterator>(
392409
[task_distributor](size_t number_of_current_replica) mutable -> String {

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <QueryPipeline/QueryPipelineBuilder.h>
2020
#include <Storages/Cache/SchemaCache.h>
2121
#include <Storages/ObjectStorage/StorageObjectStorage.h>
22+
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
2223
#include <Storages/ObjectStorage/DataLakes/DeltaLake/ObjectInfoWithPartitionColumns.h>
2324
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
2425
#include <Storages/VirtualColumnUtils.h>
@@ -430,16 +431,36 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
430431
ObjectInfoPtr object_info;
431432
auto query_settings = configuration->getQuerySettings(context_);
432433

434+
bool not_a_path = false;
435+
433436
do
434437
{
438+
not_a_path = false;
435439
object_info = file_iterator->next(processor);
436440

437-
if (!object_info || object_info->getPath().empty())
441+
if (!object_info)
442+
return {};
443+
444+
if (object_info->getCommand().is_parsed())
445+
{
446+
auto retry_after_us = object_info->getCommand().get_retry_after_us();
447+
if (retry_after_us.has_value())
448+
{
449+
not_a_path = true;
450+
/// TODO: Make asyncronous waiting without sleep in thread
451+
/// Now this sleep is on executor node in worker thread
452+
/// Does not block query initiator
453+
sleepForMicroseconds(std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value()));
454+
continue;
455+
}
456+
}
457+
458+
if (object_info->getPath().empty())
438459
return {};
439460

440461
object_info->loadMetadata(object_storage);
441462
}
442-
while (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0);
463+
while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0));
443464

444465
QueryPipelineBuilder builder;
445466
std::shared_ptr<ISource> source;

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ namespace DB
88

99
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
1010
std::shared_ptr<IObjectIterator> iterator_,
11-
std::vector<std::string> ids_of_nodes_)
11+
std::vector<std::string> ids_of_nodes_,
12+
uint64_t lock_object_storage_task_distribution_ms_)
1213
: iterator(std::move(iterator_))
1314
, connection_to_files(ids_of_nodes_.size())
1415
, ids_of_nodes(ids_of_nodes_)
16+
, lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000)
1517
, iterator_exhausted(false)
1618
{
1719
}
@@ -24,6 +26,8 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
2426
number_of_current_replica
2527
);
2628

29+
saveLastNodeActivity(number_of_current_replica);
30+
2731
// 1. Check pre-queued files first
2832
if (auto file = getPreQueuedFile(number_of_current_replica))
2933
return file;
@@ -148,7 +152,7 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile
148152
// Queue file for its assigned replica
149153
{
150154
std::lock_guard lock(mutex);
151-
unprocessed_files.insert(file_path);
155+
unprocessed_files[file_path] = number_of_current_replica;
152156
connection_to_files[file_replica_idx].push_back(file_path);
153157
}
154158
}
@@ -158,25 +162,64 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFile
158162

159163
std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica)
160164
{
165+
/// Limit time of node activity to keep task in queue
166+
Poco::Timestamp activity_limit;
167+
Poco::Timestamp oldest_activity;
168+
if (lock_object_storage_task_distribution_us > 0)
169+
activity_limit -= lock_object_storage_task_distribution_us;
170+
161171
std::lock_guard lock(mutex);
162172

163173
if (!unprocessed_files.empty())
164174
{
165175
auto it = unprocessed_files.begin();
166-
String next_file = *it;
167-
unprocessed_files.erase(it);
176+
177+
while (it != unprocessed_files.end())
178+
{
179+
auto last_activity = last_node_activity.find(it->second);
180+
if (lock_object_storage_task_distribution_us <= 0
181+
|| last_activity == last_node_activity.end()
182+
|| activity_limit > last_activity->second)
183+
{
184+
String next_file = it->first;
185+
unprocessed_files.erase(it);
186+
187+
LOG_TRACE(
188+
log,
189+
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
190+
next_file,
191+
number_of_current_replica
192+
);
193+
194+
return next_file;
195+
}
196+
197+
oldest_activity = std::min(oldest_activity, last_activity->second);
198+
++it;
199+
}
168200

169201
LOG_TRACE(
170202
log,
171-
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
172-
next_file,
173-
number_of_current_replica
203+
"No unprocessed file for replica {}, need to retry after {} us",
204+
number_of_current_replica,
205+
oldest_activity - activity_limit
174206
);
175207

176-
return next_file;
208+
/// All unprocessed files owned by alive replicas with recenlty activity
209+
/// Need to retry after (oldest_activity - activity_limit) microseconds
210+
RelativePathWithMetadata::CommandInTaskResponse response;
211+
response.set_retry_after_us(oldest_activity - activity_limit);
212+
return response.to_string();
177213
}
178214

179215
return std::nullopt;
180216
}
181217

218+
void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t number_of_current_replica)
219+
{
220+
Poco::Timestamp now;
221+
std::lock_guard lock(mutex);
222+
last_node_activity[number_of_current_replica] = now;
223+
}
224+
182225
}

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@
55
#include <Interpreters/Cluster.h>
66
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
77
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
8+
9+
#include <Poco/Timestamp.h>
10+
811
#include <unordered_set>
12+
#include <unordered_map>
913
#include <vector>
1014
#include <mutex>
1115
#include <memory>
@@ -18,7 +22,8 @@ class StorageObjectStorageStableTaskDistributor
1822
public:
1923
StorageObjectStorageStableTaskDistributor(
2024
std::shared_ptr<IObjectIterator> iterator_,
21-
std::vector<std::string> ids_of_nodes_);
25+
std::vector<std::string> ids_of_nodes_,
26+
uint64_t lock_object_storage_task_distribution_ms_);
2227

2328
std::optional<String> getNextTask(size_t number_of_current_replica);
2429

@@ -28,12 +33,17 @@ class StorageObjectStorageStableTaskDistributor
2833
std::optional<String> getMatchingFileFromIterator(size_t number_of_current_replica);
2934
std::optional<String> getAnyUnprocessedFile(size_t number_of_current_replica);
3035

36+
void saveLastNodeActivity(size_t number_of_current_replica);
37+
3138
std::shared_ptr<IObjectIterator> iterator;
3239

3340
std::vector<std::vector<String>> connection_to_files;
34-
std::unordered_set<String> unprocessed_files;
41+
/// Map of unprocessed files in format filename => number of prefetched replica
42+
std::unordered_map<String, size_t> unprocessed_files;
3543

3644
std::vector<std::string> ids_of_nodes;
45+
std::unordered_map<size_t, Poco::Timestamp> last_node_activity;
46+
Poco::Timestamp::TimeDiff lock_object_storage_task_distribution_us;
3747

3848
std::mutex mutex;
3949
bool iterator_exhausted = false;

0 commit comments

Comments
 (0)