Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions ydb/core/backup/impl/local_partition_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,16 @@ class TLocalPartitionReader
Send(PQTablet, CreateGetOffsetRequest().Release());
}

void HandleInit() {
Send(PQTablet, CreateGetOffsetRequest().Release());
}

void HandleInit(TEvPersQueue::TEvResponse::TPtr& ev) {
LOG_D("Handle " << ev->Get()->ToString());
auto& record = ev->Get()->Record;
if (record.GetErrorCode() == NPersQueue::NErrorCode::INITIALIZING) {
// TODO reschedule
Y_ABORT("Unimplemented!");
Schedule(TDuration::Seconds(1), new NActors::TEvents::TEvWakeup);
return;
}
Y_VERIFY_S(record.GetErrorCode() == NPersQueue::NErrorCode::OK, "Unimplemented!");
Y_VERIFY_S(record.HasPartitionResponse() && record.GetPartitionResponse().HasCmdGetClientOffsetResult(), "Unimplemented!");
Expand All @@ -85,7 +89,7 @@ class TLocalPartitionReader
auto& read = *req.MutableCmdRead();
read.SetOffset(Offset);
read.SetClientId(OFFLOAD_ACTOR_CLIENT_ID);
read.SetTimeoutMs(0);
read.SetTimeoutMs(1000);
read.SetBytes(1_MB);

return request;
Expand Down Expand Up @@ -114,6 +118,12 @@ class TLocalPartitionReader

const auto& readResult = record.GetPartitionResponse().GetCmdReadResult();

if (!readResult.ResultSize()) {
Y_ABORT_UNLESS(PQTablet);
Send(PQTablet, CreateReadRequest().Release());
return;
}

auto gotOffset = Offset;
TVector<TEvWorker::TEvData::TRecord> records(::Reserve(readResult.ResultSize()));

Expand Down Expand Up @@ -147,6 +157,7 @@ class TLocalPartitionReader
switch (ev->GetTypeRewrite()) {
hFunc(TEvWorker::TEvHandshake, HandleInit);
hFunc(TEvPersQueue::TEvResponse, HandleInit);
sFunc(TEvents::TEvWakeup, HandleInit);
sFunc(TEvents::TEvPoison, PassAway);
default:
Y_VERIFY_S(false, "Unhandled event type: " << ev->GetTypeRewrite()
Expand Down
112 changes: 95 additions & 17 deletions ydb/core/backup/impl/table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,27 @@

namespace NKikimr::NBackup::NImpl {

class TChangeRecord;

}

namespace NKikimr {

template <>
struct TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord> {
bool Restore;

TChangeRecordBuilderContextTrait(bool restore)
: Restore(restore)
{}

TChangeRecordBuilderContextTrait(const TChangeRecordBuilderContextTrait<NBackup::NImpl::TChangeRecord>& other) = default;
};

}

namespace NKikimr::NBackup::NImpl {

class TChangeRecord: public NChangeExchange::TChangeRecordBase {
friend class TChangeRecordBuilder;

Expand All @@ -30,24 +51,81 @@ class TChangeRecord: public NChangeExchange::TChangeRecordBase {
return SourceId;
}

void Serialize(NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record) const {
record.SetSourceOffset(GetOrder());
// TODO: fill WriteTxId

record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData());

auto& upsert = *record.MutableUpsert();
*upsert.MutableTags() = {
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(),
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()};
upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData());
}

void Serialize(
NKikimrTxDataShard::TEvApplyReplicationChanges::TChange& record,
TChangeRecordBuilderContextTrait<TChangeRecord>&) const
TChangeRecordBuilderContextTrait<TChangeRecord>& ctx) const
{
return Serialize(record);
if (!ctx.Restore) {
record.SetSourceOffset(GetOrder());
// TODO: fill WriteTxId

record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData());

auto& upsert = *record.MutableUpsert();

switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) {
case NKikimrChangeExchange::TDataChange::kUpsert: {
*upsert.MutableTags() = {
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(),
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()};
auto it = Schema->ValueColumns.find("__incrBackupImpl_deleted");
upsert.AddTags(it->second.Tag);

TString serializedCellVec = ProtoBody.GetCdcDataChange().GetUpsert().GetData();
const TCell cell = TCell::Make<bool>(false);
Y_ABORT_UNLESS(TSerializedCellVec::UnsafeAppendCell(cell, serializedCellVec), "uh-oh");

upsert.SetData(serializedCellVec);
break;
}
case NKikimrChangeExchange::TDataChange::kErase: {
size_t size = Schema->ValueColumns.size();
TVector<NTable::TTag> tags;
TVector<TCell> cells;

tags.reserve(size);
cells.reserve(size);

for (const auto& [name, value] : Schema->ValueColumns) {
tags.push_back(value.Tag);
if (name != "__incrBackupImpl_deleted") {
cells.emplace_back();
} else {
cells.emplace_back(TCell::Make<bool>(true));
}
}

*upsert.MutableTags() = {tags.begin(), tags.end()};
upsert.SetData(TSerializedCellVec::Serialize(cells));

break;
}
case NKikimrChangeExchange::TDataChange::kReset:
default:
Y_FAIL_S("Unexpected row operation: " << static_cast<int>(ProtoBody.GetCdcDataChange().GetRowOperationCase()));
}
} else {
record.SetSourceOffset(GetOrder());
record.SetKey(ProtoBody.GetCdcDataChange().GetKey().GetData());

switch (ProtoBody.GetCdcDataChange().GetRowOperationCase()) {
case NKikimrChangeExchange::TDataChange::kUpsert: {
auto& upsert = *record.MutableUpsert();
*upsert.MutableTags() = {
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().begin(),
ProtoBody.GetCdcDataChange().GetUpsert().GetTags().end()};
upsert.SetData(ProtoBody.GetCdcDataChange().GetUpsert().GetData());
break;
}
case NKikimrChangeExchange::TDataChange::kErase:
record.MutableErase();
break;
case NKikimrChangeExchange::TDataChange::kReset:
default:
Y_FAIL_S("Unexpected row operation: " << static_cast<int>(ProtoBody.GetCdcDataChange().GetRowOperationCase()));
}

}
}

