Skip to content

Commit b548dd0

Browse files
list obj
1 parent d71edaa commit b548dd0

File tree

1 file changed

+59
-2
lines changed

1 file changed

+59
-2
lines changed

ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,34 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
5656
return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/" << changefeedName << "/changefeed_description.pb";
5757
}
5858

59+
void ListObjects(const TString& prefix) {
60+
auto request = Model::ListObjectsRequest()
61+
.WithPrefix(prefix);
62+
63+
Send(Client, new TEvExternalStorage::TEvListObjectsRequest(request));
64+
}
65+
66+
void HandleChangefeeds(TEvExternalStorage::TEvListObjectsResponse::TPtr& ev) {
67+
const auto& result = ev.Get()->Get()->Result;
68+
69+
LOG_D("HandleChangefeeds TEvExternalStorage::TEvListObjectResponse"
70+
<< ": self# " << SelfId()
71+
<< ", result# " << result);
72+
73+
if (!CheckResult(result, "ListObject")) {
74+
return;
75+
}
76+
TString a;
77+
78+
//Создать поле класса с ветором ключей (перед этим пофильтровать именно ченджфиды - пути до директорий)
79+
//создать индекс уже скаченных
80+
//сделать по аналогии с импортом
81+
for (const auto& x : result.GetResult().GetContents()) {
82+
x.GetKey().
83+
}
84+
85+
}
86+
5987
void HeadObject(const TString& key) {
6088
auto request = Model::HeadObjectRequest()
6189
.WithKey(key);
@@ -247,7 +275,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
247275
item.Permissions = std::move(permissions);
248276

249277
auto nextStep = [this]() {
250-
Reply();
278+
StartDonloadingChangefeeds();
251279
};
252280

253281
if (NeedValidateChecksums) {
@@ -317,12 +345,20 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
317345
TActor::PassAway();
318346
}
319347

320-
void Download(const TString& key) {
348+
void DownloadCommon() {
321349
if (Client) {
322350
Send(Client, new TEvents::TEvPoisonPill());
323351
}
324352
Client = RegisterWithSameMailbox(CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
353+
}
354+
355+
void DownloadWithoutKey() {
356+
DownloadCommon();
357+
ListObjects(ImportInfo->Settings.items(ItemIdx).source_prefix());
358+
}
325359

360+
void Download(const TString& key) {
361+
DownloadCommon();
326362
HeadObject(key);
327363
}
328364

@@ -342,6 +378,10 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
342378
Download(ChecksumKey);
343379
}
344380

381+
void DownloadChangefeeds() {
382+
DownloadWithoutKey();
383+
}
384+
345385
void ResetRetries() {
346386
Attempt = 0;
347387
}
@@ -358,6 +398,12 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
358398
Become(&TThis::StateDownloadPermissions);
359399
}
360400

401+
void StartDonloadingChangefeeds() {
402+
ResetRetries();
403+
DownloadChangefeeds();
404+
Become(&TThis::StateDownloadChangefeeds);
405+
}
406+
361407
void StartValidatingChecksum(const TString& key, const TString& object, std::function<void()> checksumValidatedCallback) {
362408
ChecksumKey = NBackup::ChecksumKey(key);
363409
Checksum = NBackup::ComputeChecksum(object);
@@ -418,6 +464,17 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
418464
}
419465
}
420466

467+
STATEFN(StateDownloadChangefeeds) {
468+
switch (ev->GetTypeRewrite()) {
469+
hFunc(TEvExternalStorage::TEvListObjectsResponse, HandleChangefeeds);
470+
hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleChangefeeds);
471+
hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleChangefeeds);
472+
473+
sFunc(TEvents::TEvWakeup, DownloadChangefeeds);
474+
sFunc(TEvents::TEvPoisonPill, PassAway);
475+
}
476+
}
477+
421478
STATEFN(StateDownloadChecksum) {
422479
switch (ev->GetTypeRewrite()) {
423480
hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleChecksum);

0 commit comments

Comments
 (0)