Skip to content

Commit 3f43e88

Browse files
authored
Merge branch 'antalya-25.3' into backports/25.3/82114_fix_glue_nested_types
2 parents 0ebcdb7 + 70ffe93 commit 3f43e88

File tree

20 files changed

+358
-72
lines changed

20 files changed

+358
-72
lines changed

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6141,6 +6141,9 @@ Cache the list of objects returned by list objects calls in object storage
61416141
)", EXPERIMENTAL) \
61426142
DECLARE(Bool, object_storage_remote_initiator, false, R"(
61436143
Execute request to object storage as remote on one of object_storage_cluster nodes.
6144+
)", EXPERIMENTAL) \
6145+
DECLARE(UInt64, lock_object_storage_task_distribution_ms, 0, R"(
6146+
In object storage distribution queries do not distibute tasks on non-prefetched nodes until prefetched node is active.
61446147
)", EXPERIMENTAL) \
61456148
\
61466149

src/Core/SettingsChangesHistory.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
6666
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
6767
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
6868
/// Note: please check if the key already exists to prevent duplicate entries.
69+
addSettingsChanges(settings_changes_history, "25.3.3.20000",
70+
{
71+
// Altinity Antalya modifications atop of 25.3
72+
{"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
73+
});
6974
addSettingsChanges(settings_changes_history, "25.2.1.20000",
7075
{
7176
// Altinity Antalya modifications atop of 25.2

src/Databases/DataLake/ICatalog.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,19 @@ void TableMetadata::setLocation(const std::string & location_)
6767
auto pos_to_path = location_.substr(pos_to_bucket).find('/');
6868

6969
if (pos_to_path == std::string::npos)
70-
throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Unexpected location format: {}", location_);
71-
72-
pos_to_path = pos_to_bucket + pos_to_path;
70+
{ // empty path
71+
location_without_path = location_;
72+
path.clear();
73+
bucket = location_.substr(pos_to_bucket);
74+
}
75+
else
76+
{
77+
pos_to_path = pos_to_bucket + pos_to_path;
7378

74-
location_without_path = location_.substr(0, pos_to_path);
75-
path = location_.substr(pos_to_path + 1);
76-
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
79+
location_without_path = location_.substr(0, pos_to_path);
80+
path = location_.substr(pos_to_path + 1);
81+
bucket = location_.substr(pos_to_bucket, pos_to_path - pos_to_bucket);
82+
}
7783

7884
LOG_TEST(getLogger("TableMetadata"),
7985
"Parsed location without path: {}, path: {}",

src/Disks/ObjectStorages/IObjectStorage.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
#include <Common/Exception.h>
99
#include <Common/ObjectStorageKeyGenerator.h>
1010

11+
#include <Poco/JSON/Object.h>
12+
#include <Poco/JSON/Parser.h>
13+
#include <Poco/JSON/JSONException.h>
14+
1115

1216
namespace DB
1317
{
@@ -107,4 +111,36 @@ void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage)
107111
}
108112
}
109113

114+
RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std::string & task)
115+
{
116+
Poco::JSON::Parser parser;
117+
try
118+
{
119+
auto json = parser.parse(task).extract<Poco::JSON::Object::Ptr>();
120+
if (!json)
121+
return;
122+
123+
successfully_parsed = true;
124+
125+
if (json->has("retry_after_us"))
126+
retry_after_us = json->getValue<size_t>("retry_after_us");
127+
}
128+
catch (const Poco::JSON::JSONException &)
129+
{ /// Not a JSON
130+
return;
131+
}
132+
}
133+
134+
std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
135+
{
136+
Poco::JSON::Object json;
137+
if (retry_after_us.has_value())
138+
json.set("retry_after_us", retry_after_us.value());
139+
140+
std::ostringstream oss;
141+
oss.exceptions(std::ios::failbit);
142+
Poco::JSON::Stringifier::stringify(json, oss);
143+
return oss.str();
144+
}
145+
110146
}

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,37 @@ struct ObjectMetadata
6666