ui64 ResolvePartitionId(NChangeExchange::IChangeSenderResolver* const resolver) const override {
Expand Down Expand Up @@ -156,8 +234,8 @@ Y_DECLARE_OUT_SPEC(inline, NKikimr::NBackup::NImpl::TChangeRecord::TPtr, out, va

namespace NKikimr::NBackup::NImpl {

IActor* CreateLocalTableWriter(const TPathId& tablePathId) {
return new NReplication::NService::TLocalTableWriter<NBackup::NImpl::TChangeRecord>(tablePathId);
IActor* CreateLocalTableWriter(const TPathId& tablePathId, bool restore) {
return new NReplication::NService::TLocalTableWriter<NBackup::NImpl::TChangeRecord>(tablePathId, restore);
}

}
2 changes: 1 addition & 1 deletion ydb/core/backup/impl/table_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ namespace NKikimr {

namespace NKikimr::NBackup::NImpl {

IActor* CreateLocalTableWriter(const TPathId& tablePathId);
IActor* CreateLocalTableWriter(const TPathId& tablePathId, bool restore = false);

}
1 change: 1 addition & 0 deletions ydb/core/grpc_services/rpc_describe_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class TDescribeTableRPC : public TRpcSchemeRequestActor<TDescribeTableRPC, TEvDe
FillPartitioningSettings(describeTableResult, tableDescription);
FillKeyBloomFilter(describeTableResult, tableDescription);
FillReadReplicasSettings(describeTableResult, tableDescription);
FillBackupDescription(describeTableResult, tableDescription);

return ReplyWithResult(Ydb::StatusIds::SUCCESS, describeTableResult, ctx);
}
Expand Down
16 changes: 10 additions & 6 deletions ydb/core/persqueue/offload_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class TOffloadActor
const TActorId ParentTablet;
const ui32 Partition;
const NKikimrPQ::TOffloadConfig Config;
const TPathId DstPathId;

mutable TMaybe<TString> LogPrefix;
TActorId Worker;
Expand All @@ -60,24 +59,29 @@ class TOffloadActor
: ParentTablet(parentTablet)
, Partition(partition)
, Config(config)
, DstPathId(PathIdFromPathId(config.GetIncrementalBackup().GetDstPathId()))
{}

void Bootstrap() {
auto* workerActor = CreateWorker(
SelfId(),
[=]() -> IActor* { return NBackup::NImpl::CreateLocalPartitionReader(ParentTablet, Partition); },
[=]() -> IActor* { return NBackup::NImpl::CreateLocalTableWriter(DstPathId); });
[=]() -> IActor* {
if (Config.HasIncrementalBackup()) {
return NBackup::NImpl::CreateLocalTableWriter(PathIdFromPathId(Config.GetIncrementalBackup().GetDstPathId()));
} else {
return NBackup::NImpl::CreateLocalTableWriter(PathIdFromPathId(Config.GetIncrementalRestore().GetDstPathId()), true);
}
});
Worker = TActivationContext::Register(workerActor);

Become(&TOffloadActor::StateWork);
}

STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
default:
Y_VERIFY_S(false, "Unhandled event type: " << ev->GetTypeRewrite()
<< " event: " << ev->ToString());
// default:
// Y_VERIFY_S(false, "Unhandled event type: " << ev->GetTypeRewrite()
// << " event: " << ev->ToString());
}
}
};
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/persqueue/read_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,8 @@ void TPersQueueReadBalancer::CheckACL(const TEvPersQueue::TEvCheckACL::TPtr &req
};

