Skip to content

Commit e5ec35d

Browse files
committed
try to fix bugs
1 parent 0c29ed0 commit e5ec35d

File tree

1 file changed

+68
-5
lines changed

1 file changed

+68
-5
lines changed

src/Storages/ObjectStorage/Utils.cpp

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
#if USE_AWS_S3
1717
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
1818
#endif
19+
#if USE_HDFS
20+
#include <Disks/ObjectStorages/HDFS/HDFSObjectStorage.h>
21+
#endif
1922

2023

2124
namespace DB
@@ -315,7 +318,17 @@ std::pair<DB::ObjectStoragePtr, std::string> resolveObjectStorageForPath(
315318
{
316319
try
317320
{
318-
S3::URI s3_uri(path);
321+
// Normalize s3a:// and s3n:// to s3:// for S3::URI parsing
322+
std::string normalized_path = path;
323+
if (target.scheme == "s3a" || target.scheme == "s3n")
324+
{
325+
normalized_path = "s3://" + target.authority + "/" + target.key;
326+
}
327+
S3::URI s3_uri(normalized_path);
328+
329+
// Use the key from SchemeAuthorityKey parsing to ensure we have the full path
330+
// S3::URI might extract it differently, so we trust SchemeAuthorityKey which already parsed it correctly
331+
std::string key_to_use = target.key;
319332

320333
// Check if base storage is S3 and matches the path's bucket/endpoint
321334
bool use_base_storage = false;
@@ -345,7 +358,13 @@ std::pair<DB::ObjectStoragePtr, std::string> resolveObjectStorageForPath(
345358
// Also check if table_location is provided and matches
346359
if (!use_base_storage && (base_norm == "s3" || base_norm == "https" || base_norm == "http"))
347360
{
348-
S3::URI base_s3_uri(table_location);
361+
// Normalize s3a:// and s3n:// in table_location to s3:// for S3::URI parsing
362+
std::string normalized_table_location = table_location;
363+
if (base.scheme == "s3a" || base.scheme == "s3n")
364+
{
365+
normalized_table_location = "s3://" + base.authority + "/" + base.key;
366+
}
367+
S3::URI base_s3_uri(normalized_table_location);
349368

350369
// For s3:// URIs, match by bucket only. For explicit http(s):// URIs, require both bucket and endpoint.
351370
bool bucket_matches = (s3_uri.bucket == base_s3_uri.bucket);
@@ -359,12 +378,12 @@ std::pair<DB::ObjectStoragePtr, std::string> resolveObjectStorageForPath(
359378
}
360379

361380
if (use_base_storage)
362-
return {base_storage, s3_uri.key};
381+
return {base_storage, key_to_use};
363382

364383
const std::string cache_key = "s3://" + s3_uri.bucket + "@" + (s3_uri.endpoint.empty() ? "amazonaws.com" : s3_uri.endpoint);
365384

366385
if (auto it = secondary_storages.find(cache_key); it != secondary_storages.end())
367-
return {it->second, s3_uri.key};
386+
return {it->second, key_to_use};
368387

369388
/// TODO: maybe do not invent new configuration. Use old one and clean up later
370389
Poco::AutoPtr<Poco::Util::MapConfiguration> cfg(new Poco::Util::MapConfiguration);
@@ -410,7 +429,7 @@ std::pair<DB::ObjectStoragePtr, std::string> resolveObjectStorageForPath(
410429
DB::ObjectStoragePtr storage = factory.create(cache_key, *cfg, config_prefix, context, /*skip_access_check*/ true);
411430

412431
secondary_storages.emplace(cache_key, storage);
413-
return {storage, s3_uri.key};
432+
return {storage, key_to_use};
414433
}
415434
catch (const Exception &) // NOLINT
416435
{
@@ -419,6 +438,50 @@ std::pair<DB::ObjectStoragePtr, std::string> resolveObjectStorageForPath(
419438
}
420439
#endif
421440

441+
// For HDFS URIs, check if we can reuse base storage
442+
if (target_norm == "hdfs")
443+
{
444+
// Check if base storage is HDFS and matches the path's endpoint
445+
bool use_base_storage = false;
446+
if (base_storage->getType() == ObjectStorageType::HDFS)
447+
{
448+
#if USE_HDFS
449+
if (auto hdfs_storage = std::dynamic_pointer_cast<HDFSObjectStorage>(base_storage))
450+
{
451+
const std::string base_url = hdfs_storage->getDescription();
452+
// Extract endpoint from base URL (hdfs://namenode:port/path -> hdfs://namenode:port)
453+
std::string base_endpoint;
454+
if (auto pos = base_url.find('/', base_url.find("//") + 2); pos != std::string::npos)
455+
base_endpoint = base_url.substr(0, pos);
456+
else
457+
base_endpoint = base_url;
458+
459+
// For HDFS, compare endpoints (namenode addresses)
460+
std::string target_endpoint = target_norm + "://" + target.authority;
461+
462+
if (base_endpoint == target_endpoint)
463+
{
464+
use_base_storage = true;
465+
}
466+
}
467+
#endif
468+
}
469+
470+
// Also check if table_location is provided and matches
471+
if (!use_base_storage && base_norm == "hdfs")
472+
{
473+
std::string base_endpoint = base_norm + "://" + base.authority;
474+
std::string target_endpoint = target_norm + "://" + target.authority;
475+
476+
if (base_endpoint == target_endpoint)
477+
{
478+
use_base_storage = true;
479+
}
480+
}
481+
482+
if (use_base_storage)
483+
return {base_storage, target.key};
484+
}
422485

423486
// Reuse base storage if scheme and authority (bucket) matches
424487
if (base_norm == target_norm && base.authority == target.authority)

0 commit comments

Comments
 (0)