Skip to content

Commit ab6d0d9

Browse files
import_scheme_getter is complere
1 parent b548dd0 commit ab6d0d9

File tree

3 files changed

+140
-17
lines changed

3 files changed

+140
-17
lines changed

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,15 @@ bool CreateChangefeedsPropose(THolder<TEvSchemeShard::TEvModifySchemeTransaction
1212
auto& record = propose->Record;
1313
const auto& changefeeds = item.Changefeeds;
1414

15-
for (const auto& changefeed : changefeeds) {
15+
for (const auto& [changefeed, topic]: changefeeds) {
1616
auto& modifyScheme = *record.AddTransaction();
1717
auto& cdcStream = *modifyScheme.MutableCreateCdcStream();
1818
Ydb::StatusIds::StatusCode status;
1919
auto& cdcStreamDescription = *cdcStream.MutableStreamDescription();
2020
if (!FillChangefeedDescription(cdcStreamDescription, changefeed, status, error)) {
2121
return false;
2222
}
23+
cdcStreamDescription.
2324
}
2425
return true;
2526
}

ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp

Lines changed: 132 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,12 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
5151
return errorType == S3Errors::RESOURCE_NOT_FOUND || errorType == S3Errors::NO_SUCH_KEY;
5252
}
5353

54-
static TString ChangefeedKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx, const TString& changefeedName) {
55-
Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size());
56-
return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/" << changefeedName << "/changefeed_description.pb";
54+
static TString ChangefeedDescriptionKey(const TString& changefeedPrefix) {
55+
return TStringBuilder() << changefeedPrefix << "/changefeed_description.pb";
56+
}
57+
58+
static TString TopicDescriptionKey(const TString& changefeedPrefix) {
59+
return TStringBuilder() << changefeedPrefix << "/topic_description.pb";
5760
}
5861

5962
void ListObjects(const TString& prefix) {
@@ -73,13 +76,21 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
7376
if (!CheckResult(result, "ListObject")) {
7477
return;
7578
}
76-
TString a;
7779

78-
//Создать поле класса с ветором ключей (перед этим пофильтровать именно ченджфиды - пути до директорий)
79-
//создать индекс уже скаченных
80-
//сделать по аналогии с импортом
81-
for (const auto& x : result.GetResult().GetContents()) {
82-
x.GetKey().
80+
const auto& objects = result.GetResult().GetContents();
81+
ChangefeedsKeys.reserve(objects.size());
82+
83+
for (const auto& obj : objects) {
84+
const TFsPath& path = obj.GetKey();
85+
if (path.GetName() == "changefeed_description.pb") {
86+
ChangefeedsKeys.push_back(path.Dirname());
87+
}
88+
}
89+
90+
if (!ChangefeedsKeys.empty()) {
91+
HeadObject(ChangefeedDescriptionKey(ChangefeedsKeys[0]));
92+
} else {
93+
Reply();
8394
}
8495

8596
}
@@ -161,6 +172,36 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
161172
GetObject(ChecksumKey, std::make_pair(0, contentLength - 1));
162173
}
163174

175+
void HandleChangefeeds(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
176+
const auto& result = ev->Get()->Result;
177+
178+
LOG_D("HandleChangefeeds TEvExternalStorage::TEvHeadObjectResponse"
179+
<< ": self# " << SelfId()
180+
<< ", result# " << result);
181+
182+
if (!CheckResult(result, "HeadObject")) {
183+
return;
184+
}
185+
186+
const auto contentLength = result.GetResult().GetContentLength();
187+
GetObject(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1));
188+
}
189+
190+
void HandleTopics(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
191+
const auto& result = ev->Get()->Result;
192+
193+
LOG_D("HandleChangefeeds TEvExternalStorage::TEvHeadObjectResponse"
194+
<< ": self# " << SelfId()
195+
<< ", result# " << result);
196+
197+
if (!CheckResult(result, "HeadObject")) {
198+
return;
199+
}
200+
201+
const auto contentLength = result.GetResult().GetContentLength();
202+
GetObject(TopicDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), std::make_pair(0, contentLength - 1));
203+
}
204+
164205
void GetObject(const TString& key, const std::pair<ui64, ui64>& range) {
165206
auto request = Model::GetObjectRequest()
166207
.WithKey(key)
@@ -238,7 +279,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
238279
if (NeedDownloadPermissions) {
239280
StartDownloadingPermissions();
240281
} else {
241-
Reply();
282+
StartDownloadingChangefeeds();
242283
}
243284
};
244285

@@ -275,7 +316,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
275316
item.Permissions = std::move(permissions);
276317

277318
auto nextStep = [this]() {
278-
StartDonloadingChangefeeds();
319+
StartDownloadingChangefeeds();
279320
};
280321

281322
if (NeedValidateChecksums) {
@@ -307,6 +348,82 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
307348
ChecksumValidatedCallback();
308349
}
309350

