|
3 | 3 | #include <Disks/ObjectStorages/ObjectStorageFactory.h> |
4 | 4 | #include <Storages/ObjectStorage/StorageObjectStorage.h> |
5 | 5 | #include <Poco/Util/MapConfiguration.h> |
| 6 | +#include <IO/S3/URI.h> |
| 7 | + |
6 | 8 |
|
7 | 9 | namespace DB |
8 | 10 | { |
@@ -282,6 +284,76 @@ std::pair<DB::ObjectStoragePtr, std::string> resolveObjectStorageForPath( |
282 | 284 | const std::string base_norm = normalizeSchema(base.scheme); |
283 | 285 | const std::string target_norm = normalizeSchema(target.scheme); |
284 | 286 |
|
| 287 | + // For S3 URIs, use S3::URI to properly parse URLs such as https://s3.amazonaws.com/... |
| 288 | + #if USE_AWS_S3 |
| 289 | + if (target_norm == "s3" || target_norm == "https" || target_norm == "http") |
| 290 | + { |
| 291 | + try |
| 292 | + { |
| 293 | + S3::URI s3_uri(path); |
| 294 | + |
| 295 | + if (base_norm == "s3" || base_norm == "https" || base_norm == "http") |
| 296 | + { |
| 297 | + S3::URI base_s3_uri(table_location); |
| 298 | + |
| 299 | + if (s3_uri.bucket == base_s3_uri.bucket && s3_uri.endpoint == base_s3_uri.endpoint) |
| 300 | + { |
| 301 | + if (base_storage) |
| 302 | + std::cerr << "\nSame S3 location, returning base, which is: " << base_storage->getName() << ", " << base_storage->getObjectsNamespace() << "\n"; |
| 303 | + else |
| 304 | + std::cerr << "\nSame S3 location, returning base, which is: NULL\n"; |
| 305 | + return {base_storage, s3_uri.key}; |
| 306 | + } |
| 307 | + else |
| 308 | + { |
| 309 | + std::cerr << "\nS3 storages are different:\n" << base_s3_uri.bucket << " @ " << base_s3_uri.endpoint |
| 310 | + << " vs " << s3_uri.bucket << " @ " << s3_uri.endpoint << "\n"; |
| 311 | + } |
| 312 | + } |
| 313 | + |
| 314 | + const std::string cache_key = "s3://" + s3_uri.bucket + "@" + (s3_uri.endpoint.empty() ? "amazonaws.com" : s3_uri.endpoint); |
| 315 | + |
| 316 | + if (auto it = secondary_storages.find(cache_key); it != secondary_storages.end()) |
| 317 | + { |
| 318 | + if (it->second) |
| 319 | + std::cerr << "\nfound in cache: " << it->second->getName() << ", " << it->second->getObjectsNamespace() << "\n"; |
| 320 | + else |
| 321 | + std::cerr << "\nfound in cache: NULL\n"; |
| 322 | + return {it->second, s3_uri.key}; |
| 323 | + } |
| 324 | + |
| 325 | + /// TODO: maybe do not invent new configuration. Use old one and clean up later |
| 326 | + Poco::AutoPtr<Poco::Util::MapConfiguration> cfg(new Poco::Util::MapConfiguration); |
| 327 | + |
| 328 | + const std::string config_prefix = "object_storages." + cache_key; |
| 329 | + |
| 330 | + cfg->setString(config_prefix + ".object_storage_type", "s3"); |
| 331 | + |
| 332 | + // Use the full endpoint or construct it from bucket |
| 333 | + std::string endpoint = s3_uri.endpoint.empty() |
| 334 | + ? ("https://" + s3_uri.bucket + ".s3.amazonaws.com") |
| 335 | + : s3_uri.endpoint; |
| 336 | + cfg->setString(config_prefix + ".endpoint", endpoint); |
| 337 | + |
| 338 | + auto & factory = DB::ObjectStorageFactory::instance(); |
| 339 | + |
| 340 | + DB::ObjectStoragePtr storage = factory.create(cache_key, *cfg, config_prefix, context, /*skip_access_check*/ true); |
| 341 | + |
| 342 | + secondary_storages.emplace(cache_key, storage); |
| 343 | + if (storage) |
| 344 | + std::cerr << "\ncreated new S3 storage: " << storage->getName() << ", " << storage->getObjectsNamespace() << ":\ndescr: " << storage->getDescription() << "\n"; |
| 345 | + else |
| 346 | + std::cerr << "\ncreated new S3 storage: it is NULL\n"; |
| 347 | + return {storage, s3_uri.key}; |
| 348 | + } |
| 349 | + catch (...) |
| 350 | + { |
| 351 | + // If S3::URI parsing fails, fall back to the old logic |
| 352 | + } |
| 353 | + } |
| 354 | + #endif |
| 355 | + |
| 356 | + |
285 | 357 | // Reuse base storage if scheme and authority (bucket) matches |
286 | 358 | if (base_norm == target_norm && base.authority == target.authority) |
287 | 359 | { |
|
0 commit comments