Skip to content

Commit 3844535

Browse files
added topics
1 parent 5cfaac1 commit 3844535

File tree

4 files changed

+18
-6
lines changed

4 files changed

+18
-6
lines changed

ydb/core/tx/schemeshard/schemeshard__init.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4464,13 +4464,18 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
44644464
item.Metadata = NBackup::TMetadata::Deserialize(rowset.GetValue<Schema::ImportItems::Metadata>());
44654465
}
44664466

4467-
if (rowset.HaveValue<Schema::ImportItems::Changefeeds>()) {
4467+
if (rowset.HaveValue<Schema::ImportItems::Changefeeds>() && rowset.HaveValue<Schema::ImportItems::Topics>()) {
44684468
const ui64 count = rowset.GetValue<Schema::ImportItems::Changefeeds>().size();
4469-
TVector<Ydb::Table::ChangefeedDescription> changefeeds(count);
4469+
TVector<TChangefeedImportDescriptions> changefeeds;
4470+
changefeeds.reserve(count);
44704471
for (ui64 i = 0; i < count; ++i) {
4471-
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(changefeeds[i], rowset.GetValue<Schema::ImportItems::Changefeeds>())[i]);
4472+
Ydb::Table::ChangefeedDescription changefeed;
4473+
Ydb::Topic::DescribeTopicResult topic;
4474+
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(changefeed, rowset.GetValue<Schema::ImportItems::Changefeeds>())[i]);
4475+
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(topic, rowset.GetValue<Schema::ImportItems::Topics>())[i]);
4476+
changefeeds.emplace_back(changefeed, topic);
44724477
}
4473-
item.Changefeeds = changefeeds;
4478+
item.Changefeeds = std::move(changefeeds);
44744479
}
44754480

44764481
item.State = static_cast<TImportInfo::EState>(rowset.GetValue<Schema::ImportItems::State>());

ydb/core/tx/schemeshard/schemeshard_import.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,15 +191,21 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf
191191
);
192192
const ui64 count = item.Changefeeds.size();
193193
TVector<TString> jsonChangefeeds;
194+
TVector<TString> jsonTopics;
194195
jsonChangefeeds.reserve(count);
196+
jsonTopics.reserve(count);
195197

196-
for (const auto& changefeed : item.Changefeeds) {
198+
for (const auto& [changefeed, topic] : item.Changefeeds) {
197199
jsonChangefeeds.push_back(changefeed.SerializeAsString());
200+
jsonTopics.push_back(topic.SerializeAsString());
198201
}
199202

200203
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
201204
NIceDb::TUpdate<Schema::ImportItems::Changefeeds>(jsonChangefeeds)
202205
);
206+
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
207+
NIceDb::TUpdate<Schema::ImportItems::Topics>(jsonTopics)
208+
);
203209
}
204210

205211
void TSchemeShard::PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) {

ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ bool CreateChangefeedsPropose(THolder<TEvSchemeShard::TEvModifySchemeTransaction
2020
if (!FillChangefeedDescription(cdcStreamDescription, changefeed, status, error)) {
2121
return false;
2222
}
23-
cdcStreamDescription.
23+
cdcStream.SetRetentionPeriodSeconds(topic.Getretention_period().seconds());
2424
}
2525
return true;
2626
}

ydb/core/tx/schemeshard/schemeshard_schema.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1563,6 +1563,7 @@ struct Schema : NIceDb::Schema {
15631563
struct Permissions : Column<11, NScheme::NTypeIds::String> {};
15641564
struct Metadata : Column<12, NScheme::NTypeIds::String> {};
15651565
struct Changefeeds : Column<13, NScheme::NTypeIds::Json> {};
1566+
struct Topics : Column<14, NScheme::NTypeIds::Json> {};
15661567

15671568
struct State : Column<7, NScheme::NTypeIds::Byte> {};
15681569
struct WaitTxId : Column<8, NScheme::NTypeIds::Uint64> { using Type = TTxId; };

0 commit comments

Comments
 (0)