55#include < ydb/core/change_exchange/change_record.h>
66#include < ydb/core/protos/change_exchange.pb.h>
77#include < ydb/core/protos/tx_datashard.pb.h>
8+ #include < ydb/core/protos/datashard_backup.pb.h>
89#include < ydb/core/scheme/scheme_tablecell.h>
910#include < ydb/core/tx/replication/service/lightweight_schema.h>
1011#include < ydb/library/yverify_stream/yverify_stream.h>
@@ -70,62 +71,114 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
7071 auto & upsert = *record.MutableUpsert ();
7172
7273 switch (ProtoBody.GetCdcDataChange ().GetRowOperationCase ()) {
73- case NKikimrChangeExchange::TDataChange::kUpsert : {
74- // Check if NewImage is available, otherwise fall back to Upsert
75- if (ProtoBody.GetCdcDataChange ().HasNewImage ()) {
76- *upsert.MutableTags () = {
77- ProtoBody.GetCdcDataChange ().GetNewImage ().GetTags ().begin (),
78- ProtoBody.GetCdcDataChange ().GetNewImage ().GetTags ().end ()};
79- auto it = Schema->ValueColumns .find (" __ydb_incrBackupImpl_deleted" );
80- Y_ABORT_UNLESS (it != Schema->ValueColumns .end (), " Invariant violation" );
81- upsert.AddTags (it->second .Tag );
82-
83- TString serializedCellVec = ProtoBody.GetCdcDataChange ().GetNewImage ().GetData ();
84- Y_ABORT_UNLESS (
85- TSerializedCellVec::UnsafeAppendCells ({TCell::Make<bool >(false )}, serializedCellVec),
86- " Invalid cell format, can't append cells" );
74+ case NKikimrChangeExchange::TDataChange::kUpsert :
75+ case NKikimrChangeExchange::TDataChange::kReset : {
76+ TVector<NTable::TTag> tags;
77+ TVector<TCell> cells;
78+ NKikimrBackup::TColumnStateMap columnStateMap;
79+
80+ // Handle both Upsert and Reset operations
81+ const bool isResetOperation = ProtoBody.GetCdcDataChange ().GetRowOperationCase () == NKikimrChangeExchange::TDataChange::kReset ;
82+ const auto & operationData = isResetOperation
83+ ? ProtoBody.GetCdcDataChange ().GetReset ()
84+ : ProtoBody.GetCdcDataChange ().GetUpsert ();
85+
86+ TSerializedCellVec originalCells;
87+ Y_ABORT_UNLESS (TSerializedCellVec::TryParse (operationData.GetData (), originalCells));
88+
89+ tags.assign (operationData.GetTags ().begin (), operationData.GetTags ().end ());
90+ cells.assign (originalCells.GetCells ().begin (), originalCells.GetCells ().end ());
91+
92+ THashSet<NTable::TTag> presentTags (operationData.GetTags ().begin (), operationData.GetTags ().end ());
93+ for (const auto & [name, columnInfo] : Schema->ValueColumns ) {
94+ if (name == " __ydb_incrBackupImpl_deleted" || name == " __ydb_incrBackupImpl_columnStates" ) {
95+ continue ;
96+ }
97+
98+ auto * columnState = columnStateMap.AddColumnStates ();
99+ columnState->SetTag (columnInfo.Tag );
100+
101+ if (presentTags.contains (columnInfo.Tag )) {
102+ auto it = std::find (operationData.GetTags ().begin (), operationData.GetTags ().end (), columnInfo.Tag );
103+ if (it != operationData.GetTags ().end ()) {
104+ size_t idx = std::distance (operationData.GetTags ().begin (), it);
105+ if (idx < originalCells.GetCells ().size ()) {
106+ columnState->SetIsNull (originalCells.GetCells ()[idx].IsNull ());
107+ } else {
108+ columnState->SetIsNull (true );
109+ }
110+ } else {
111+ columnState->SetIsNull (true );
112+ }
113+ columnState->SetIsChanged (true );
114+ } else {
115+ if (isResetOperation) {
116+ columnState->SetIsNull (true );
117+ columnState->SetIsChanged (true );
118+ } else {
119+ columnState->SetIsNull (false );
120+ columnState->SetIsChanged (false );
121+ }
122+ }
123+ }
87124
88- upsert.SetData (serializedCellVec);
89- } else {
90- *upsert.MutableTags () = {
91- ProtoBody.GetCdcDataChange ().GetUpsert ().GetTags ().begin (),
92- ProtoBody.GetCdcDataChange ().GetUpsert ().GetTags ().end ()};
93- auto it = Schema->ValueColumns .find (" __ydb_incrBackupImpl_deleted" );
94- Y_ABORT_UNLESS (it != Schema->ValueColumns .end (), " Invariant violation" );
95- upsert.AddTags (it->second .Tag );
125+ auto deletedIt = Schema->ValueColumns .find (" __ydb_incrBackupImpl_deleted" );
126+ Y_ABORT_UNLESS (deletedIt != Schema->ValueColumns .end (), " Invariant violation" );
127+ tags.push_back (deletedIt->second .Tag );
128+ cells.emplace_back (TCell::Make<bool >(false ));
96129
97- TString serializedCellVec = ProtoBody.GetCdcDataChange ().GetUpsert ().GetData ();
98- Y_ABORT_UNLESS (
99- TSerializedCellVec::UnsafeAppendCells ({TCell::Make<bool >(false )}, serializedCellVec),
100- " Invalid cell format, can't append cells" );
130+ auto columnStatesIt = Schema->ValueColumns .find (" __ydb_incrBackupImpl_columnStates" );
131+ Y_ABORT_UNLESS (columnStatesIt != Schema->ValueColumns .end (), " Invariant violation" );
132+ tags.push_back (columnStatesIt->second .Tag );
133+
134+ TString serializedColumnState;
135+ Y_ABORT_UNLESS (columnStateMap.SerializeToString (&serializedColumnState));
136+ cells.emplace_back (TCell (serializedColumnState.data (), serializedColumnState.size ()));
101137
102- upsert. SetData (serializedCellVec) ;
103- }
138+ *upsert. MutableTags () = {tags. begin (), tags. end ()} ;
139+ upsert. SetData ( TSerializedCellVec::Serialize (cells));
104140 break ;
105141 }
106142 case NKikimrChangeExchange::TDataChange::kErase : {
107143 size_t size = Schema->ValueColumns .size ();
108144 TVector<NTable::TTag> tags;
109145 TVector<TCell> cells;
146+ NKikimrBackup::TColumnStateMap columnStateMap;
110147
111148 tags.reserve (size);
112149 cells.reserve (size);
113150
114- for (const auto & [name, value] : Schema->ValueColumns ) {
115- tags.push_back (value.Tag );
116- if (name != " __ydb_incrBackupImpl_deleted" ) {
117- cells.emplace_back ();
118- } else {
119- cells.emplace_back (TCell::Make<bool >(true ));
151+ for (const auto & [name, columnInfo] : Schema->ValueColumns ) {
152+ if (name == " __ydb_incrBackupImpl_deleted" || name == " __ydb_incrBackupImpl_columnStates" ) {
153+ continue ;
120154 }
155+
156+ tags.push_back (columnInfo.Tag );
157+ cells.emplace_back ();
158+
159+ auto * columnState = columnStateMap.AddColumnStates ();
160+ columnState->SetTag (columnInfo.Tag );
161+ columnState->SetIsNull (true );
162+ columnState->SetIsChanged (true );
121163 }
122164
165+ auto deletedIt = Schema->ValueColumns .find (" __ydb_incrBackupImpl_deleted" );
166+ Y_ABORT_UNLESS (deletedIt != Schema->ValueColumns .end (), " Invariant violation" );
167+ tags.push_back (deletedIt->second .Tag );
168+ cells.emplace_back (TCell::Make<bool >(true ));
169+
170+ auto columnStatesIt = Schema->ValueColumns .find (" __ydb_incrBackupImpl_columnStates" );
171+ Y_ABORT_UNLESS (columnStatesIt != Schema->ValueColumns .end (), " Invariant violation" );
172+ tags.push_back (columnStatesIt->second .Tag );
173+
174+ TString serializedColumnState;
175+ Y_ABORT_UNLESS (columnStateMap.SerializeToString (&serializedColumnState));
176+ cells.emplace_back (TCell (serializedColumnState.data (), serializedColumnState.size ()));
177+
123178 *upsert.MutableTags () = {tags.begin (), tags.end ()};
124179 upsert.SetData (TSerializedCellVec::Serialize (cells));
125-
126180 break ;
127181 }
128- case NKikimrChangeExchange::TDataChange::kReset : [[fallthrough]];
129182 default :
130183 Y_FAIL_S (" Unexpected row operation: " << static_cast <int >(ProtoBody.GetCdcDataChange ().GetRowOperationCase ()));
131184 }
@@ -139,27 +192,29 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
139192 record.SetKey (ProtoBody.GetCdcDataChange ().GetKey ().GetData ());
140193
141194 switch (ProtoBody.GetCdcDataChange ().GetRowOperationCase ()) {
142- case NKikimrChangeExchange::TDataChange::kUpsert : {
195+ case NKikimrChangeExchange::TDataChange::kUpsert :
196+ case NKikimrChangeExchange::TDataChange::kReset : {
143197 auto & upsert = *record.MutableUpsert ();
144- // Check if NewImage is available, otherwise fall back to Upsert
198+ // Check if NewImage is available, otherwise fall back to Upsert/Reset
145199 if (ProtoBody.GetCdcDataChange ().has_newimage ()) {
146200 *upsert.MutableTags () = {
147201 ProtoBody.GetCdcDataChange ().GetNewImage ().GetTags ().begin (),
148202 ProtoBody.GetCdcDataChange ().GetNewImage ().GetTags ().end ()};
149203 upsert.SetData (ProtoBody.GetCdcDataChange ().GetNewImage ().GetData ());
150- } else {
204+ } else if (ProtoBody. GetCdcDataChange (). GetRowOperationCase () == NKikimrChangeExchange::TDataChange:: kUpsert ) {
151205 // Fallback to Upsert field if NewImage is not available
152206 *upsert.MutableTags () = {
153207 ProtoBody.GetCdcDataChange ().GetUpsert ().GetTags ().begin (),
154208 ProtoBody.GetCdcDataChange ().GetUpsert ().GetTags ().end ()};
155209 upsert.SetData (ProtoBody.GetCdcDataChange ().GetUpsert ().GetData ());
210+ } else if (ProtoBody.GetCdcDataChange ().GetRowOperationCase () == NKikimrChangeExchange::TDataChange::kReset ) {
211+ Y_ABORT (" Reset operation is not supported, all operations must be converted to Upsert" );
156212 }
157213 break ;
158214 }
159215 case NKikimrChangeExchange::TDataChange::kErase :
160216 record.MutableErase ();
161217 break ;
162- case NKikimrChangeExchange::TDataChange::kReset : [[fallthrough]];
163218 default :
164219 Y_FAIL_S (" Unexpected row operation: " << static_cast <int >(ProtoBody.GetCdcDataChange ().GetRowOperationCase ()));
165220 }
0 commit comments