Skip to content

Commit 6f6ef4b

Browse files
committed
Fix lock_object_storage_task_distribution_ms with a few files
1 parent eb9bfbe commit 6f6ef4b

File tree

6 files changed

+107
-31
lines changed

6 files changed

+107
-31
lines changed

src/Disks/ObjectStorages/IObjectStorage.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ PathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string
118118
if (!json)
119119
return;
120120

121-
successfully_parsed = true;
121+
is_valid = true;
122122

123123
if (json->has("retry_after_us"))
124124
retry_after_us = json->getValue<size_t>("retry_after_us");
@@ -129,7 +129,7 @@ PathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string
129129
}
130130
}
131131

132-
std::string PathWithMetadata::CommandInTaskResponse::to_string() const
132+
std::string PathWithMetadata::CommandInTaskResponse::toString() const
133133
{
134134
Poco::JSON::Object json;
135135
if (retry_after_us.has_value())

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,15 +121,19 @@ struct PathWithMetadata
121121
CommandInTaskResponse() = default;
122122
explicit CommandInTaskResponse(const std::string & task);
123123

124-
bool is_parsed() const { return successfully_parsed; }
125-
void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
124+
bool isValid() const { return is_valid; }
125+
void setRetryAfterUs(Poco::Timestamp::TimeDiff time_us)
126+
{
127+
retry_after_us = time_us;
128+
is_valid = true;
129+
}
126130

127-
std::string to_string() const;
131+
std::string toString() const;
128132

129-
std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
133+
std::optional<Poco::Timestamp::TimeDiff> getRetryAfterUs() const { return retry_after_us; }
130134

131135
private:
132-
bool successfully_parsed = false;
136+
bool is_valid = false;
133137
std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
134138
};
135139

@@ -158,7 +162,7 @@ struct PathWithMetadata
158162
, absolute_path((absolute_path_.has_value() && !absolute_path_.value().empty()) ? absolute_path_ : std::nullopt)
159163
, object_storage_to_use(object_storage_to_use_)
160164
{
161-
if (command.is_parsed())
165+
if (command.isValid())
162166
relative_path = "";
163167
}
164168

src/Interpreters/ClusterFunctionReadTask.cpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,14 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o
3333

3434
file_meta_info = object->file_meta_info;
3535

36-
const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes];
37-
path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath();
38-
absolute_path = object->getAbsolutePath();
36+
if (object->getCommand().isValid())
37+
path = object->getCommand().toString();
38+
else
39+
{
40+
const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes];
41+
path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath();
42+
absolute_path = object->getAbsolutePath();
43+
}
3944
}
4045

4146
ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(const std::string & path_)

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -498,9 +498,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
498498
if (!object_info)
499499
return {};
500500

501-
if (object_info->getCommand().is_parsed())
501+
if (object_info->getCommand().isValid())
502502
{
503-
auto retry_after_us = object_info->getCommand().get_retry_after_us();
503+
auto retry_after_us = object_info->getCommand().getRetryAfterUs();
504504
if (retry_after_us.has_value())
505505
{
506506
not_a_path = true;

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,8 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(s
250250
/// All unprocessed files owned by alive replicas with recent activity
251251
/// Need to retry after (oldest_activity - activity_limit) microseconds
252252
PathWithMetadata::CommandInTaskResponse response;
253-
response.set_retry_after_us(oldest_activity - activity_limit);
254-
return std::make_shared<ObjectInfo>(response.to_string());
253+
response.setRetryAfterUs(oldest_activity - activity_limit);
254+
return std::make_shared<ObjectInfo>(response.toString());
255255
}
256256

257257
return {};

tests/integration/test_s3_cache_locality/test.py

Lines changed: 83 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ def create_buckets_s3(cluster, files=1000):
2222
s3_data = []
2323

2424
for file_number in range(files):
25-
file_name = f"data/generated/file_{file_number}.csv"
26-
os.makedirs(os.path.join(SCRIPT_DIR, "data/generated/"), exist_ok=True)
25+
file_name = f"data/generated_{files}/file_{file_number}.csv"
26+
os.makedirs(os.path.join(SCRIPT_DIR, f"data/generated_{files}/"), exist_ok=True)
2727
s3_data.append(file_name)
2828
with open(os.path.join(SCRIPT_DIR, file_name), "w+", encoding="utf-8") as f:
2929
# a String, b UInt64
@@ -69,15 +69,17 @@ def started_cluster():
6969
logging.info("Cluster started")
7070

7171
create_buckets_s3(cluster)
72+
create_buckets_s3(cluster, files=3)
7273

7374
yield cluster
7475
finally:
75-
shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated/"), ignore_errors=True)
76+
shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated_1000/"), ignore_errors=True)
77+
shutil.rmtree(os.path.join(SCRIPT_DIR, "data/generated_3/"), ignore_errors=True)
7678
cluster.shutdown()
7779

7880

7981
def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache,
80-
lock_object_storage_task_distribution_ms):
82+
lock_object_storage_task_distribution_ms, files=1000):
8183
for host in list(cluster.instances.values()):
8284
host.query("SYSTEM DROP FILESYSTEM CACHE 'raw_s3_cache'", ignore_error=True)
8385