TString user = record.HasUser() ? record.GetUser() : "";

RespondWithACL(request, NKikimrPQ::EAccess::ALLOWED, "", ctx);
return;
if (record.GetOperation() == NKikimrPQ::EOperation::READ_OP) {
if (!Consumers.contains(user)) {
RespondWithACL(request, NKikimrPQ::EAccess::DENIED, TStringBuilder() << "no read rule provided for consumer '"
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/protos/counters_schemeshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ enum ESimpleCounters {
COUNTER_IN_FLIGHT_OPS_TxCreateResourcePool = 174 [(CounterOpts) = {Name: "InFlightOps/CreateResourcePool"}];
COUNTER_IN_FLIGHT_OPS_TxDropResourcePool = 175 [(CounterOpts) = {Name: "InFlightOps/DropResourcePool"}];
COUNTER_IN_FLIGHT_OPS_TxAlterResourcePool = 176 [(CounterOpts) = {Name: "InFlightOps/AlterResourcePool"}];

COUNTER_IN_FLIGHT_OPS_TxRestoreIncrementalBackup = 177 [(CounterOpts) = {Name: "InFlightOps/RestoreIncrementalBackup"}];
}

enum ECumulativeCounters {
Expand Down Expand Up @@ -351,6 +353,8 @@ enum ECumulativeCounters {
COUNTER_FINISHED_OPS_TxCreateResourcePool = 103 [(CounterOpts) = {Name: "FinishedOps/CreateResourcePool"}];
COUNTER_FINISHED_OPS_TxDropResourcePool = 104 [(CounterOpts) = {Name: "FinishedOps/DropResourcePool"}];
COUNTER_FINISHED_OPS_TxAlterResourcePool = 105 [(CounterOpts) = {Name: "FinishedOps/AlterResourcePool"}];

COUNTER_FINISHED_OPS_TxRestoreIncrementalBackup = 106 [(CounterOpts) = {Name: "FinishedOps/RestoreIncrementalBackup"}];
}

enum EPercentileCounters {
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ message TTableReplicationConfig {
enum EReplicationMode {
REPLICATION_MODE_NONE = 0;
REPLICATION_MODE_READ_ONLY = 1;
REPLICATION_MODE_RESTORE_INCREMENTAL_BACKUP = 2;
}

enum EConsistency {
Expand Down Expand Up @@ -395,6 +396,8 @@ message TTableDescription {
optional TTableReplicationConfig ReplicationConfig = 40;

optional bool Temporary = 41;

optional bool IncrementalBackup = 42;
}

message TDictionaryEncodingSettings {
Expand Down Expand Up @@ -874,6 +877,7 @@ enum ECdcStreamMode {
ECdcStreamModeNewImage = 3;
ECdcStreamModeOldImage = 4;
ECdcStreamModeNewAndOldImages = 5;
ECdcStreamModeRestoreIncBackup = 6;
}

enum ECdcStreamFormat {
Expand Down Expand Up @@ -947,6 +951,7 @@ message TAlterContinuousBackup {
}

message TTakeIncrementalBackup {
optional string DstPath = 1;
}

oneof Action {
Expand All @@ -959,6 +964,11 @@ message TDropContinuousBackup {
optional string TableName = 1;
}

message TRestoreIncrementalBackup {
optional string SrcTableName = 1;
optional string DstTableName = 2;
}

enum EIndexType {
EIndexTypeInvalid = 0;
EIndexTypeGlobal = 1;
Expand Down Expand Up @@ -1589,6 +1599,8 @@ enum EOperationType {
ESchemeOpCreateResourcePool = 100;
ESchemeOpDropResourcePool = 101;
ESchemeOpAlterResourcePool = 102;

ESchemeOpRestoreIncrementalBackup = 103;
}

message TApplyIf {
Expand Down Expand Up @@ -1771,6 +1783,8 @@ message TModifyScheme {
optional bool AllowCreateInTempDir = 71 [default = false];

optional TResourcePoolDescription CreateResourcePool = 72;

optional TRestoreIncrementalBackup RestoreIncrementalBackup = 73;
}

message TCopySequence {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/protos/pqconfig.proto
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,13 @@ message TOffloadConfig {
optional NKikimrProto.TPathID DstPathId = 2;
}

message TIncrementalRestore {
optional NKikimrProto.TPathID DstPathId = 1;
}

oneof Strategy {
TIncrementalBackup IncrementalBackup = 1;
TIncrementalRestore IncrementalRestore = 2;
}
}

Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1854,6 +1854,7 @@ message TEvApplyReplicationChangesResult {
REASON_SCHEME_ERROR = 2;
REASON_BAD_REQUEST = 3;
REASON_UNEXPECTED_ROW_OPERATION = 4;
REASON_OUTDATED_SCHEME = 5;
}

optional EStatus Status = 1;
Expand Down
34 changes: 34 additions & 0 deletions ydb/core/scheme/scheme_tablecell.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,40 @@ namespace {
}
}


bool TSerializedCellVec::UnsafeAppendCell(const TCell& cell, TString& serializedCellVec) {
const char* buf = serializedCellVec.data();
const char* bufEnd = serializedCellVec.data() + serializedCellVec.size();

if (!serializedCellVec) {
TSerializedCellVec::Serialize(serializedCellVec, {cell});
return true;
}

if (Y_UNLIKELY(bufEnd - buf < static_cast<ptrdiff_t>(sizeof(ui16)))) {
return false;
}

ui16 cellCount = ReadUnaligned<ui16>(buf);
cellCount += 1;

serializedCellVec.ReserveAndResize(serializedCellVec.size() + sizeof(ui32) + cell.Size());

char* mutableBuf = serializedCellVec.Detach();
char* oldBufEnd = mutableBuf + (bufEnd - buf);

WriteUnaligned<ui16>(mutableBuf, cellCount);

TCellHeader header(cell.Size(), cell.IsNull());
WriteUnaligned<ui32>(oldBufEnd, header.RawValue);
oldBufEnd += sizeof(header);
if (cell.Size() > 0) {
cell.CopyDataInto(oldBufEnd);
}

return true;
}

TSerializedCellVec::TSerializedCellVec(TConstArrayRef<TCell> cells)
{
SerializeCellVec(cells, Buf, &Cells);
Expand Down
Loading