Skip to content

Commit 9e1f968

Browse files
disagg: honor pager early-stop on truncated list (#10762) (#10766)
close #10761 fix(s3): honor pager early-stop on truncated list Treat pager `more=false` as an explicit termination condition for both `listPrefix` and `listPrefixWithDelimiter`, so truncated listings do not re-fetch the same page indefinitely. Add a regression unit test that creates a multi-page prefix and requests early stop on the first callback; verify listing exits immediately. Signed-off-by: JaySon-Huang <tshent@qq.com> Co-authored-by: JaySon-Huang <tshent@qq.com>
1 parent 53815c1 commit 9e1f968

File tree

3 files changed

+139
-19
lines changed

3 files changed

+139
-19
lines changed

dbms/src/Storages/S3/MockS3Client.cpp

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ Model::DeleteObjectOutcome MockS3Client::DeleteObject(const Model::DeleteObjectR
195195

196196
Model::ListObjectsV2Outcome MockS3Client::ListObjectsV2(const Model::ListObjectsV2Request & request) const
197197
{
198+
constexpr int default_max_keys = 1000;
199+
198200
std::lock_guard lock(mtx);
199201
auto itr = storage.find(request.GetBucket());
200202
if (itr == storage.end())
@@ -203,25 +205,60 @@ Model::ListObjectsV2Outcome MockS3Client::ListObjectsV2(const Model::ListObjects
203205
}
204206
const auto & bucket_storage = itr->second;
205207
Model::ListObjectsV2Result result;
206-
RUNTIME_CHECK(!request.DelimiterHasBeenSet() || request.GetDelimiter() == "/", request.GetDelimiter());
208+
if (request.DelimiterHasBeenSet())
209+
RUNTIME_CHECK(request.GetDelimiter() == "/", request.GetDelimiter());
210+
211+
const auto max_keys = request.MaxKeysHasBeenSet() ? request.GetMaxKeys() : default_max_keys;
212+
RUNTIME_CHECK(max_keys > 0, max_keys);
213+
const auto continuation_token = request.ContinuationTokenHasBeenSet() ? request.GetContinuationToken() : String{};
214+
215+
auto finalize_page = [&](const auto & sorted_keys, auto append_result) {
216+
auto begin = sorted_keys.begin();
217+
if (!continuation_token.empty())
218+
begin = sorted_keys.lower_bound(continuation_token);
219+
220+
int key_count = 0;
221+
for (auto iter = begin; iter != sorted_keys.end() && key_count < max_keys; ++iter, ++key_count)
222+
append_result(*iter);
223+
224+
result.SetKeyCount(key_count);
225+
226+
auto next_iter = begin;
227+
for (int i = 0; i < key_count && next_iter != sorted_keys.end(); ++i)
228+
++next_iter;
229+
if (next_iter != sorted_keys.end())
230+
{
231+
result.SetIsTruncated(true);
232+
result.SetNextContinuationToken(*next_iter);
233+
}
234+
else
235+
{
236+
result.SetIsTruncated(false);
237+
}
238+
};
207239

208240
auto normalized_prefix = normalizedKey(request.GetPrefix());
209241
if (!request.DelimiterHasBeenSet())
210242
{
243+
std::set<String> matched_keys;
211244
for (auto itr_obj = bucket_storage.lower_bound(normalized_prefix); itr_obj != bucket_storage.end(); ++itr_obj)
212245
{
213246
if (startsWith(itr_obj->first, normalized_prefix))
214247
{
215-
Model::Object obj;
216-
obj.SetKey(itr_obj->first);
217-
obj.SetSize(itr_obj->second.size());
218-
result.AddContents(std::move(obj));
248+
matched_keys.insert(itr_obj->first);
219249
}
220250
else
221251
{
222252
break;
223253
}
224254
}
255+
256+
finalize_page(matched_keys, [&](const auto & key) {
257+
Model::Object obj;
258+
obj.SetKey(key);
259+
obj.SetSize(bucket_storage.at(key).size());
260+
result.AddContents(std::move(obj));
261+
});
225262
}
226263
else
227264
{
@@ -237,10 +274,9 @@ Model::ListObjectsV2Outcome MockS3Client::ListObjectsV2(const Model::ListObjects
237274
continue;
238275
common_prefix.insert(key.substr(0, pos + delimiter.size()));
239276
}
240-
for (const auto & p : common_prefix)
241-
{
242-
result.AddCommonPrefixes(Model::CommonPrefix().WithPrefix(p));
243-
}
277+
finalize_page(common_prefix, [&](const auto & key) {
278+
result.AddCommonPrefixes(Model::CommonPrefix().WithPrefix(key));
279+
});
244280
}
245281
return result;
246282
}

dbms/src/Storages/S3/S3Common.cpp

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -856,30 +856,35 @@ void listPrefix(
856856
}
857857
GET_METRIC(tiflash_storage_s3_request_seconds, type_list_objects).Observe(sw_list.elapsedSeconds());
858858

859-
PageResult page_res{};
859+
bool should_continue = true;
860860
const auto & result = outcome.GetResult();
861861
auto page_keys = result.GetContents().size();
862862
num_keys += page_keys;
863863
for (const auto & object : result.GetContents())
864864
{
865865
if (!need_cut)
866866
{
867-
page_res = pager(object);
867+
should_continue = pager(object).more;
868868
}
869869
else
870870
{
871871
// Copy the `Object` to cut off the `root` from key, the cost should be acceptable :(
872872
auto object_without_root = object;
873873
object_without_root.SetKey(object.GetKey().substr(cut_size, object.GetKey().size()));
874-
page_res = pager(object_without_root);
874+
should_continue = pager(object_without_root).more;
875875
}
876-
if (!page_res.more)
876+
if (!should_continue)
877877
break;
878878
}
879879

880+
if (!should_continue)
881+
{
882+
break;
883+
}
884+
880885
// handle the result size over max size
881886
done = !result.GetIsTruncated();
882-
if (!done && page_res.more)
887+
if (!done)
883888
{
884889
const auto & next_token = result.GetNextContinuationToken();
885890
req.SetContinuationToken(next_token);
@@ -937,30 +942,35 @@ void listPrefixWithDelimiter(
937942
}
938943
GET_METRIC(tiflash_storage_s3_request_seconds, type_list_objects).Observe(sw_list.elapsedSeconds());
939944

940-
PageResult page_res{};
945+
bool should_continue = true;
941946
const auto & result = outcome.GetResult();
942947
auto page_keys = result.GetCommonPrefixes().size();
943948
num_keys += page_keys;
944949
for (const auto & prefix : result.GetCommonPrefixes())
945950
{
946951
if (!need_cut)
947952
{
948-
page_res = pager(prefix);
953+
should_continue = pager(prefix).more;
949954
}
950955
else
951956
{
952957
// Copy the `CommonPrefix` to cut off the `root`, the cost should be acceptable :(
953958
auto prefix_without_root = prefix;
954959
prefix_without_root.SetPrefix(prefix.GetPrefix().substr(cut_size, prefix.GetPrefix().size()));
955-
page_res = pager(prefix_without_root);
960+
should_continue = pager(prefix_without_root).more;
956961
}
957-
if (!page_res.more)
962+
if (!should_continue)
958963
break;
959964
}
960965

966+
if (!should_continue)
967+
{
968+
break;
969+
}
970+
961971
// handle the result size over max size
962972
done = !result.GetIsTruncated();
963-
if (!done && page_res.more)
973+
if (!done)
964974
{
965975
const auto & next_token = result.GetNextContinuationToken();
966976
req.SetContinuationToken(next_token);

dbms/src/Storages/S3/tests/gtest_s3client.cpp

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,4 +252,78 @@ try
252252
}
253253
CATCH
254254

255+
TEST_F(S3ClientTest, ListPrefixEarlyStopOnTruncatedResult)
256+
try
257+
{
258+
// Keep key count above S3's default one-page listing size so listing is truncated.
259+
constexpr size_t key_count = 1001;
260+
SCOPE_EXIT({
261+
for (size_t i = 0; i < key_count; ++i)
262+
deleteObject(*client, fmt::format("s999/list_prefix_early_stop/key_{}", i));
263+
});
264+
for (size_t i = 0; i < key_count; ++i)
265+
{
266+
uploadEmptyFile(*client, fmt::format("s999/list_prefix_early_stop/key_{}", i));
267+
}
268+
269+
Aws::S3::Model::ListObjectsV2Request req;
270+
req.WithBucket(client->bucket()).WithPrefix(client->root() + "s999/list_prefix_early_stop/");
271+
auto outcome = client->ListObjectsV2(req);
272+
ASSERT_TRUE(outcome.IsSuccess());
273+
const auto & result = outcome.GetResult();
274+
ASSERT_TRUE(result.GetIsTruncated());
275+
ASSERT_EQ(result.GetContents().size(), 1000);
276+
ASSERT_FALSE(result.GetNextContinuationToken().empty());
277+
278+
size_t visited = 0;
279+
listPrefix(*client, "s999/list_prefix_early_stop/", [&](const Aws::S3::Model::Object & object) {
280+
UNUSED(object);
281+
++visited;
282+
return PageResult{.num_keys = 1, .more = false};
283+
});
284+
285+
ASSERT_EQ(visited, 1);
286+
}
287+
CATCH
288+
289+
TEST_F(S3ClientTest, ListPrefixWithDelimiterEarlyStopOnTruncatedResult)
290+
try
291+
{
292+
// Keep common prefix count above S3's default one-page listing size so listing is truncated.
293+
constexpr size_t prefix_count = 1001;
294+
SCOPE_EXIT({
295+
for (size_t i = 0; i < prefix_count; ++i)
296+
deleteObject(*client, fmt::format("s999/list_prefix_with_delimiter_early_stop/dir_{}/key", i));
297+
});
298+
for (size_t i = 0; i < prefix_count; ++i)
299+
{
300+
uploadEmptyFile(*client, fmt::format("s999/list_prefix_with_delimiter_early_stop/dir_{}/key", i));
301+
}
302+
303+
Aws::S3::Model::ListObjectsV2Request req;
304+
req.WithBucket(client->bucket())
305+
.WithPrefix(client->root() + "s999/list_prefix_with_delimiter_early_stop/")
306+
.WithDelimiter("/");
307+
auto outcome = client->ListObjectsV2(req);
308+
ASSERT_TRUE(outcome.IsSuccess());
309+
const auto & result = outcome.GetResult();
310+
ASSERT_TRUE(result.GetIsTruncated());
311+
ASSERT_EQ(result.GetCommonPrefixes().size(), 1000);
312+
ASSERT_FALSE(result.GetNextContinuationToken().empty());
313+
314+
size_t visited = 0;
315+
listPrefixWithDelimiter(
316+
*client,
317+
"s999/list_prefix_with_delimiter_early_stop/",
318+
"/",
319+
[&](const Aws::S3::Model::CommonPrefix & prefix) {
320+
UNUSED(prefix);
321+
++visited;
322+
return PageResult{.num_keys = 1, .more = false};
323+
});
324+
325+
ASSERT_EQ(visited, 1);
326+
}
327+
CATCH
328+
255329
} // namespace DB::S3::tests

0 commit comments

Comments
 (0)