6767
struct RelativePathWithMetadata
6868
{
69+
class CommandInTaskResponse
70+
{
71+
public:
72+
CommandInTaskResponse() {}
73+
CommandInTaskResponse(const std::string & task);
74+
75+
bool is_parsed() const { return successfully_parsed; }
76+
void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
77+
78+
std::string to_string() const;
79+
80+
std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
81+
82+
private:
83+
bool successfully_parsed = false;
84+
std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
85+
};
86+
6987
String relative_path;
7088
std::optional<ObjectMetadata> metadata;
89+
CommandInTaskResponse command;
7190

7291
RelativePathWithMetadata() = default;
7392

74-
explicit RelativePathWithMetadata(String relative_path_, std::optional<ObjectMetadata> metadata_ = std::nullopt)
75-
: relative_path(std::move(relative_path_))
76-
, metadata(std::move(metadata_))
77-
{}
93+
explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
94+
: metadata(std::move(metadata_))
95+
, command(task_string)
96+
{
97+
if (!command.is_parsed())
98+
relative_path = task_string;
99+
}
78100

79101
virtual ~RelativePathWithMetadata() = default;
80102

@@ -85,6 +107,8 @@ struct RelativePathWithMetadata
85107
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
86108

87109
void loadMetadata(ObjectStoragePtr object_storage);
110+
111+
const CommandInTaskResponse & getCommand() const { return command; }
88112
};
89113

90114
struct ObjectKeyWithMetadata

src/IO/S3/Client.cpp

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ Model::HeadObjectOutcome Client::HeadObject(HeadObjectRequest & request) const
373373
auto bucket_uri = getURIForBucket(bucket);
374374
if (!bucket_uri)
375375
{
376-
if (auto maybe_error = updateURIForBucketForHead(bucket); maybe_error.has_value())
376+
if (auto maybe_error = updateURIForBucketForHead(bucket, request.GetKey()); maybe_error.has_value())
377377
return *maybe_error;
378378

379379
if (auto region = getRegionForBucket(bucket); !region.empty())
@@ -578,7 +578,6 @@ Client::doRequest(RequestType & request, RequestFn request_fn) const
578578
if (auto uri = getURIForBucket(bucket); uri.has_value())
579579
request.overrideURI(std::move(*uri));
580580

581-
582581
bool found_new_endpoint = false;
583582
// if we found correct endpoint after 301 responses, update the cache for future requests
584583
SCOPE_EXIT(
@@ -813,12 +812,15 @@ std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) c
813812
}
814813

815814
// Do a list request because head requests don't have body in response
816-
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket) const
815+
// S3 Tables don't support ListObjects, so made dirty workaroung - changed on GetObject
816+
std::optional<Aws::S3::S3Error> Client::updateURIForBucketForHead(const std::string & bucket, const std::string & key) const
817817
{
818-
ListObjectsV2Request req;
818+
GetObjectRequest req;
819819
req.SetBucket(bucket);
820-
req.SetMaxKeys(1);
821-
auto result = ListObjectsV2(req);
820+
req.SetKey(key);
821+
req.SetRange("bytes=0-1");
822+
auto result = GetObject(req);
823+
822824
if (result.IsSuccess())
823825
return std::nullopt;
824826
return result.GetError();

src/IO/S3/Client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ class Client : private Aws::S3::S3Client
269269

270270
void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
271271
std::optional<S3::URI> getURIFromError(const Aws::S3::S3Error & error) const;
272-
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket) const;
272+
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket, const std::string & key) const;
273273

274274
std::optional<S3::URI> getURIForBucket(const std::string & bucket) const;
275275

src/IO/S3/URI.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,72 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
157157
}
158158
}
159159

