|
12 | 12 | #include "http_state.hpp" |
13 | 13 |
|
14 | 14 | #include "duckdb/common/string_util.hpp" |
| 15 | +#include "duckdb/common/crypto/md5.hpp" |
| 16 | +#include "duckdb/common/types/blob.hpp" |
15 | 17 | #include "duckdb/function/scalar/string_common.hpp" |
16 | 18 | #include "duckdb/main/secret/secret_manager.hpp" |
17 | 19 | #include "duckdb/storage/buffer_manager.hpp" |
@@ -1028,21 +1030,114 @@ void S3FileSystem::RemoveFile(const string &path, optional_ptr<FileOpener> opene |
1028 | 1030 | } |
1029 | 1031 | } |
1030 | 1032 |
|
| 1033 | +// Forward declaration for FindTagContents (defined later in file) |
| 1034 | +optional_idx FindTagContents(const string &response, const string &tag, idx_t cur_pos, string &result); |
| 1035 | + |
| 1036 | +void S3FileSystem::RemoveFiles(const vector<string> &paths, optional_ptr<FileOpener> opener) { |
| 1037 | + if (paths.empty()) { |
| 1038 | + return; |
| 1039 | + } |
| 1040 | + |
| 1041 | + struct BucketUrlInfo { |
| 1042 | + string prefix; |
| 1043 | + string http_proto; |
| 1044 | + string host; |
| 1045 | + string path; |
| 1046 | + S3AuthParams auth_params; |
| 1047 | + }; |
| 1048 | + |
| 1049 | + unordered_map<string, vector<string>> keys_by_bucket; |
| 1050 | + unordered_map<string, BucketUrlInfo> url_info_by_bucket; |
| 1051 | + |
| 1052 | + for (auto &path : paths) { |
| 1053 | + FileOpenerInfo info = {path}; |
| 1054 | + S3AuthParams auth_params = S3AuthParams::ReadFrom(opener, info); |
| 1055 | + auto parsed_url = S3UrlParse(path, auth_params); |
| 1056 | + ReadQueryParams(parsed_url.query_param, auth_params); |
| 1057 | + |
| 1058 | + const string &bucket = parsed_url.bucket; |
| 1059 | + if (keys_by_bucket.find(bucket) == keys_by_bucket.end()) { |
| 1060 | + string bucket_path = parsed_url.path.substr(0, parsed_url.path.length() - parsed_url.key.length() - 1); |
| 1061 | + if (bucket_path.empty()) { |
| 1062 | + bucket_path = "/"; |
| 1063 | + } |
| 1064 | + url_info_by_bucket[bucket] = {parsed_url.prefix, parsed_url.http_proto, parsed_url.host, bucket_path, |
| 1065 | + auth_params}; |
| 1066 | + } |
| 1067 | + |
| 1068 | + keys_by_bucket[bucket].push_back(parsed_url.key); |
| 1069 | + } |
| 1070 | + |
| 1071 | + constexpr idx_t MAX_KEYS_PER_REQUEST = 1000; |
| 1072 | + |
| 1073 | + for (auto &bucket_entry : keys_by_bucket) { |
| 1074 | + const string &bucket = bucket_entry.first; |
| 1075 | + const vector<string> &keys = bucket_entry.second; |
| 1076 | + const auto &url_info = url_info_by_bucket[bucket]; |
| 1077 | + |
| 1078 | + for (idx_t batch_start = 0; batch_start < keys.size(); batch_start += MAX_KEYS_PER_REQUEST) { |
| 1079 | + idx_t batch_end = MinValue<idx_t>(batch_start + MAX_KEYS_PER_REQUEST, keys.size()); |
| 1080 | + |
| 1081 | + std::stringstream xml_body; |
| 1082 | + xml_body << "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"; |
| 1083 | + xml_body << "<Delete xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">"; |
| 1084 | + |
| 1085 | + for (idx_t i = batch_start; i < batch_end; i++) { |
| 1086 | + xml_body << "<Object><Key>" << keys[i] << "</Key></Object>"; |
| 1087 | + } |
| 1088 | + |
| 1089 | + xml_body << "<Quiet>true</Quiet>"; |
| 1090 | + xml_body << "</Delete>"; |
| 1091 | + |
| 1092 | + string body = xml_body.str(); |
| 1093 | + |
| 1094 | + MD5Context md5_context; |
| 1095 | + md5_context.Add(body); |
| 1096 | + data_t md5_hash[MD5Context::MD5_HASH_LENGTH_BINARY]; |
| 1097 | + md5_context.Finish(md5_hash); |
| 1098 | + |
| 1099 | + string_t md5_blob(const_char_ptr_cast(md5_hash), MD5Context::MD5_HASH_LENGTH_BINARY); |
| 1100 | + string content_md5 = Blob::ToBase64(md5_blob); |
| 1101 | + |
| 1102 | + const string http_query_param_for_sig = "delete="; |
| 1103 | + const string http_query_param_for_url = "delete"; |
| 1104 | + auto payload_hash = GetPayloadHash(const_cast<char *>(body.data()), body.length()); |
| 1105 | + |
| 1106 | + auto headers = CreateS3Header(url_info.path, http_query_param_for_sig, url_info.host, "s3", "POST", |
| 1107 | + url_info.auth_params, "", "", payload_hash, ""); |
| 1108 | + headers["Content-MD5"] = content_md5; |
| 1109 | + headers["Content-Type"] = "application/xml"; |
| 1110 | + |
| 1111 | + string http_url = url_info.http_proto + url_info.host + S3FileSystem::UrlEncode(url_info.path) + "?" + |
| 1112 | + http_query_param_for_url; |
| 1113 | + string bucket_url = url_info.prefix + bucket + "/"; |
| 1114 | + auto handle = OpenFile(bucket_url, FileFlags::FILE_FLAGS_READ, opener); |
| 1115 | + |
| 1116 | + string result; |
| 1117 | + auto res = HTTPFileSystem::PostRequest(*handle, http_url, headers, result, const_cast<char *>(body.data()), |
| 1118 | + body.length()); |
| 1119 | + |
| 1120 | + if (res->status != HTTPStatusCode::OK_200) { |
| 1121 | + throw IOException("Failed to remove files: HTTP %d (%s)\n%s", static_cast<int>(res->status), |
| 1122 | + res->GetError(), result); |
| 1123 | + } |
| 1124 | + |
| 1125 | + idx_t cur_pos = 0; |
| 1126 | + string error_content; |
| 1127 | + auto error_pos = FindTagContents(result, "Error", cur_pos, error_content); |
| 1128 | + if (error_pos.IsValid()) { |
| 1129 | + throw IOException("Failed to remove files: %s", error_content); |
| 1130 | + } |
| 1131 | + } |
| 1132 | + } |
| 1133 | +} |
| 1134 | + |
1031 | 1135 | void S3FileSystem::RemoveDirectory(const string &path, optional_ptr<FileOpener> opener) { |
| 1136 | + vector<string> files_to_remove; |
1032 | 1137 | ListFiles( |
1033 | | - path, |
1034 | | - [&](const string &file, bool is_dir) { |
1035 | | - try { |
1036 | | - this->RemoveFile(file, opener); |
1037 | | - } catch (IOException &e) { |
1038 | | - string errmsg(e.what()); |
1039 | | - if (errmsg.find("No such file or directory") != std::string::npos) { |
1040 | | - return; |
1041 | | - } |
1042 | | - throw; |
1043 | | - } |
1044 | | - }, |
1045 | | - opener.get()); |
| 1138 | + path, [&](const string &file, bool is_dir) { files_to_remove.push_back(file); }, opener.get()); |
| 1139 | + |
| 1140 | + RemoveFiles(files_to_remove, opener); |
1046 | 1141 | } |
1047 | 1142 |
|
1048 | 1143 | void S3FileSystem::FileSync(FileHandle &handle) { |
@@ -1137,7 +1232,7 @@ struct S3GlobResult : public LazyMultiFileList { |
1137 | 1232 | }; |
1138 | 1233 |
|
1139 | 1234 | S3GlobResult::S3GlobResult(S3FileSystem &fs, const string &glob_pattern_p, optional_ptr<FileOpener> opener) |
1140 | | - : glob_pattern(glob_pattern_p), opener(opener) { |
| 1235 | + : LazyMultiFileList(FileOpener::TryGetClientContext(opener)), glob_pattern(glob_pattern_p), opener(opener) { |
1141 | 1236 | if (!opener) { |
1142 | 1237 | throw InternalException("Cannot S3 Glob without FileOpener"); |
1143 | 1238 | } |
|
0 commit comments