@@ -526,6 +526,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
526526 return true ;
527527 }
528528
529+ void CreateChangefeed (TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
530+ Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
531+ auto & item = importInfo->Items .at (itemIdx);
532+ item.SubState = ESubState::Proposed;
533+
534+ LOG_I (" TImport::TTxProgress: CreateChangefeed propose"
535+ << " : info# " << importInfo->ToString ()
536+ << " , item# " << item.ToString (itemIdx)
537+ << " , txId# " << txId);
538+
539+ Y_ABORT_UNLESS (item.WaitTxId == InvalidTxId);
540+
541+ Send (Self->SelfId (), CreateChangefeedPropose (Self, txId, item));
542+ }
543+
529544 void AllocateTxId (TImportInfo::TPtr importInfo, ui32 itemIdx) {
530545 Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
531546 auto & item = importInfo->Items .at (itemIdx);
@@ -588,6 +603,25 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
588603 return TTxId (ui64 ((*infoPtr)->Id ));
589604 }
590605
606+ TTxId GetActiveCreateChangefeedTxId (TImportInfo::TPtr importInfo, ui32 itemIdx) {
607+ Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
608+ const auto & item = importInfo->Items .at (itemIdx);
609+
610+ Y_ABORT_UNLESS (item.State == EState::CreateChangefeed);
611+ Y_ABORT_UNLESS (item.DstPathId );
612+
613+ if (!Self->PathsById .contains (item.DstPathId )) {
614+ return InvalidTxId;
615+ }
616+
617+ auto path = Self->PathsById .at (item.DstPathId );
618+ if (path->PathState != NKikimrSchemeOp::EPathStateAlter) {
619+ return InvalidTxId;
620+ }
621+
622+ return path->LastTxId ;
623+ }
624+
591625 static TString MakeIndexBuildUid (TImportInfo::TPtr importInfo, ui32 itemIdx) {
592626 Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
593627 const auto & item = importInfo->Items .at (itemIdx);
@@ -756,6 +790,7 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
756790 case EState::CreateSchemeObject:
757791 case EState::Transferring:
758792 case EState::BuildIndexes:
793+ case EState::CreateChangefeed:
759794 if (item.WaitTxId == InvalidTxId) {
760795 if (!IsCreatedByQuery (item) || item.PreparedCreationQuery ) {
761796 AllocateTxId (importInfo, itemIdx);
@@ -781,6 +816,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
781816 TTxId txId = InvalidTxId;
782817
783818 switch (item.State ) {
819+ case EState::CreateChangefeed:
820+ txId = GetActiveCreateChangefeedTxId (importInfo, itemIdx);
821+ break ;
822+
784823 case EState::Transferring:
785824 if (!CancelTransferring (importInfo, itemIdx)) {
786825 txId = GetActiveRestoreTxId (importInfo, itemIdx);
@@ -1004,6 +1043,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
10041043 BuildIndex (importInfo, i, txId);
10051044 itemIdx = i;
10061045 break ;
1046+
1047+ case EState::CreateChangefeed:
1048+ CreateChangefeed (importInfo, i, txId);
1049+ itemIdx = i;
1050+ break ;
10071051
10081052 default :
10091053 break ;
@@ -1064,6 +1108,8 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
10641108 txId = TTxId (record.GetPathCreateTxId ());
10651109 } else if (item.State == EState::Transferring) {
10661110 txId = GetActiveRestoreTxId (importInfo, itemIdx);
1111+ } else if (item.State == EState::CreateChangefeed) {
1112+ txId = GetActiveCreateChangefeedTxId (importInfo, itemIdx);
10671113 }
10681114 }
10691115
@@ -1216,6 +1262,10 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
12161262 if (item.NextIndexIdx < item.Scheme .indexes_size ()) {
12171263 item.State = EState::BuildIndexes;
12181264 AllocateTxId (importInfo, itemIdx);
1265+ } else if (item.NextChangefeedIdx < item.Changefeeds .changefeeds_size () &&
1266+ AppData ()->FeatureFlags .GetEnableChangefeedsImport ()) {
1267+ item.State = EState::CreateChangefeed;
1268+ AllocateTxId (importInfo, itemIdx);
12191269 } else {
12201270 item.State = EState::Done;
12211271 }
@@ -1229,11 +1279,23 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
12291279 } else {
12301280 if (++item.NextIndexIdx < item.Scheme .indexes_size ()) {
12311281 AllocateTxId (importInfo, itemIdx);
1282+ } else if (item.NextChangefeedIdx < item.Changefeeds .changefeeds_size () &&
1283+ AppData ()->FeatureFlags .GetEnableChangefeedsImport ()) {
1284+ item.State = EState::CreateChangefeed;
1285+ AllocateTxId (importInfo, itemIdx);
12321286 } else {
12331287 item.State = EState::Done;
12341288 }
12351289 }
12361290 break ;
1291+
1292+ case EState::CreateChangefeed:
1293+ if (++item.NextChangefeedIdx < item.Changefeeds .GetChangefeeds ().size ()) {
1294+ AllocateTxId (importInfo, itemIdx);
1295+ } else {
1296+ item.State = EState::Done;
1297+ }
1298+ break ;
12371299
12381300 default :
12391301 return SendNotificationsIfFinished (importInfo);
0 commit comments