160+
bool URI::isAWSRegion(std::string_view region)
161+
{
162+
/// List from https://docs.aws.amazon.com/general/latest/gr/s3.html
163+
static const std::unordered_set<std::string_view> regions = {
164+
"us-east-2",
165+
"us-east-1",
166+
"us-west-1",
167+
"us-west-2",
168+
"af-south-1",
169+
"ap-east-1",
170+
"ap-south-2",
171+
"ap-southeast-3",
172+
"ap-southeast-5",
173+
"ap-southeast-4",
174+
"ap-south-1",
175+
"ap-northeast-3",
176+
"ap-northeast-2",
177+
"ap-southeast-1",
178+
"ap-southeast-2",
179+
"ap-east-2",
180+
"ap-southeast-7",
181+
"ap-northeast-1",
182+
"ca-central-1",
183+
"ca-west-1",
184+
"eu-central-1",
185+
"eu-west-1",
186+
"eu-west-2",
187+
"eu-south-1",
188+
"eu-west-3",
189+
"eu-south-2",
190+
"eu-north-1",
191+
"eu-central-2",
192+
"il-central-1",
193+
"mx-central-1",
194+
"me-south-1",
195+
"me-central-1",
196+
"sa-east-1",
197+
"us-gov-east-1",
198+
"us-gov-west-1"
199+
};
200+
201+
/// 's3-us-west-2' is a legacy region format for S3 storage, equals to 'us-west-2'
202+
/// See https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#VirtualHostingBackwardsCompatibility
203+
if (region.substr(0, 3) == "s3-")
204+
region = region.substr(3);
205+
206+
return regions.contains(region);
207+
}
208+
160209
void URI::addRegionToURI(const std::string &region)
161210
{
162211
if (auto pos = endpoint.find(".amazonaws.com"); pos != std::string::npos)
212+
{
213+
if (pos > 0)
214+
{ /// Check if region is already in endpoint to avoid add it second time
215+
auto prev_pos = endpoint.find_last_of("/.", pos - 1);
216+
if (prev_pos == std::string::npos)
217+
prev_pos = 0;
218+
else
219+
++prev_pos;
220+
std::string_view endpoint_region = std::string_view(endpoint).substr(prev_pos, pos - prev_pos);
221+
if (isAWSRegion(endpoint_region))
222+
return;
223+
}
163224
endpoint = endpoint.substr(0, pos) + "." + region + endpoint.substr(pos);
225+
}
164226
}
165227

166228
void URI::validateBucket(const String & bucket, const Poco::URI & uri)

src/IO/S3/URI.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ struct URI
4141

4242
static void validateBucket(const std::string & bucket, const Poco::URI & uri);
4343

44+
/// Returns true if 'region' string is an AWS S3 region
45+
/// https://docs.aws.amazon.com/general/latest/gr/s3.html
46+
static bool isAWSRegion(std::string_view region);
47+
4448
private:
4549
std::pair<std::string, std::optional<std::string>> getURIAndArchivePattern(const std::string & source);
4650
};

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ void IcebergMetadata::updateSnapshot()
487487

488488
relevant_snapshot = IcebergSnapshot{
489489
getManifestList(getProperFilePathFromMetadataInfo(
490-
snapshot->getValue<String>(MANIFEST_LIST_PATH_FIELD), configuration_ptr->getPath(), table_location)),
490+
snapshot->getValue<String>(MANIFEST_LIST_PATH_FIELD), configuration_ptr->getPath(), table_location, configuration_ptr->getNamespace())),
491491
relevant_snapshot_id, total_rows, total_bytes};
492492

493493
if (!snapshot->has("schema-id"))
@@ -654,7 +654,7 @@ ManifestListPtr IcebergMetadata::getManifestList(const String & filename) const
654654
for (size_t i = 0; i < manifest_list_deserializer.rows(); ++i)
655655
{
656656
const std::string file_path = manifest_list_deserializer.getValueFromRowByName(i, MANIFEST_FILE_PATH_COLUMN, TypeIndex::String).safeGet<std::string>();
657-
const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPath(), table_location);
657+
const auto manifest_file_name = getProperFilePathFromMetadataInfo(file_path, configuration_ptr->getPath(), table_location, configuration_ptr->getNamespace());
658658
Int64 added_sequence_number = 0;
659659
if (format_version > 1)
660660
added_sequence_number = manifest_list_deserializer.getValueFromRowByName(i, SEQUENCE_NUMBER_COLUMN, TypeIndex::Int64).safeGet<Int64>();
@@ -706,6 +706,7 @@ ManifestFilePtr IcebergMetadata::getManifestFile(const String & filename, Int64
706706
schema_processor,
707707
inherited_sequence_number,
708708
table_location,
709+
configuration_ptr->getNamespace(),
709710
context);
710711
};
711712

0 commit comments

Comments
 (0)