@@ -541,6 +541,21 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
541541 Send (Self->SelfId (), CreateChangefeedPropose (Self, txId, item));
542542 }
543543
544+ void CreateConsumers (TImportInfo::TPtr importInfo, ui32 itemIdx, TTxId txId) {
545+ Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
546+ auto & item = importInfo->Items .at (itemIdx);
547+ item.SubState = ESubState::Proposed;
548+
549+ LOG_I (" TImport::TTxProgress: CreateConsumers propose"
550+ << " : info# " << importInfo->ToString ()
551+ << " , item# " << item.ToString (itemIdx)
552+ << " , txId# " << txId);
553+
554+ Y_ABORT_UNLESS (item.WaitTxId == InvalidTxId);
555+
556+ Send (Self->SelfId (), CreateConsumersPropose (Self, txId, item));
557+ }
558+
544559 void AllocateTxId (TImportInfo::TPtr importInfo, ui32 itemIdx) {
545560 Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
546561 auto & item = importInfo->Items .at (itemIdx);
@@ -622,6 +637,26 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
622637 return path->LastTxId ;
623638 }
624639
640+ TTxId GetActiveCreateConsumerTxId (TImportInfo::TPtr importInfo, ui32 itemIdx) {
641+ Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
642+ const auto & item = importInfo->Items .at (itemIdx);
643+
644+ Y_ABORT_UNLESS (item.State == EState::CreateChangefeed);
645+ Y_ABORT_UNLESS (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateConsumers);
646+ Y_ABORT_UNLESS (item.StreamImplPathId );
647+
648+ if (!Self->PathsById .contains (item.StreamImplPathId )) {
649+ return InvalidTxId;
650+ }
651+
652+ auto path = Self->PathsById .at (item.StreamImplPathId );
653+ if (path->PathState != NKikimrSchemeOp::EPathStateAlter) {
654+ return InvalidTxId;
655+ }
656+
657+ return path->LastTxId ;
658+ }
659+
625660 static TString MakeIndexBuildUid (TImportInfo::TPtr importInfo, ui32 itemIdx) {
626661 Y_ABORT_UNLESS (itemIdx < importInfo->Items .size ());
627662 const auto & item = importInfo->Items .at (itemIdx);
@@ -816,10 +851,6 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
816851 TTxId txId = InvalidTxId;
817852
818853 switch (item.State ) {
819- case EState::CreateChangefeed:
820- txId = GetActiveCreateChangefeedTxId (importInfo, itemIdx);
821- break ;
822-
823854 case EState::Transferring:
824855 if (!CancelTransferring (importInfo, itemIdx)) {
825856 txId = GetActiveRestoreTxId (importInfo, itemIdx);
@@ -1045,7 +1076,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
10451076 break ;
10461077
10471078 case EState::CreateChangefeed:
1048- CreateChangefeed (importInfo, i, txId);
1079+ if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
1080+ CreateChangefeed (importInfo, i, txId);
1081+ } else {
1082+ CreateConsumers (importInfo, i, txId);
1083+ }
10491084 itemIdx = i;
10501085 break ;
10511086
@@ -1109,11 +1144,30 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
11091144 } else if (item.State == EState::Transferring) {
11101145 txId = GetActiveRestoreTxId (importInfo, itemIdx);
11111146 } else if (item.State == EState::CreateChangefeed) {
1112- txId = GetActiveCreateChangefeedTxId (importInfo, itemIdx);
1147+ if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
1148+ txId = GetActiveCreateChangefeedTxId (importInfo, itemIdx);
1149+ } else {
1150+ txId = GetActiveCreateConsumerTxId (importInfo, itemIdx);
1151+ }
1152+
11131153 }
11141154 }
11151155
11161156 if (txId == InvalidTxId) {
1157+
1158+ if (record.GetStatus () == NKikimrScheme::StatusAlreadyExists && item.State == EState::CreateChangefeed) {
1159+ if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
1160+ item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
1161+ AllocateTxId (importInfo, itemIdx);
1162+ } else if (++item.NextChangefeedIdx < item.Changefeeds .GetChangefeeds ().size ()) {
1163+ item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed;
1164+ AllocateTxId (importInfo, itemIdx);
1165+ } else {
1166+ item.State = EState::Done;
1167+ }
1168+ return ;
1169+ }
1170+
11171171 return CancelAndPersist (db, importInfo, itemIdx, record.GetReason (), " unhappy propose" );
11181172 }
11191173
@@ -1290,7 +1344,11 @@ struct TSchemeShard::TImport::TTxProgress: public TSchemeShard::TXxport::TTxBase
12901344 break ;
12911345
12921346 case EState::CreateChangefeed:
1293- if (++item.NextChangefeedIdx < item.Changefeeds .GetChangefeeds ().size ()) {
1347+ if (item.ChangefeedState == TImportInfo::TItem::EChangefeedState::CreateChangefeed) {
1348+ item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateConsumers;
1349+ AllocateTxId (importInfo, itemIdx);
1350+ } else if (++item.NextChangefeedIdx < item.Changefeeds .GetChangefeeds ().size ()) {
1351+ item.ChangefeedState = TImportInfo::TItem::EChangefeedState::CreateChangefeed;
12941352 AllocateTxId (importInfo, itemIdx);
12951353 } else {
12961354 item.State = EState::Done;
0 commit comments