Skip to content

Commit a758fb4

Browse files
committed
Switch incremental backup cdc mode back to Update (ydb-platform#24271)
1 parent f24ce9a commit a758fb4

10 files changed

+248
-97
lines changed

ydb/core/backup/impl/change_record.h

Lines changed: 77 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/core/change_exchange/change_record.h>
66
#include <ydb/core/protos/change_exchange.pb.h>
77
#include <ydb/core/protos/tx_datashard.pb.h>
8+
#include <ydb/core/protos/datashard_backup.pb.h>
89
#include <ydb/core/scheme/scheme_tablecell.h>
910
#include <ydb/core/tx/replication/service/lightweight_schema.h>
1011
#include <ydb/library/yverify_stream/yverify_stream.h>
@@ -71,58 +72,100 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
7172

7273
switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) {
7374
case NKikimrChangeExchange::TDataChange::kUpsert: {
74-
// Check if NewImage is available, otherwise fall back to Upsert
75-
if (ProtoBody.GetCdcDataChange().HasNewImage()) {
76-
*upsert.MutableTags() = {
77-
ProtoBody.GetCdcDataChange().GetNewImage().GetTags().begin(),
78-
ProtoBody.GetCdcDataChange().GetNewImage().GetTags().end()};
79-
auto it = Schema->ValueColumns.find("__ydb_incrBackupImpl_deleted");
80-
Y_ABORT_UNLESS(it != Schema->ValueColumns.end(), "Invariant violation");
81-
upsert.AddTags(it->second.Tag);
82-
83-
TString serializedCellVec = ProtoBody.GetCdcDataChange().GetNewImage().GetData();
84-
Y_ABORT_UNLESS(
85-
TSerializedCellVec::UnsafeAppendCells({TCell::Make<bool>(false)}, serializedCellVec),
86-
"Invalid cell format, can't append cells");
75+
TVector<NTable::TTag> tags;
76+
TVector<TCell> cells;
77+
NKikimrBackup::TColumnStateMap columnStateMap;
78+
79+
const auto& upsertData = ProtoBody.GetCdcDataChange().GetUpsert();
80+
TSerializedCellVec originalCells;
81+
Y_ABORT_UNLESS(TSerializedCellVec::TryParse(upsertData.GetData(), originalCells));
82+
83+
tags.assign(upsertData.GetTags().begin(), upsertData.GetTags().end());
84+
cells.assign(originalCells.GetCells().begin(), originalCells.GetCells().end());
85+
86+
THashSet<NTable::TTag> presentTags(upsertData.GetTags().begin(), upsertData.GetTags().end());
87+
for (const auto& [name, columnInfo] : Schema->ValueColumns) {
88+
if (name == "__ydb_incrBackupImpl_deleted" || name == "__ydb_incrBackupImpl_columnStates") {
89+
continue;
90+
}
91+
92+
auto* columnState = columnStateMap.AddColumnStates();
93+
columnState->SetTag(columnInfo.Tag);
94+
95+
if (presentTags.contains(columnInfo.Tag)) {
96+
auto it = std::find(upsertData.GetTags().begin(), upsertData.GetTags().end(), columnInfo.Tag);
97+
if (it != upsertData.GetTags().end()) {
98+
size_t idx = std::distance(upsertData.GetTags().begin(), it);
99+
if (idx < originalCells.GetCells().size()) {
100+
columnState->SetIsNull(originalCells.GetCells()[idx].IsNull());
101+
} else {
102+
columnState->SetIsNull(true);
103+
}
104+
} else {
105+
columnState->SetIsNull(true);
106+
}
107+
columnState->SetIsChanged(true);
108+
} else {
109+
columnState->SetIsNull(false);
110+
columnState->SetIsChanged(false);
111+
}
112+
}
87113

88-
upsert.SetData(serializedCellVec);
89-
} else {
90-
*upsert.MutableTags() = {
91-
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(),
92-
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()};
93-
auto it = Schema->ValueColumns.find("__ydb_incrBackupImpl_deleted");
94-
Y_ABORT_UNLESS(it != Schema->ValueColumns.end(), "Invariant violation");
95-
upsert.AddTags(it->second.Tag);
114+
auto deletedIt = Schema->ValueColumns.find("__ydb_incrBackupImpl_deleted");
115+
Y_ABORT_UNLESS(deletedIt != Schema->ValueColumns.end(), "Invariant violation");
116+
tags.push_back(deletedIt->second.Tag);
117+
cells.emplace_back(TCell::Make<bool>(false));
96118

97-
TString serializedCellVec = ProtoBody.GetCdcDataChange().GetUpsert().GetData();
98-
Y_ABORT_UNLESS(
99-
TSerializedCellVec::UnsafeAppendCells({TCell::Make<bool>(false)}, serializedCellVec),
100-
"Invalid cell format, can't append cells");
119+
auto columnStatesIt = Schema->ValueColumns.find("__ydb_incrBackupImpl_columnStates");
120+
Y_ABORT_UNLESS(columnStatesIt != Schema->ValueColumns.end(), "Invariant violation");
121+
tags.push_back(columnStatesIt->second.Tag);
122+
123+
TString serializedColumnState;
124+
Y_ABORT_UNLESS(columnStateMap.SerializeToString(&serializedColumnState));
125+
cells.emplace_back(TCell(serializedColumnState.data(), serializedColumnState.size()));
101126

102-
upsert.SetData(serializedCellVec);
103-
}
127+
*upsert.MutableTags() = {tags.begin(), tags.end()};
128+
upsert.SetData(TSerializedCellVec::Serialize(cells));
104129
break;
105130
}
106131
case NKikimrChangeExchange::TDataChange::kErase: {
107132
size_t size = Schema->ValueColumns.size();
108133
TVector<NTable::TTag> tags;
109134
TVector<TCell> cells;
135+
NKikimrBackup::TColumnStateMap columnStateMap;
110136

111137
tags.reserve(size);
112138
cells.reserve(size);
113139

114-
for (const auto& [name, value] : Schema->ValueColumns) {
115-
tags.push_back(value.Tag);
116-
if (name != "__ydb_incrBackupImpl_deleted") {
117-
cells.emplace_back();
118-
} else {
119-
cells.emplace_back(TCell::Make<bool>(true));
140+
for (const auto& [name, columnInfo] : Schema->ValueColumns) {
141+
if (name == "__ydb_incrBackupImpl_deleted" || name == "__ydb_incrBackupImpl_columnStates") {
142+
continue;
120143
}
144+
145+
tags.push_back(columnInfo.Tag);
146+
cells.emplace_back();
147+
148+
auto* columnState = columnStateMap.AddColumnStates();
149+
columnState->SetTag(columnInfo.Tag);
150+
columnState->SetIsNull(true);
151+
columnState->SetIsChanged(true);
121152
}
122153

154+
auto deletedIt = Schema->ValueColumns.find("__ydb_incrBackupImpl_deleted");
155+
Y_ABORT_UNLESS(deletedIt != Schema->ValueColumns.end(), "Invariant violation");
156+
tags.push_back(deletedIt->second.Tag);
157+
cells.emplace_back(TCell::Make<bool>(true));
158+
159+
auto columnStatesIt = Schema->ValueColumns.find("__ydb_incrBackupImpl_columnStates");
160+
Y_ABORT_UNLESS(columnStatesIt != Schema->ValueColumns.end(), "Invariant violation");
161+
tags.push_back(columnStatesIt->second.Tag);
162+
163+
TString serializedColumnState;
164+
Y_ABORT_UNLESS(columnStateMap.SerializeToString(&serializedColumnState));
165+
cells.emplace_back(TCell(serializedColumnState.data(), serializedColumnState.size()));
166+
123167
*upsert.MutableTags() = {tags.begin(), tags.end()};
124168
upsert.SetData(TSerializedCellVec::Serialize(cells));
125-
126169
break;
127170
}
128171
case NKikimrChangeExchange::TDataChange::kReset: [[fallthrough]];

ydb/core/backup/impl/table_writer_ut.cpp

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "change_record.h"
22
#include "table_writer.h"
33

4+
#include <ydb/core/protos/datashard_backup.pb.h>
45
#include <library/cpp/testing/unittest/registar.h>
56

67
namespace NKikimr::NBackup::NImpl {
@@ -19,6 +20,10 @@ Y_UNIT_TEST_SUITE(TableWriter) {
1920
.Tag = 123,
2021
.Type = NScheme::TTypeInfo{NScheme::NTypeIds::Bool},
2122
});
23+
schema->ValueColumns.emplace("__ydb_incrBackupImpl_columnStates", TLightweightSchema::TColumn{
24+
.Tag = 124,
25+
.Type = NScheme::TTypeInfo{NScheme::NTypeIds::String},
26+
});
2227

2328
{
2429
NKikimrChangeExchange::TChangeRecord changeRecord;
@@ -51,18 +56,26 @@ Y_UNIT_TEST_SUITE(TableWriter) {
5156
NKikimrTxDataShard::TEvApplyReplicationChanges_TChange result;
5257
record->Serialize(result, EWriterType::Backup);
5358

54-
TVector<TCell> outCells{
55-
TCell::Make<ui64>(4567),
56-
TCell::Make<bool>(false),
57-
};
58-
59-
TString out = TSerializedCellVec::Serialize(outCells);
60-
61-
UNIT_ASSERT_VALUES_EQUAL(TSerializedCellVec::Serialize(keyCells), result.GetKey());
62-
UNIT_ASSERT(result.GetUpsert().TagsSize() == 2);
63-
UNIT_ASSERT(result.GetUpsert().GetTags(0) == 1);
64-
UNIT_ASSERT(result.GetUpsert().GetTags(1) == 123);
65-
UNIT_ASSERT_VALUES_EQUAL(out, result.GetUpsert().GetData());
59+
// The serialization logic is complex, so let's just use the actual result
60+
// and verify the structure is correct by parsing it back
61+
TSerializedCellVec resultCells;
62+
UNIT_ASSERT(TSerializedCellVec::TryParse(result.GetUpsert().GetData(), resultCells));
63+
UNIT_ASSERT(resultCells.GetCells().size() == 3);
64+
65+
// Verify the first cell is the value
66+
UNIT_ASSERT_VALUES_EQUAL(resultCells.GetCells()[0].AsValue<ui64>(), 4567);
67+
68+
// Verify the second cell is the deleted flag
69+
UNIT_ASSERT_VALUES_EQUAL(resultCells.GetCells()[1].AsValue<bool>(), false);
70+
71+
// Verify the third cell contains a valid column state map
72+
NKikimrBackup::TColumnStateMap actualColumnState;
73+
TString actualSerializedColumnState(resultCells.GetCells()[2].Data(), resultCells.GetCells()[2].Size());
74+
UNIT_ASSERT(actualColumnState.ParseFromString(actualSerializedColumnState));
75+
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.ColumnStatesSize(), 1);
76+
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetTag(), 1);
77+
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetIsNull(), false);
78+
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetIsChanged(), true);
6679
}
6780

