Skip to content

Commit d71edaa

Browse files
CreateChangefeedsPropose
1 parent 57ea3d5 commit d71edaa

File tree

3 files changed

+35
-13
lines changed

3 files changed

+35
-13
lines changed

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,20 @@
88
namespace NKikimr {
99
namespace NSchemeShard {
1010

11-
void CreateChangefeedsPropose(THolder<TEvSchemeShard::TEvModifySchemeTransaction>& propose, const TImportInfo::TItem& item) {
11+
bool CreateChangefeedsPropose(THolder<TEvSchemeShard::TEvModifySchemeTransaction>& propose, const TImportInfo::TItem& item, TString& error) {
1212
auto& record = propose->Record;
1313
const auto& changefeeds = item.Changefeeds;
1414

1515
for (const auto& changefeed : changefeeds) {
1616
auto& modifyScheme = *record.AddTransaction();
17-
const auto& cdcStream = modifyScheme.MutableCreateCdcStream();
18-
cdcStream->MutableTableName();
17+
auto& cdcStream = *modifyScheme.MutableCreateCdcStream();
18+
Ydb::StatusIds::StatusCode status;
19+
auto& cdcStreamDescription = *cdcStream.MutableStreamDescription();
20+
if (!FillChangefeedDescription(cdcStreamDescription, changefeed, status, error)) {
21+
return false;
22+
}
1923
}
20-
24+
return true;
2125
}
2226

2327
THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
@@ -85,6 +89,8 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CreateTablePropose(
8589
return nullptr;
8690
}
8791

92+
CreateChangefeedsPropose(propose, item, error);
93+
8894
return propose;
8995
}
9096

ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
5151
return errorType == S3Errors::RESOURCE_NOT_FOUND || errorType == S3Errors::NO_SUCH_KEY;
5252
}
5353

54+
static TString ChangefeedKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx, const TString& changefeedName) {
55+
Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size());
56+
return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/" << changefeedName << "/changefeed_description.pb";
57+
}
58+
5459
void HeadObject(const TString& key) {
5560
auto request = Model::HeadObjectRequest()
5661
.WithKey(key);
@@ -432,6 +437,7 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
432437
const TString MetadataKey;
433438
TString SchemeKey;
434439
const TString PermissionsKey;
440+
const TString ChangefeedKey;
435441

436442
const ui32 Retries;
437443
ui32 Attempt = 0;

ydb/core/ydb_convert/table_description.cpp

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1200,8 +1200,9 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
12001200
}
12011201
}
12021202

1203-
bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out,
1204-
const Ydb::Table::Changefeed& in, Ydb::StatusIds::StatusCode& status, TString& error) {
1203+
template <typename T>
1204+
bool FillChangefeedDescriptionCommon(NKikimrSchemeOp::TCdcStreamDescription& out,
1205+
const T& in, Ydb::StatusIds::StatusCode& status, TString& error) {
12051206

12061207
out.SetName(in.name());
12071208
out.SetVirtualTimestamps(in.virtual_timestamps());
@@ -1241,6 +1242,17 @@ bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out,
12411242
return false;
12421243
}
12431244

1245+
for (const auto& [key, value] : in.attributes()) {
1246+
auto& attr = *out.AddUserAttributes();
1247+
attr.SetKey(key);
1248+
attr.SetValue(value);
1249+
}
1250+
1251+
return true;
1252+
}
1253+
1254+
bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out,
1255+
const Ydb::Table::Changefeed& in, Ydb::StatusIds::StatusCode& status, TString& error) {
12441256
if (in.initial_scan()) {
12451257
if (!AppData()->FeatureFlags.GetEnableChangefeedInitialScan()) {
12461258
status = Ydb::StatusIds::UNSUPPORTED;
@@ -1249,14 +1261,12 @@ bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out,
12491261
}
12501262
out.SetState(NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan);
12511263
}
1264+
return FillChangefeedDescriptionCommon(out, in, status, error);
1265+
}
12521266

1253-
for (const auto& [key, value] : in.attributes()) {
1254-
auto& attr = *out.AddUserAttributes();
1255-
attr.SetKey(key);
1256-
attr.SetValue(value);
1257-
}
1258-
1259-
return true;
1267+
bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out,
1268+
const Ydb::Table::ChangefeedDescription& in, Ydb::StatusIds::StatusCode& status, TString& error) {
1269+
return FillChangefeedDescriptionCommon(out, in, status, error);
12601270
}
12611271

12621272
void FillTableStats(Ydb::Table::DescribeTableResult& out,

0 commit comments

Comments
 (0)