1010#include < src/client/common_client/impl/client.h>
1111#include < ydb-cpp-sdk/client/proto/accessor.h>
1212
13+ #include < util/string/join.h>
14+
1315namespace NYdb ::inline V3 {
1416namespace NImport {
1517
@@ -34,6 +36,25 @@ std::vector<TImportItemProgress> ItemsProgressFromProto(const google::protobuf::
3436 return result;
3537}
3638
39+ template <class TS3SettingsProto , class TSettings >
40+ void FillS3Settings (TS3SettingsProto& proto, const TSettings& settings) {
41+ proto.set_endpoint (TStringType{settings.Endpoint_ });
42+ proto.set_scheme (TProtoAccessor::GetProto<ImportFromS3Settings>(settings.Scheme_ ));
43+ proto.set_bucket (TStringType{settings.Bucket_ });
44+ proto.set_access_key (TStringType{settings.AccessKey_ });
45+ proto.set_secret_key (TStringType{settings.SecretKey_ });
46+
47+ if (settings.NumberOfRetries_ ) {
48+ proto.set_number_of_retries (settings.NumberOfRetries_ .value ());
49+ }
50+
51+ if (settings.SymmetricKey_ ) {
52+ proto.mutable_encryption_settings ()->mutable_symmetric_key ()->set_key (*settings.SymmetricKey_ );
53+ }
54+
55+ proto.set_disable_virtual_addressing (!settings.UseVirtualAddressing_ );
56+ }
57+
3758} // anonymous
3859
3960// / S3
@@ -66,6 +87,36 @@ const TImportFromS3Response::TMetadata& TImportFromS3Response::Metadata() const
6687 return Metadata_;
6788}
6889
90+ TListObjectsInS3ExportResult::TListObjectsInS3ExportResult (TStatus&& status, const ::Ydb::Import::ListObjectsInS3ExportResult& proto)
91+ : TStatus(std::move(status))
92+ {
93+ Items_.reserve (proto.items_size ());
94+ for (const auto & item : proto.items ()) {
95+ Items_.emplace_back (TItem{
96+ .Prefix = item.prefix (),
97+ .Path = item.path ()
98+ });
99+ }
100+ NextPageToken_ = proto.next_page_token ();
101+ }
102+
103+ const std::vector<TListObjectsInS3ExportResult::TItem>& TListObjectsInS3ExportResult::GetItems () const {
104+ return Items_;
105+ }
106+
107+ void TListObjectsInS3ExportResult::Out (IOutputStream& out) const {
108+ if (IsSuccess ()) {
109+ out << " { items: [" << JoinSeq (" , " , Items_) << " ], next_page_token: \" " << NextPageToken_ << " \" }" ;
110+ } else {
111+ return TStatus::Out (out);
112+ }
113+ }
114+
115+ void TListObjectsInS3ExportResult::TItem::Out (IOutputStream& out) const {
116+ out << " { prefix: \" " << Prefix << " \" "
117+ << " , path: \" " << Path << " \" }" ;
118+ }
119+
69120// / Data
70121TImportDataResult::TImportDataResult (TStatus&& status)
71122 : TStatus(std::move(status))
@@ -80,13 +131,37 @@ class TImportClient::TImpl : public TClientImplCommon<TImportClient::TImpl> {
80131 {
81132 }
82133
83- TFuture<TImportFromS3Response> ImportFromS3 (ImportFromS3Request&& request, const TImportFromS3Settings& settings) {
134+ TAsyncImportFromS3Response ImportFromS3 (ImportFromS3Request&& request, const TImportFromS3Settings& settings) {
84135 return RunOperation<V1::ImportService, ImportFromS3Request, ImportFromS3Response, TImportFromS3Response>(
85136 std::move (request),
86137 &V1::ImportService::Stub::AsyncImportFromS3,
87138 TRpcRequestSettings::Make (settings));
88139 }
89140
141+ TAsyncListObjectsInS3ExportResult ListObjectsInS3Export (ListObjectsInS3ExportRequest&& request, const TListObjectsInS3ExportSettings& settings) {
142+ auto promise = NThreading::NewPromise<TListObjectsInS3ExportResult>();
143+
144+ auto extractor = [promise]
145+ (google::protobuf::Any* any, TPlainStatus status) mutable {
146+ ListObjectsInS3ExportResult result;
147+ if (any) {
148+ any->UnpackTo (&result);
149+ }
150+
151+ promise.SetValue (TListObjectsInS3ExportResult (TStatus (std::move (status)), result));
152+ };
153+
154+ Connections_->RunDeferred <V1::ImportService, ListObjectsInS3ExportRequest, ListObjectsInS3ExportResponse>(
155+ std::move (request),
156+ extractor,
157+ &V1::ImportService::Stub::AsyncListObjectsInS3Export,
158+ DbDriverState_,
159+ INITIAL_DEFERRED_CALL_DELAY,
160+ TRpcRequestSettings::Make (settings));
161+
162+ return promise.GetFuture ();
163+ }
164+
90165 template <typename TSettings>
91166 TAsyncImportDataResult ImportData (ImportDataRequest&& request, const TSettings& settings) {
92167 auto promise = NThreading::NewPromise<TImportDataResult>();
@@ -131,22 +206,18 @@ TImportClient::TImportClient(const TDriver& driver, const TCommonClientSettings&
131206{
132207}
133208
134- TFuture<TImportFromS3Response> TImportClient::ImportFromS3 (const TImportFromS3Settings& settings) {
209+ TAsyncImportFromS3Response TImportClient::ImportFromS3 (const TImportFromS3Settings& settings) {
135210 auto request = MakeOperationRequest<ImportFromS3Request>(settings);
136-
137- request.mutable_settings ()->set_endpoint (TStringType{settings.Endpoint_ });
138- request.mutable_settings ()->set_scheme (TProtoAccessor::GetProto<ImportFromS3Settings>(settings.Scheme_ ));
139- request.mutable_settings ()->set_bucket (TStringType{settings.Bucket_ });
140- request.mutable_settings ()->set_access_key (TStringType{settings.AccessKey_ });
141- request.mutable_settings ()->set_secret_key (TStringType{settings.SecretKey_ });
211+ Ydb::Import::ImportFromS3Settings& settingsProto = *request.mutable_settings ();
212+ FillS3Settings (settingsProto, settings);
142213
143214 for (const auto & item : settings.Item_ ) {
144215 if (!item.Src .empty () && !item.SrcPath .empty ()) {
145216 throw TContractViolation (
146217 TStringBuilder () << " Invalid item: both source prefix and source path are set: \" " << item.Src << " \" and \" " << item.SrcPath << " \" " );
147218 }
148219
149- auto & protoItem = *request. mutable_settings ()-> mutable_items ()->Add ();
220+ auto & protoItem = *settingsProto. mutable_items ()->Add ();
150221 if (!item.Src .empty ()) {
151222 protoItem.set_source_prefix (item.Src );
152223 }
@@ -157,32 +228,47 @@ TFuture<TImportFromS3Response> TImportClient::ImportFromS3(const TImportFromS3Se
157228 }
158229
159230 if (settings.Description_ ) {
160- request.mutable_settings ()->set_description (TStringType{settings.Description_ .value ()});
161- }
162-
163- if (settings.NumberOfRetries_ ) {
164- request.mutable_settings ()->set_number_of_retries (settings.NumberOfRetries_ .value ());
231+ settingsProto.set_description (TStringType{settings.Description_ .value ()});
165232 }
166233
167234 if (settings.NoACL_ ) {
168- request. mutable_settings ()-> set_no_acl (settings.NoACL_ .value ());
235+ settingsProto. set_no_acl (settings.NoACL_ .value ());
169236 }
170237
171238 if (settings.SourcePrefix_ ) {
172- request. mutable_settings ()-> set_source_prefix (settings.SourcePrefix_ .value ());
239+ settingsProto. set_source_prefix (settings.SourcePrefix_ .value ());
173240 }
174241
175242 if (settings.DestinationPath_ ) {
176- request. mutable_settings ()-> set_destination_path (settings.DestinationPath_ .value ());
243+ settingsProto. set_destination_path (settings.DestinationPath_ .value ());
177244 }
178245
179- if (settings.SymmetricKey_ ) {
180- request.mutable_settings ()->mutable_encryption_settings ()->mutable_symmetric_key ()->set_key (*settings.SymmetricKey_ );
246+ return Impl_->ImportFromS3 (std::move (request), settings);
247+ }
248+
249+ TAsyncListObjectsInS3ExportResult TImportClient::ListObjectsInS3Export (const TListObjectsInS3ExportSettings& settings, std::int64_t pageSize, const std::string& pageToken) {
250+ auto request = MakeOperationRequest<ListObjectsInS3ExportRequest>(settings);
251+ Ydb::Import::ListObjectsInS3ExportSettings& settingsProto = *request.mutable_settings ();
252+ FillS3Settings (settingsProto, settings);
253+
254+ if (settings.Prefix_ ) {
255+ settingsProto.set_prefix (settings.Prefix_ .value ());
181256 }
182257
183- request.mutable_settings ()->set_disable_virtual_addressing (!settings.UseVirtualAddressing_ );
258+ for (const auto & item : settings.Item_ ) {
259+ if (item.Path .empty ()) {
260+ throw TContractViolation (
261+ TStringBuilder () << " Invalid item: path is not set" );
262+ }
184263
185- return Impl_->ImportFromS3 (std::move (request), settings);
264+ settingsProto.add_items ()->set_path (item.Path );
265+ }
266+
267+ // Paging
268+ request.set_page_size (pageSize);
269+ request.set_page_token (pageToken);
270+
271+ return Impl_->ListObjectsInS3Export (std::move (request), settings);
186272}
187273
188274TAsyncImportDataResult TImportClient::ImportData (const std::string& table, std::string&& data, const TImportYdbDumpDataSettings& settings) {
0 commit comments