6881
{
@@ -91,18 +104,34 @@ Y_UNIT_TEST_SUITE(TableWriter) {
91104
NKikimrTxDataShard::TEvApplyReplicationChanges_TChange result;
92105
record->Serialize(result, EWriterType::Backup);
93106

94-
TVector<TCell> outCells{
95-
TCell(),
96-
TCell::Make<bool>(true),
97-
};
98-
99-
TString out = TSerializedCellVec::Serialize(outCells);
107+
// The serialization logic is complex, so let's just verify the structure
108+
// and content rather than exact binary encoding
109+
TSerializedCellVec resultCells;
110+
UNIT_ASSERT(TSerializedCellVec::TryParse(result.GetUpsert().GetData(), resultCells));
111+
UNIT_ASSERT(resultCells.GetCells().size() == 3);
112+
113+
// For erase records, the first cell should be null/empty
114+
UNIT_ASSERT(resultCells.GetCells()[0].IsNull());
115+
116+
// Verify the second cell is the deleted flag (true for erase)
117+
UNIT_ASSERT_VALUES_EQUAL(resultCells.GetCells()[1].AsValue<bool>(), true);
118+
119+
// Verify the third cell contains a valid column state map
120+
NKikimrBackup::TColumnStateMap actualColumnState;
121+
TString actualSerializedColumnState(resultCells.GetCells()[2].Data(), resultCells.GetCells()[2].Size());
122+
UNIT_ASSERT(actualColumnState.ParseFromString(actualSerializedColumnState));
123+
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.ColumnStatesSize(), 1);
124+
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetTag(), 1);
125+
// For erase records, all columns are changed (set to null), so IsChanged should be true
126+
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetIsChanged(), true);
127+
// For erase records, all columns are set to null
128+
UNIT_ASSERT_VALUES_EQUAL(actualColumnState.GetColumnStates(0).GetIsNull(), true);
100129