351+
void HandleChangefeed(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
352+
const auto& msg = *ev->Get();
353+
const auto& result = msg.Result;
354+
355+
LOG_D("HandleChangefeeds TEvExternalStorage::TEvGetObjectResponse"
356+
<< ": self# " << SelfId()
357+
<< ", result# " << result);
358+
359+
if (!CheckResult(result, "GetObject")) {
360+
return;
361+
}
362+
363+
Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
364+
auto& item = ImportInfo->Items.at(ItemIdx);
365+
366+
LOG_T("Trying to parse changefeed"
367+
<< ": self# " << SelfId()
368+
<< ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));
369+
370+
Ydb::Table::ChangefeedDescription changefeed;
371+
if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &changefeed)) {
372+
return Reply(false, "Cannot parse permissions");
373+
}
374+
item.Changefeeds[IndexDownloadedChangefeed].ChangefeedDescription = std::move(changefeed);
375+
376+
auto nextStep = [this]() {
377+
HeadObject(TopicDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]));
378+
};
379+
380+
if (NeedValidateChecksums) {
381+
StartValidatingChecksum(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), msg.Body, nextStep);
382+
} else {
383+
nextStep();
384+
}
385+
}
386+
387+
void HandleTopic(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
388+
const auto& msg = *ev->Get();
389+
const auto& result = msg.Result;
390+
391+
LOG_D("HandleChangefeeds TEvExternalStorage::TEvGetObjectResponse"
392+
<< ": self# " << SelfId()
393+
<< ", result# " << result);
394+
395+
if (!CheckResult(result, "GetObject")) {
396+
return;
397+
}
398+
399+
Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
400+
auto& item = ImportInfo->Items.at(ItemIdx);
401+
402+
LOG_T("Trying to parse changefeed"
403+
<< ": self# " << SelfId()
404+
<< ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));
405+
406+
Ydb::Topic::DescribeTopicResult topic;
407+
if (!google::protobuf::TextFormat::ParseFromString(msg.Body, &topic)) {
408+
return Reply(false, "Cannot parse permissions");
409+
}
410+
item.Changefeeds[IndexDownloadedChangefeed].Topic = std::move(topic);
411+
412+
auto nextStep = [this]() {
413+
if (++IndexDownloadedChangefeed == ChangefeedsKeys.size()) {
414+
Reply();
415+
} else {
416+
HeadObject(ChangefeedDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]));
417+
}
418+
};
419+
420+
if (NeedValidateChecksums) {
421+
StartValidatingChecksum(TopicDescriptionKey(ChangefeedsKeys[IndexDownloadedChangefeed]), msg.Body, nextStep);
422+
} else {
423+
nextStep();
424+
}
425+
}
426+
310427
template <typename TResult>
311428
bool CheckResult(const TResult& result, const TStringBuf marker) {
312429
if (result.IsSuccess()) {
@@ -398,7 +515,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
398515
Become(&TThis::StateDownloadPermissions);
399516
}
400517

401-
void StartDonloadingChangefeeds() {
518+
void StartDownloadingChangefeeds() {
402519
ResetRetries();
403520
DownloadChangefeeds();
404521
Become(&TThis::StateDownloadChangefeeds);
@@ -468,7 +585,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
468585
switch (ev->GetTypeRewrite()) {
469586
hFunc(TEvExternalStorage::TEvListObjectsResponse, HandleChangefeeds);
470587
hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleChangefeeds);
471-
hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleChangefeeds);
588+
hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleChangefeed);
472589

473590
sFunc(TEvents::TEvWakeup, DownloadChangefeeds);
474591
sFunc(TEvents::TEvPoisonPill, PassAway);
@@ -494,7 +611,8 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
494611
const TString MetadataKey;
495612
TString SchemeKey;
496613
const TString PermissionsKey;
497-
const TString ChangefeedKey;
614+
TVector<TString> ChangefeedsKeys;
615+
ui64 IndexDownloadedChangefeed = 0;
498616

499617
const ui32 Retries;
500618
ui32 Attempt = 0;

ydb/core/tx/schemeshard/schemeshard_info_types.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2837,6 +2837,11 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
28372837
S3 = 0,
28382838
};
28392839

2840+
struct TChangefeedImportDescriptions {
2841+
Ydb::Table::ChangefeedDescription ChangefeedDescription;
2842+
Ydb::Topic::DescribeTopicResult Topic;
2843+
};
2844+
28402845
struct TItem {
28412846
enum class ESubState: ui8 {
28422847
AllocateTxId = 0,
@@ -2850,9 +2855,8 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
28502855
TString CreationQuery;
28512856
TMaybe<NKikimrSchemeOp::TModifyScheme> PreparedCreationQuery;
28522857
TMaybeFail<Ydb::Scheme::ModifyPermissionsRequest> Permissions;
2853-
TVector<Ydb::Table::ChangefeedDescription> Changefeeds;
28542858
NBackup::TMetadata Metadata;
2855-
TVector<Ydb::Table::ChangefeedDescription> Changefeeds;
2859+
TVector<TChangefeedImportDescriptions> Changefeeds;
28562860

28572861
EState State = EState::GetScheme;
28582862
ESubState SubState = ESubState::AllocateTxId;

0 commit comments

Comments
 (0)