Skip to content

Commit e666b18

Browse files
start
1 parent 0ba28e4 commit e666b18

File tree

5 files changed

+23
-0
lines changed

5 files changed

+23
-0
lines changed

ydb/core/tablet_flat/flat_cxx_database.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Date32> { typedef i32 Ty
238238
template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Datetime64> { typedef i64 Type; };
239239
template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Timestamp64> { typedef i64 Type; };
240240
template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Interval64> { typedef i64 Type; };
241+
template <> struct NSchemeTypeMapper<NScheme::NTypeIds::Json> { typedef TVector<TString> Type; };
241242

242243
/// only for compatibility with old code
243244
template <NScheme::TTypeId ValType>

ydb/core/tx/schemeshard/schemeshard__init.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4464,6 +4464,15 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
44644464
item.Metadata = NBackup::TMetadata::Deserialize(rowset.GetValue<Schema::ImportItems::Metadata>());
44654465
}
44664466

4467+
if (rowset.HaveValue<Schema::ImportItems::Changefeeds>()) {
4468+
const ui64 count = rowset.GetValue<Schema::ImportItems::Changefeeds>().size();
4469+
TVector<Ydb::Table::ChangefeedDescription> changefeeds(count);
4470+
for (ui64 i = 0; i < count; ++i) {
4471+
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(changefeeds[i], rowset.GetValue<Schema::ImportItems::Changefeeds>())[i]);
4472+
}
4473+
item.Changefeeds = changefeeds;
4474+
}
4475+
44674476
item.State = static_cast<TImportInfo::EState>(rowset.GetValue<Schema::ImportItems::State>());
44684477
item.WaitTxId = rowset.GetValueOrDefault<Schema::ImportItems::WaitTxId>(InvalidTxId);
44694478
item.NextIndexIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextIndexIdx>(0);

ydb/core/tx/schemeshard/schemeshard_import.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,17 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf
189189
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
190190
NIceDb::TUpdate<Schema::ImportItems::Metadata>(item.Metadata.Serialize())
191191
);
192+
const ui64 count = item.Changefeeds.size();
193+
TVector<TString> jsonChangefeeds;
194+
jsonChangefeeds.reserve(count);
195+
196+
for (const auto& changefeed : item.Changefeeds) {
197+
jsonChangefeeds.push_back(changefeed.SerializeAsString());
198+
}
199+
200+
db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
201+
NIceDb::TUpdate<Schema::ImportItems::Changefeeds>(jsonChangefeeds)
202+
);
192203
}
193204

194205
void TSchemeShard::PersistImportItemPreparedCreationQuery(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) {

ydb/core/tx/schemeshard/schemeshard_info_types.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2851,6 +2851,7 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
28512851
TMaybe<NKikimrSchemeOp::TModifyScheme> PreparedCreationQuery;
28522852
TMaybeFail<Ydb::Scheme::ModifyPermissionsRequest> Permissions;
28532853
NBackup::TMetadata Metadata;
2854+
TVector<Ydb::Table::ChangefeedDescription> Changefeeds;
28542855

28552856
EState State = EState::GetScheme;
28562857
ESubState SubState = ESubState::AllocateTxId;

ydb/core/tx/schemeshard/schemeshard_schema.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1562,6 +1562,7 @@ struct Schema : NIceDb::Schema {
15621562
struct PreparedCreationQuery : Column<14, NScheme::NTypeIds::String> {};
15631563
struct Permissions : Column<11, NScheme::NTypeIds::String> {};
15641564
struct Metadata : Column<12, NScheme::NTypeIds::String> {};
1565+
struct Changefeeds : Column<13, NScheme::NTypeIds::Json> {};
15651566

15661567
struct State : Column<7, NScheme::NTypeIds::Byte> {};
15671568
struct WaitTxId : Column<8, NScheme::NTypeIds::Uint64> { using Type = TTxId; };

0 commit comments

Comments
 (0)