Skip to content

Commit f58dadf

Browse files
committed
Fix REPLACE INTO handling in incremental backup (ydb-platform#24427)
1 parent 219ddd7 commit f58dadf

File tree

2 files changed

+254
-15
lines changed

2 files changed

+254
-15
lines changed

ydb/core/backup/impl/change_record.h

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -71,19 +71,25 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
7171
auto& upsert = *record.MutableUpsert();
7272

7373
switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) {
74-
case NKikimrChangeExchange::TDataChange::kUpsert: {
74+
case NKikimrChangeExchange::TDataChange::kUpsert:
75+
case NKikimrChangeExchange::TDataChange::kReset: {
7576
TVector<NTable::TTag> tags;
7677
TVector<TCell> cells;
7778
NKikimrBackup::TColumnStateMap columnStateMap;
7879

79-
const auto& upsertData = ProtoBody.GetCdcDataChange().GetUpsert();
80+
// Handle both Upsert and Reset operations
81+
const bool isResetOperation = ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kReset;
82+
const auto& operationData = isResetOperation
83+
? ProtoBody.GetCdcDataChange().GetReset()
84+
: ProtoBody.GetCdcDataChange().GetUpsert();
85+
8086
TSerializedCellVec originalCells;
81-
Y_ABORT_UNLESS(TSerializedCellVec::TryParse(upsertData.GetData(), originalCells));
87+
Y_ABORT_UNLESS(TSerializedCellVec::TryParse(operationData.GetData(), originalCells));
8288

83-
tags.assign(upsertData.GetTags().begin(), upsertData.GetTags().end());
89+
tags.assign(operationData.GetTags().begin(), operationData.GetTags().end());
8490
cells.assign(originalCells.GetCells().begin(), originalCells.GetCells().end());
8591

86-
THashSet<NTable::TTag> presentTags(upsertData.GetTags().begin(), upsertData.GetTags().end());
92+
THashSet<NTable::TTag> presentTags(operationData.GetTags().begin(), operationData.GetTags().end());
8793
for (const auto& [name, columnInfo] : Schema->ValueColumns) {
8894
if (name == "__ydb_incrBackupImpl_deleted" || name == "__ydb_incrBackupImpl_columnStates") {
8995
continue;
@@ -93,9 +99,9 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
9399
columnState->SetTag(columnInfo.Tag);
94100

95101
if (presentTags.contains(columnInfo.Tag)) {
96-
auto it = std::find(upsertData.GetTags().begin(), upsertData.GetTags().end(), columnInfo.Tag);
97-
if (it != upsertData.GetTags().end()) {
98-
size_t idx = std::distance(upsertData.GetTags().begin(), it);
102+
auto it = std::find(operationData.GetTags().begin(), operationData.GetTags().end(), columnInfo.Tag);
103+
if (it != operationData.GetTags().end()) {
104+
size_t idx = std::distance(operationData.GetTags().begin(), it);
99105
if (idx < originalCells.GetCells().size()) {
100106
columnState->SetIsNull(originalCells.GetCells()[idx].IsNull());
101107
} else {
@@ -106,8 +112,13 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
106112
}
107113
columnState->SetIsChanged(true);
108114
} else {
109-
columnState->SetIsNull(false);
110-
columnState->SetIsChanged(false);
115+
if (isResetOperation) {
116+
columnState->SetIsNull(true);
117+
columnState->SetIsChanged(true);
118+
} else {
119+
columnState->SetIsNull(false);
120+
columnState->SetIsChanged(false);
121+
}
111122
}
112123
}
113124

@@ -168,7 +179,6 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
168179
upsert.SetData(TSerializedCellVec::Serialize(cells));
169180
break;
170181
}
171-
case NKikimrChangeExchange::TDataChange::kReset: [[fallthrough]];
172182
default:
173183
Y_FAIL_S("Unexpected row operation: " << static_cast<int>(ProtoBody.GetCdcDataChange().GetRowOperationCase()));
174184
}
@@ -182,27 +192,29 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
182192
record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData());
183193

184194
switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) {
185-
case NKikimrChangeExchange::TDataChange::kUpsert: {
195+
case NKikimrChangeExchange::TDataChange::kUpsert:
196+
case NKikimrChangeExchange::TDataChange::kReset: {
186197
auto& upsert = *record.MutableUpsert();
187-
// Check if NewImage is available, otherwise fall back to Upsert
198+
// Check if NewImage is available, otherwise fall back to Upsert/Reset
188199
if (ProtoBody.GetCdcDataChange().has_newimage()) {
189200
*upsert.MutableTags() = {
190201
ProtoBody.GetCdcDataChange().GetNewImage().GetTags().begin(),
191202
ProtoBody.GetCdcDataChange().GetNewImage().GetTags().end()};
192203
upsert.SetData(ProtoBody.GetCdcDataChange().GetNewImage().GetData());
193-
} else {
204+
} else if (ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kUpsert) {
194205
// Fallback to Upsert field if NewImage is not available
195206
*upsert.MutableTags() = {
196207
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(),
197208
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()};
198209
upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData());
210+
} else if (ProtoBody.GetCdcDataChange().GetRowOperationCase() == NKikimrChangeExchange::TDataChange::kReset) {
211+
Y_ABORT("Reset operation is not supported, all operations must be converted to Upsert");
199212
}
200213
break;
201214
}
202215
case NKikimrChangeExchange::TDataChange::kErase:
203216
record.MutableErase();
204217
break;
205-
case NKikimrChangeExchange::TDataChange::kReset: [[fallthrough]];
206218
default:
207219
Y_FAIL_S("Unexpected row operation: " << static_cast<int>(ProtoBody.GetCdcDataChange().GetRowOperationCase()));
208220
}

ydb/core/tx/datashard/datashard_ut_incremental_backup.cpp

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,16 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
6565
return TShardedTableOptions();
6666
}
6767

68+
TShardedTableOptions ThreeColumnTable() {
69+
TShardedTableOptions opts;
70+
opts.Columns_ = {
71+
{"key", "Uint32", true, false},
72+
{"value", "Uint32", false, false},
73+
{"extra", "Uint32", false, false}
74+
};
75+
return opts;
76+
}
77+
6878
ui64 ResolvePqTablet(TTestActorRuntime& runtime, const TActorId& sender, const TString& path, ui32 partitionId) {
6979
auto streamDesc = Ls(runtime, sender, path);
7080

@@ -169,6 +179,23 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
169179
return proto;
170180
}
171181

182+
NKikimrChangeExchange::TChangeRecord MakeReset(ui32 key, ui32 value) {
183+
auto keyCell = TCell::Make<ui32>(key);
184+
auto valueCell = TCell::Make<ui32>(value);
185+
NKikimrChangeExchange::TChangeRecord proto;
186+
187+
auto& dc = *proto.MutableCdcDataChange();
188+
auto& dcKey = *dc.MutableKey();
189+
dcKey.AddTags(1);
190+
dcKey.SetData(TSerializedCellVec::Serialize({keyCell}));
191+
192+
auto& reset = *dc.MutableReset();
193+
reset.AddTags(2);
194+
reset.SetData(TSerializedCellVec::Serialize({valueCell}));
195+
196+
return proto;
197+
}
198+
172199
NKikimrChangeExchange::TChangeRecord MakeErase(ui32 key) {
173200
auto keyCell = TCell::Make<ui32>(key);
174201
NKikimrChangeExchange::TChangeRecord proto;
@@ -182,6 +209,44 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
182209
return proto;
183210
}
184211

212+
NKikimrChangeExchange::TChangeRecord MakeUpsertPartial(ui32 key, ui32 value, const TVector<ui32>& tags = {2}) {
213+
auto keyCell = TCell::Make<ui32>(key);
214+
auto valueCell = TCell::Make<ui32>(value);
215+
NKikimrChangeExchange::TChangeRecord proto;
216+
217+
auto& dc = *proto.MutableCdcDataChange();
218+
auto& dcKey = *dc.MutableKey();
219+
dcKey.AddTags(1);
220+
dcKey.SetData(TSerializedCellVec::Serialize({keyCell}));
221+
222+
auto& upsert = *dc.MutableUpsert();
223+
for (auto tag : tags) {
224+
upsert.AddTags(tag);
225+
}
226+
upsert.SetData(TSerializedCellVec::Serialize({valueCell}));
227+
228+
return proto;
229+
}
230+
231+
NKikimrChangeExchange::TChangeRecord MakeResetPartial(ui32 key, ui32 value, const TVector<ui32>& tags = {2}) {
232+
auto keyCell = TCell::Make<ui32>(key);
233+
auto valueCell = TCell::Make<ui32>(value);
234+
NKikimrChangeExchange::TChangeRecord proto;
235+
236+
auto& dc = *proto.MutableCdcDataChange();
237+
auto& dcKey = *dc.MutableKey();
238+
dcKey.AddTags(1);
239+
dcKey.SetData(TSerializedCellVec::Serialize({keyCell}));
240+
241+
auto& reset = *dc.MutableReset();
242+
for (auto tag : tags) {
243+
reset.AddTags(tag);
244+
}
245+
reset.SetData(TSerializedCellVec::Serialize({valueCell}));
246+
247+
return proto;
248+
}
249+
185250
Y_UNIT_TEST(SimpleBackup) {
186251
TPortManager portManager;
187252
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
@@ -1918,6 +1983,168 @@ Y_UNIT_TEST_SUITE(IncrementalBackup) {
19181983
UNIT_ASSERT_C(!hasAsyncReplicaAttr, TStringBuilder() << "Incremental backup table at " << foundIncrementalBackupPath << " must NOT have __async_replica attribute");
19191984
}
19201985

1986+
Y_UNIT_TEST(ResetOperationIncrementalBackup) {
1987+
TPortManager portManager;
1988+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
1989+
.SetUseRealThreads(false)
1990+
.SetDomainName("Root")
1991+
.SetEnableChangefeedInitialScan(true)
1992+
);
1993+
1994+
auto& runtime = *server->GetRuntime();
1995+
const TActorId edgeActor = runtime.AllocateEdgeActor();
1996+
1997+
InitRoot(server, edgeActor);
1998+
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
1999+
2000+
ExecSQL(server, edgeActor, R"(
2001+
UPSERT INTO `/Root/Table` (key, value) VALUES
2002+
(1, 10),
2003+
(2, 20);
2004+
)");
2005+
2006+
WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table"));
2007+
2008+
// Test kReset operation (REPLACE INTO)
2009+
ExecSQL(server, edgeActor, R"(
2010+
REPLACE INTO `/Root/Table` (key, value) VALUES
2011+
(1, 100),
2012+
(3, 300);
2013+
)");
2014+
2015+
WaitForContent(server, edgeActor, "/Root/Table/0_continuousBackupImpl", {
2016+
MakeReset(1, 100),
2017+
MakeReset(3, 300),
2018+
});
2019+
}
2020+
2021+
Y_UNIT_TEST(ReplaceIntoIncrementalBackup) {
2022+
TPortManager portManager;
2023+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2135), {}, DefaultPQConfig())
2024+
.SetUseRealThreads(false)
2025+
.SetDomainName("Root")
2026+
.SetEnableChangefeedInitialScan(true)
2027+
);
2028+
2029+
auto& runtime = *server->GetRuntime();
2030+
const TActorId edgeActor = runtime.AllocateEdgeActor();
2031+
2032+
InitRoot(server, edgeActor);
2033+
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());
2034+
2035+
// Insert initial data
2036+
ExecSQL(server, edgeActor, R"(
2037+
UPSERT INTO `/Root/Table` (key, value) VALUES
2038+
(1, 10),
2039+
(2, 20),
2040+
(3, 30);
2041+
)");
2042+
2043+
WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table"));
2044+
2045+
// Test multiple REPLACE operations
2046+
ExecSQL(server, edgeActor, R"(
2047+
REPLACE INTO `/Root/Table` (key, value) VALUES
2048+
(1, 100),
2049+
(4, 400);
2050+
)");
2051+
2052+
WaitForContent(server, edgeActor, "/Root/Table/0_continuousBackupImpl", {
2053+
MakeReset(1, 100),
2054+
MakeReset(4, 400),
2055+
});
2056+
}
2057+
2058+
Y_UNIT_TEST(ResetVsUpsertMissingColumnsTest) {
2059+
TPortManager portManager;
2060+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2136), {}, DefaultPQConfig())
2061+
.SetUseRealThreads(false)
2062+
.SetDomainName("Root")
2063+
.SetEnableChangefeedInitialScan(true)
2064+
);
2065+
2066+
auto& runtime = *server->GetRuntime();
2067+
const TActorId edgeActor = runtime.AllocateEdgeActor();
2068+
2069+
InitRoot(server, edgeActor);
2070+
CreateShardedTable(server, edgeActor, "/Root", "Table", ThreeColumnTable());
2071+
2072+
// Insert initial data with all three columns
2073+
ExecSQL(server, edgeActor, R"(
2074+
UPSERT INTO `/Root/Table` (key, value, extra) VALUES
2075+
(1, 10, 100),
2076+
(2, 20, 200);
2077+
)");
2078+
2079+
WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table"));
2080+
2081+
ExecSQL(server, edgeActor, R"(
2082+
UPSERT INTO `/Root/Table` (key, value) VALUES (1, 15);
2083+
)");
2084+
2085+
ExecSQL(server, edgeActor, R"(
2086+
REPLACE INTO `/Root/Table` (key, value) VALUES (2, 25);
2087+
)");
2088+
2089+
SimulateSleep(server, TDuration::Seconds(1));
2090+
2091+
auto records = GetRecords(runtime, edgeActor, "/Root/Table/0_continuousBackupImpl", 0);
2092+
UNIT_ASSERT_VALUES_EQUAL(records.size(), 2);
2093+
2094+
// Parse the first record (Upsert)
2095+
NKikimrChangeExchange::TChangeRecord firstRecord;
2096+
UNIT_ASSERT(firstRecord.ParseFromString(records[0].second));
2097+
UNIT_ASSERT_C(firstRecord.GetCdcDataChange().HasUpsert(), "First record should be an upsert");
2098+
2099+
// Parse the second record (Reset)
2100+
NKikimrChangeExchange::TChangeRecord secondRecord;
2101+
UNIT_ASSERT(secondRecord.ParseFromString(records[1].second));
2102+
UNIT_ASSERT_C(secondRecord.GetCdcDataChange().HasReset(), "Second record should be a reset");
2103+
}
2104+
2105+
Y_UNIT_TEST(ResetVsUpsertColumnStateSerialization) {
2106+
TPortManager portManager;
2107+
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2137), {}, DefaultPQConfig())
2108+
.SetUseRealThreads(false)
2109+
.SetDomainName("Root")
2110+
.SetEnableChangefeedInitialScan(true)
2111+
);
2112+
2113+
auto& runtime = *server->GetRuntime();
2114+
const TActorId edgeActor = runtime.AllocateEdgeActor();
2115+
2116+
InitRoot(server, edgeActor);
2117+
CreateShardedTable(server, edgeActor, "/Root", "Table", ThreeColumnTable());
2118+
2119+
ExecSQL(server, edgeActor, R"(
2120+
UPSERT INTO `/Root/Table` (key, value, extra) VALUES (1, 10, 100);
2121+
)");
2122+
2123+
WaitTxNotification(server, edgeActor, AsyncCreateContinuousBackup(server, "/Root", "Table"));
2124+
2125+
ExecSQL(server, edgeActor, R"(
2126+
UPSERT INTO `/Root/Table` (key, value) VALUES (1, 15);
2127+
)");
2128+
2129+
ExecSQL(server, edgeActor, R"(
2130+
REPLACE INTO `/Root/Table` (key, value) VALUES (1, 25);
2131+
)");
2132+
2133+
SimulateSleep(server, TDuration::Seconds(2));
2134+
2135+
auto records = GetRecords(runtime, edgeActor, "/Root/Table/0_continuousBackupImpl", 0);
2136+
UNIT_ASSERT_C(records.size() >= 2, "Should have at least 2 records");
2137+
2138+
for (size_t i = 0; i < records.size(); ++i) {
2139+
NKikimrChangeExchange::TChangeRecord parsedRecord;
2140+
UNIT_ASSERT(parsedRecord.ParseFromString(records[i].second));
2141+
const auto& dataChange = parsedRecord.GetCdcDataChange();
2142+
2143+
UNIT_ASSERT_C(dataChange.HasUpsert() || dataChange.HasReset(),
2144+
"Record should be either upsert or reset operation");
2145+
}
2146+
}
2147+
19212148
} // Y_UNIT_TEST_SUITE(IncrementalBackup)
19222149

19232150
} // NKikimr

0 commit comments

Comments
 (0)