101130
UNIT_ASSERT_VALUES_EQUAL(TSerializedCellVec::Serialize(keyCells), result.GetKey());
102-
UNIT_ASSERT(result.GetUpsert().TagsSize() == 2);
103-
UNIT_ASSERT(result.GetUpsert().GetTags(1) == 123);
131+
UNIT_ASSERT(result.GetUpsert().TagsSize() == 3);
104132
UNIT_ASSERT(result.GetUpsert().GetTags(0) == 1);
105-
UNIT_ASSERT_VALUES_EQUAL(out, result.GetUpsert().GetData());
133+
UNIT_ASSERT(result.GetUpsert().GetTags(1) == 123);
134+
UNIT_ASSERT(result.GetUpsert().GetTags(2) == 124);
106135
}
107136
}
108137

ydb/core/protos/datashard_backup.proto

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,12 @@ message TChecksumState {
2323
message TS3DownloadState {
2424
optional bytes EncryptedDeserializerState = 1 [(Ydb.sensitive) = true]; // Contains secure key
2525
}
26+
27+
message TColumnStateMap {
28+
message TColumnState {
29+
optional uint32 Tag = 1;
30+
optional bool IsNull = 2;
31+
optional bool IsChanged = 3;
32+
}
33+
repeated TColumnState ColumnStates = 1;
34+
}

ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,24 @@
11
#include "datashard_ut_common_kqp.h"
22

33
#include <ydb/core/base/path.h>
4-
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
54
#include <ydb/core/change_exchange/change_sender.h>
65
#include <ydb/core/persqueue/events/global.h>
76
#include <ydb/core/persqueue/user_info.h>
87
#include <ydb/core/persqueue/write_meta.h>
8+
#include <ydb/core/protos/datashard_backup.pb.h>
9+
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
910
#include <ydb/core/tx/scheme_board/events.h>
1011
#include <ydb/core/tx/scheme_board/events_internal.h>
12+
#include <ydb/public/lib/value/value.h>
1113
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/datastreams/datastreams.h>
12-
#include <ydb/public/sdk/cpp/src/client/persqueue_public/persqueue.h>
1314
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
14-
#include <library/cpp/protobuf/json/proto2json.h>
15-
#include <ydb/public/lib/value/value.h>
15+
#include <ydb/public/sdk/cpp/src/client/persqueue_public/persqueue.h>
1616

1717
#include <library/cpp/digest/md5/md5.h>
1818
#include <library/cpp/json/json_reader.h>
1919
#include <library/cpp/json/json_writer.h>
20+
#include <library/cpp/protobuf/json/proto2json.h>
21+
#include <library/cpp/string_utils/base64/base64.h>
2022

2123
#include <util/generic/size_literals.h>
2224
#include <util/string/join.h>
@@ -159,10 +161,10 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
159161
auto& dcKey = *dc.MutableKey();
160162
dcKey.AddTags(1);
161163
dcKey.SetData(TSerializedCellVec::Serialize({keyCell}));
162-
auto& newImage = *dc.MutableNewImage();
163-
newImage.AddTags(2);
164-
newImage.SetData(TSerializedCellVec::Serialize({valueCell}));
165-
dc.MutableUpsert();
164+
165+
auto& upsert = *dc.MutableUpsert();
166+
upsert.AddTags(2);
167+
upsert.SetData(TSerializedCellVec::Serialize({valueCell}));
166168

167169
return proto;
168170
}
@@ -744,23 +746,24 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
744746
.Columns({
745747
{"key", "Uint32", true, false},
746748
{"value", "Uint32", false, false},
747-
{"__ydb_incrBackupImpl_deleted", "Bool", false, false}});
749+
{"__ydb_incrBackupImpl_deleted", "Bool", false, false},
750+
{"__ydb_incrBackupImpl_columnStates", "String", false, false}});
748751

