
Commit d1f4b4c
Fix after review
1 parent d851e30 commit d1f4b4c

7 files changed: +109 -84 lines


src/Disks/ObjectStorages/IObjectStorage.cpp

Lines changed: 36 additions & 0 deletions
@@ -8,6 +8,10 @@
 #include <Common/Exception.h>
 #include <Common/ObjectStorageKeyGenerator.h>

+#include <Poco/JSON/Object.h>
+#include <Poco/JSON/Parser.h>
+#include <Poco/JSON/JSONException.h>
+

 namespace DB
 {
@@ -107,4 +111,36 @@ void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage)
 }
 }

+RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
+{
+    Poco::JSON::Parser parser;
+    try
+    {
+        auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
+        if (!json)
+            return;
+
+        successfully_parsed = true;
+
+        if (json->has("retry_after_us"))
+            retry_after_us = json->getValue<size_t>("retry_after_us");
+    }
+    catch (const Poco::JSON::JSONException &)
+    { /// Not a JSON
+        return;
+    }
+}
+
+std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
+{
+    Poco::JSON::Object json;
+    if (retry_after_us.has_value())
+        json.set("retry_after_us", retry_after_us.value());
+
+    std::ostringstream oss;
+    oss.exceptions(std::ios::failbit);
+    Poco::JSON::Stringifier::stringify(json, oss);
+    return oss.str();
+}
+
 }
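
For orientation, a minimal sketch of how the new class is meant to be used (illustrative only, not part of the commit; the function name is made up, and it assumes the ClickHouse headers above are on the include path):

#include <string>
#include <Disks/ObjectStorages/IObjectStorage.h>

/// Illustrative sketch: encode a "retry later" command on one replica, decode it on another.
void commandRoundTripSketch()
{
    DB::RelativePathWithMetadata::CommandInTaskResponse cmd;
    cmd.set_retry_after_us(50000);                  /// ask the worker to retry in 50 ms
    std::string task = cmd.to_string();             /// serialized as {"retry_after_us":50000}

    DB::RelativePathWithMetadata::CommandInTaskResponse parsed(task);
    if (parsed.is_parsed())                         /// true: the string is a JSON command, not an object path
    {
        auto delay = parsed.get_retry_after_us();   /// 50000 microseconds
        (void)delay;                                /// the caller would sleep for this long and retry
    }
}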

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 28 additions & 4 deletions
@@ -66,15 +66,37 @@ struct ObjectMetadata

 struct RelativePathWithMetadata
 {
+    class CommandInTaskResponse
+    {
+    public:
+        CommandInTaskResponse() {}
+        CommandInTaskResponse(const std::string & task);
+
+        bool is_parsed() const { return successfully_parsed; }
+        void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
+
+        std::string to_string() const;
+
+        std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
+
+    private:
+        bool successfully_parsed = false;
+        std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
+    };
+
     String relative_path;
     std::optional<ObjectMetadata> metadata;
+    CommandInTaskResponse command;

     RelativePathWithMetadata() = default;

-    explicit RelativePathWithMetadata(String relative_path_, std::optional<ObjectMetadata> metadata_ = std::nullopt)
-        : relative_path(std::move(relative_path_))
-        , metadata(std::move(metadata_))
-    {}
+    explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
+        : metadata(std::move(metadata_))
+        , command(task_string)
+    {
+        if (!command.is_parsed())
+            relative_path = task_string;
+    }

     virtual ~RelativePathWithMetadata() = default;

@@ -85,6 +107,8 @@ struct RelativePathWithMetadata
     virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }

     void loadMetadata(ObjectStoragePtr object_storage);
+
+    const CommandInTaskResponse & getCommand() const { return command; }
 };

 struct ObjectKeyWithMetadata
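
To make the constructor change concrete, a hypothetical snippet (not part of the commit, assuming the header above): the single string argument is now treated either as a plain object path or as a serialized command, and getCommand() tells the caller which case applies.

#include <Disks/ObjectStorages/IObjectStorage.h>

/// Illustrative sketch: the same constructor handles both kinds of task strings.
void pathOrCommandSketch()
{
    DB::RelativePathWithMetadata as_path("data/part-000.parquet");
    /// as_path.getCommand().is_parsed() == false, as_path.relative_path == "data/part-000.parquet"

    DB::RelativePathWithMetadata as_command("{\"retry_after_us\":30000}");
    /// as_command.getCommand().is_parsed() == true, as_command.relative_path stays empty
}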

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 14 additions & 1 deletion
@@ -35,6 +35,7 @@ namespace ErrorCodes
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_FUNCTION;
    extern const int NOT_IMPLEMENTED;
+    extern const int INVALID_SETTING_VALUE;
 }


@@ -387,10 +388,22 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
        }
    }

+    uint64_t lock_object_storage_task_distribution_ms = local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms];
+
+    /// Check value to avoid negative result after conversion in microseconds.
+    /// Poco::Timestamp::TimeDiff is signed int 64.
+    static const uint64_t lock_object_storage_task_distribution_ms_max = 0x0020000000000000ULL;
+    if (lock_object_storage_task_distribution_ms > lock_object_storage_task_distribution_ms_max)
+        throw Exception(ErrorCodes::INVALID_SETTING_VALUE,
+            "Value lock_object_storage_task_distribution_ms is too big: {}, allowed maximum is {}",
+            lock_object_storage_task_distribution_ms,
+            lock_object_storage_task_distribution_ms_max
+        );
+
    auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
        iterator,
        ids_of_hosts,
-        local_context->getSettingsRef()[Setting::lock_object_storage_task_distribution_ms]);
+        lock_object_storage_task_distribution_ms);

    auto callback = std::make_shared<TaskIterator>(
        [task_distributor](size_t number_of_current_replica) mutable -> String {
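
As a note on the chosen maximum: 0x0020000000000000 is 2^53 milliseconds, so even after the conversion to microseconds (a factor of 1000) the value still fits into the signed 64-bit Poco::Timestamp::TimeDiff. A standalone sanity check (illustrative, not part of the commit):

#include <cstdint>
#include <limits>

/// 2^53 ms, the maximum accepted by the setting check above.
static constexpr uint64_t max_lock_ms = 0x0020000000000000ULL;

/// Converting milliseconds to microseconds must not overflow a signed 64-bit value.
static_assert(max_lock_ms * 1000ULL <= static_cast<uint64_t>(std::numeric_limits<int64_t>::max()),
              "lock_object_storage_task_distribution_ms bound keeps the microsecond value in int64 range");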

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 9 additions & 5 deletions
@@ -438,22 +438,26 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
            not_a_path = false;
            object_info = file_iterator->next(processor);

-            if (!object_info || object_info->getPath().empty())
+            if (!object_info)
                return {};

-            StorageObjectStorageStableTaskDistributor::CommandInTaskResponse command(object_info->getPath());
-            if (command.is_parsed())
+            if (object_info->getCommand().is_parsed())
            {
-                auto retry_after_us = command.get_retry_after_us();
+                auto retry_after_us = object_info->getCommand().get_retry_after_us();
                if (retry_after_us.has_value())
                {
                    not_a_path = true;
                    /// TODO: Make asyncronous waiting without sleep in thread
-                    sleepForMicroseconds(std::min(100000ul, retry_after_us.value()));
+                    /// Now this sleep is on executor node in worker thread
+                    /// Does not block query initiator
+                    sleepForMicroseconds(std::min(Poco::Timestamp::TimeDiff(100000ul), retry_after_us.value()));
                    continue;
                }
            }

+            if (object_info->getPath().empty())
+                return {};
+
            object_info->loadMetadata(object_storage);
        }
        while (not_a_path || (query_settings.skip_empty_files && object_info->metadata->size_bytes == 0));
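
The retry path caps each wait at 100 ms: the worker sleeps for at most that long, then re-polls the iterator and sleeps again if the task is still locked by another replica. A minimal sketch of just the clamping (illustrative helper, not part of the commit):

#include <algorithm>
#include <Poco/Timestamp.h>

/// Cap a suggested retry delay (in microseconds) at 100 ms per loop iteration.
Poco::Timestamp::TimeDiff clampRetryDelay(Poco::Timestamp::TimeDiff retry_after_us)
{
    return std::min(Poco::Timestamp::TimeDiff(100000), retry_after_us);
}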

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 3 additions & 39 deletions
@@ -3,10 +3,6 @@
 #include <consistent_hashing.h>
 #include <optional>

-#include <Poco/JSON/Object.h>
-#include <Poco/JSON/Parser.h>
-#include <Poco/JSON/JSONException.h>
-
 namespace DB
 {

@@ -169,7 +165,7 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocess
    /// Limit time of node activity to keep task in queue
    Poco::Timestamp activity_limit;
    Poco::Timestamp oldest_activity;
-    if (lock_object_storage_task_distribution_us)
+    if (lock_object_storage_task_distribution_us > 0)
        activity_limit -= lock_object_storage_task_distribution_us;

    std::lock_guard lock(mutex);
@@ -181,7 +177,7 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocess
    while (it != unprocessed_files.end())
    {
        auto last_activity = last_node_activity.find(it->second);
-        if (!lock_object_storage_task_distribution_us
+        if (lock_object_storage_task_distribution_us <= 0
            || last_activity == last_node_activity.end()
            || activity_limit > last_activity->second)
        {
@@ -211,7 +207,7 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocess

    /// All unprocessed files owned by alive replicas with recenlty activity
    /// Need to retry after (oldest_activity - activity_limit) microseconds
-    CommandInTaskResponse response;
+    RelativePathWithMetadata::CommandInTaskResponse response;
    response.set_retry_after_us(oldest_activity - activity_limit);
    return response.to_string();
 }
@@ -226,36 +222,4 @@ void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t numb
    last_node_activity[number_of_current_replica] = now;
 }

-StorageObjectStorageStableTaskDistributor::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
-{
-    Poco::JSON::Parser parser;
-    try
-    {
-        auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
-        if (!json)
-            return;
-
-        successfully_parsed = true;
-
-        if (json->has("retry_after_us"))
-            retry_after_us = json->getValue<size_t>("retry_after_us");
-    }
-    catch (const Poco::JSON::JSONException &)
-    { /// Not a JSON
-        return;
-    }
-}
-
-std::string StorageObjectStorageStableTaskDistributor::CommandInTaskResponse::to_string() const
-{
-    Poco::JSON::Object json;
-    if (retry_after_us.has_value())
-        json.set("retry_after_us", retry_after_us.value());
-
-    std::ostringstream oss;
-    oss.exceptions(std::ios::failbit);
-    Poco::JSON::Stringifier::stringify(json, oss);
-    return oss.str();
-}
-
 }
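
For context on the changed comparisons: a replica keeps ownership of its unprocessed files while its last reported activity is newer than now minus lock_object_storage_task_distribution_us; only after that window may a file be handed to another replica. A small illustration of that predicate (hypothetical helper, not from the commit):

#include <Poco/Timestamp.h>

/// Illustrative sketch: a node still "owns" its files while its last activity
/// falls inside the lock window; a non-positive lock disables ownership entirely.
bool nodeStillOwnsFiles(Poco::Timestamp last_activity, Poco::Timestamp::TimeDiff lock_us)
{
    Poco::Timestamp activity_limit;   /// default-constructed as the current time
    activity_limit -= lock_us;
    return lock_us > 0 && last_activity > activity_limit;
}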

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h

Lines changed: 0 additions & 18 deletions
@@ -20,24 +20,6 @@ namespace DB
 class StorageObjectStorageStableTaskDistributor
 {
 public:
-    class CommandInTaskResponse
-    {
-    public:
-        CommandInTaskResponse() {}
-        CommandInTaskResponse(const std::string & task);
-
-        bool is_parsed() const { return successfully_parsed; }
-        void set_retry_after_us(uint64_t time_us) { retry_after_us = time_us; }
-
-        std::string to_string() const;
-
-        std::optional<uint64_t> get_retry_after_us() const { return retry_after_us; }
-
-    private:
-        bool successfully_parsed = false;
-        std::optional<uint64_t> retry_after_us;
-    };
-
     StorageObjectStorageStableTaskDistributor(
        std::shared_ptr<IObjectIterator> iterator_,
        std::vector<std::string> ids_of_nodes_,

tests/integration/test_s3_cache_locality/test.py

Lines changed: 19 additions & 17 deletions
@@ -75,7 +75,8 @@ def started_cluster():
        cluster.shutdown()


-def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock=False):
+def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache,
+                  lock_object_storage_task_distribution_ms):
    for host in list(cluster.instances.values()):
        host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'", ignore_error=True)

@@ -84,8 +85,8 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
        "filesystem_cache_name": "'raw_s3_cache'",
    }

-    if lock:
-        settings["lock_object_storage_task_distribution_ms"] = 30000
+    if lock_object_storage_task_distribution_ms > 0:
+        settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms

    query_id_first = str(uuid.uuid4())
    result_first = node.query(
@@ -133,20 +134,21 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
    return int(s3_get_first), int(s3_get_second)


-def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock=False):
+def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache,
+                         lock_object_storage_task_distribution_ms):
    # Repeat test several times to get average result
-    iterations = 1 if lock else 10
+    iterations = 1 if lock_object_storage_task_distribution_ms > 0 else 10
    s3_get_first_sum = 0
    s3_get_second_sum = 0
    for _ in range(iterations):
-        (s3_get_first, s3_get_second) = check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock)
+        (s3_get_first, s3_get_second) = check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache, lock_object_storage_task_distribution_ms)
        s3_get_first_sum += s3_get_first
        s3_get_second_sum += s3_get_second
    return s3_get_first_sum, s3_get_second_sum


-@pytest.mark.parametrize("lock", [False, True])
-def test_cache_locality(started_cluster, lock):
+@pytest.mark.parametrize("lock_object_storage_task_distribution_ms", [0, 30000])
+def test_cache_locality(started_cluster, lock_object_storage_task_distribution_ms):
    node = started_cluster.instances["clickhouse0"]

    expected_result = node.query(
@@ -158,36 +160,36 @@ def test_cache_locality(started_cluster, lock):
    )

    # Algorithm does not give 100% guarantee, so add 10% on dispersion
-    dispersion = 0.0 if lock else 0.1
+    dispersion = 0.0 if lock_object_storage_task_distribution_ms > 0 else 0.1

    # No cache
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 0, lock)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 0, lock_object_storage_task_distribution_ms)
    assert s3_get_second == s3_get_first

    # With cache
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1, lock)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1, lock_object_storage_task_distribution_ms)
    assert s3_get_second <= s3_get_first * dispersion

    # Different nodes order
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1, lock)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1, lock_object_storage_task_distribution_ms)
    assert s3_get_second <= s3_get_first * dispersion

    # No last node
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1, lock)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1, lock_object_storage_task_distribution_ms)
    assert s3_get_second <= s3_get_first * (0.211 + dispersion) # actual value - 24 for 100 files, 211 for 1000

    # No first node
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1, lock)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1, lock_object_storage_task_distribution_ms)
    assert s3_get_second <= s3_get_first * (0.189 + dispersion) # actual value - 12 for 100 files, 189 for 1000

    # No first node, different nodes order
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1, lock)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1, lock_object_storage_task_distribution_ms)
    assert s3_get_second <= s3_get_first * (0.189 + dispersion)

    # Add new node, different nodes order
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1, lock)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1, lock_object_storage_task_distribution_ms)
    assert s3_get_second <= s3_get_first * (0.189 + dispersion)

    # New node and old node, different nodes order
-    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1, lock)
+    (s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1, lock_object_storage_task_distribution_ms)
    assert s3_get_second <= s3_get_first * (0.400 + dispersion) # actual value - 36 for 100 files, 400 for 1000