@@ -92,7 +94,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
9294
result_first = node.query(
9395
f"""
9496
SELECT count(*)
95-
FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
97+
FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated_{files}/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
9698
WHERE b=42
9799
SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())}
98100
""",
@@ -103,7 +105,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
103105
result_second = node.query(
104106
f"""
105107
SELECT count(*)
106-
FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
108+
FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated_{files}/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
107109
WHERE b=42
108110
SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())}
109111
""",
@@ -134,6 +136,40 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
134136
return int(s3_get_first), int(s3_get_second)
135137

136138

139+
def check_s3_gets_by_hosts(cluster, node, expected_result,
140+
lock_object_storage_task_distribution_ms, files=1000):
141+
settings = {
142+
"enable_filesystem_cache": False,
143+
}
144+
145+
settings["lock_object_storage_task_distribution_ms"] = lock_object_storage_task_distribution_ms
146+
query_id = str(uuid.uuid4())
147+
result = node.query(
148+
f"""
149+
SELECT count(*)
150+
FROM s3Cluster('{cluster}', 'http://minio1:9001/root/data/generated_{files}/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
151+
WHERE b=42
152+
SETTINGS {",".join(f"{k}={v}" for k, v in settings.items())}
153+
""",
154+
query_id=query_id,
155+
)
156+
assert result == expected_result
157+
158+
node.query(f"SYSTEM FLUSH LOGS ON CLUSTER {cluster}")
159+
160+
s3_get = node.query(
161+
f"""
162+
SELECT ProfileEvents['S3GetObject']
163+
FROM clusterAllReplicas('{cluster}', system.query_log)
164+
WHERE type='QueryFinish'
165+
AND initial_query_id='{query_id}'
166+
ORDER BY hostname
167+
""",
168+
)
169+
170+
return [int(events) for events in s3_get.strip().split("\n")]
171+
172+
137173
def check_s3_gets_repeat(cluster, node, expected_result, cluster_first, cluster_second, enable_filesystem_cache,
138174
lock_object_storage_task_distribution_ms):
139175
# Repeat test several times to get average result
@@ -154,7 +190,7 @@ def test_cache_locality(started_cluster, lock_object_storage_task_distribution_m
154190
expected_result = node.query(
155191
f"""
156192
SELECT count(*)
157-
FROM s3('http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
193+
FROM s3('http://minio1:9001/root/data/generated_1000/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
158194
WHERE b=42
159195
"""
160196
)
@@ -170,26 +206,57 @@ def test_cache_locality(started_cluster, lock_object_storage_task_distribution_m
170206
(s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_12345', 1, lock_object_storage_task_distribution_ms)
171207
assert s3_get_second <= s3_get_first * dispersion
172208

173-
# Different nodes order
209+
# Different replicas order
174210
(s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_34512', 1, lock_object_storage_task_distribution_ms)
175211
assert s3_get_second <= s3_get_first * dispersion
176212

177-
# No last node
213+
# No last replica
178214
(s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_1234', 1, lock_object_storage_task_distribution_ms)
179-
assert s3_get_second <= s3_get_first * (0.211 + dispersion) # actual value - 24 for 100 files, 211 for 1000
215+
assert s3_get_second <= s3_get_first * (0.179 + dispersion) # actual value - 179 of 1000 files changed replica
180216

181-
# No first node
217+
# No first replica
182218
(s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_2345', 1, lock_object_storage_task_distribution_ms)
183-
assert s3_get_second <= s3_get_first * (0.189 + dispersion) # actual value - 12 for 100 files, 189 for 1000
219+
assert s3_get_second <= s3_get_first * (0.189 + dispersion) # actual value - 189 of 1000 files changed replica
184220

185-
# No first node, different nodes order
221+
# No first replica, different replicas order
186222
(s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_12345', 'cluster_4523', 1, lock_object_storage_task_distribution_ms)
187223
assert s3_get_second <= s3_get_first * (0.189 + dispersion)
188224

189-
# Add new node, different nodes order
225+
# Add new replica, different replicas order
190226
(s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_4523', 'cluster_12345', 1, lock_object_storage_task_distribution_ms)
191227
assert s3_get_second <= s3_get_first * (0.189 + dispersion)
192228

193-
# New node and old node, different nodes order
229+
# New replica and old replica, different replicas order
230+
# All files from the removed replica changed replica
231+
# Some files from the existing replicas moved to the new replica
194232
(s3_get_first, s3_get_second) = check_s3_gets_repeat(started_cluster, node, expected_result, 'cluster_1234', 'cluster_4523', 1, lock_object_storage_task_distribution_ms)
195-
assert s3_get_second <= s3_get_first * (0.400 + dispersion) # actual value - 36 for 100 files, 400 for 1000
233+
assert s3_get_second <= s3_get_first * (0.368 + dispersion) # actual value - 368 of 1000 changed replica
234+
235+
if (lock_object_storage_task_distribution_ms > 0):
236+
s3_get = check_s3_gets_by_hosts('cluster_12345', node, expected_result, lock_object_storage_task_distribution_ms, files=1000)
237+
assert s3_get == [189,210,220,202,179]
238+
s3_get = check_s3_gets_by_hosts('cluster_1234', node, expected_result, lock_object_storage_task_distribution_ms, files=1000)
239+
assert s3_get == [247,243,264,246]
240+
s3_get = check_s3_gets_by_hosts('cluster_2345', node, expected_result, lock_object_storage_task_distribution_ms, files=1000)
241+
assert s3_get == [251,280,248,221]
242+
243+
244+
def test_cache_locality_few_files(started_cluster):
245+
node = started_cluster.instances["clickhouse0"]
246+
247+
expected_result = node.query(
248+
f"""
249+
SELECT count(*)
250+
FROM s3('http://minio1:9001/root/data/generated_3/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
251+
WHERE b=42
252+
"""
253+
)
254+
255+
# Rendezvous hash makes the next distribution:
256+
# file_0 - clickhouse1
257+
# file_1 - clickhouse4
258+
# file_2 - clickhouse3
259+
# The same distribution must be in each query
260+
for _ in range(10):
261+
s3_get = check_s3_gets_by_hosts('cluster_12345', node, expected_result, lock_object_storage_task_distribution_ms=30000, files=3)
262+
assert s3_get == [1,0,1,1,0]

0 commit comments

Comments
 (0)