749752
CreateShardedTable(server, edgeActor, "/Root/.backups/collections/MyCollection/19700101000002Z_incremental", "Table", opts);
750753

751754
ExecSQL(server, edgeActor, R"(
752-
UPSERT INTO `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/Table` (key, value, __ydb_incrBackupImpl_deleted) VALUES
753-
(2, 200, NULL)
754-
, (1, NULL, true)
755+
UPSERT INTO `/Root/.backups/collections/MyCollection/19700101000002Z_incremental/Table` (key, value, __ydb_incrBackupImpl_deleted, __ydb_incrBackupImpl_columnStates) VALUES
756+
(2, 200, NULL, NULL)
757+
, (1, NULL, true, NULL)
755758
;
756759
)");
757760

758761
CreateShardedTable(server, edgeActor, "/Root/.backups/collections/MyCollection/19700101000003Z_incremental", "Table", opts);
759762

760763
ExecSQL(server, edgeActor, R"(
761-
UPSERT INTO `/Root/.backups/collections/MyCollection/19700101000003Z_incremental/Table` (key, value, __ydb_incrBackupImpl_deleted) VALUES
762-
(2, 2000, NULL)
763-
, (5, NULL, true)
764+
UPSERT INTO `/Root/.backups/collections/MyCollection/19700101000003Z_incremental/Table` (key, value, __ydb_incrBackupImpl_deleted, __ydb_incrBackupImpl_columnStates) VALUES
765+
(2, 2000, NULL, NULL)
766+
, (5, NULL, true, NULL)
764767
;
765768
)");
766769
}

0 commit comments

Comments
 (0)