Skip to content
1 change: 1 addition & 0 deletions ydb/core/protos/counters_schemeshard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -655,4 +655,5 @@ enum ETxTypes {
TXTYPE_LOGIN_FINALIZE = 100 [(TxTypeOpts) = {Name: "TxLoginFinalize"}];

TXTYPE_PROGRESS_INCREMENTAL_RESTORE = 101 [(TxTypeOpts) = {Name: "TxProgressIncrementalRestore"}];
TXTYPE_INCREMENTAL_RESTORE_RESPONSE = 102 [(TxTypeOpts) = {Name: "TxIncrementalRestoreResponse"}];
}
35 changes: 35 additions & 0 deletions ydb/core/protos/tx_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2385,3 +2385,38 @@ message TEvForceDataCleanupResult {
optional uint64 TabletId = 2;
optional EStatus Status = 3;
}

message TEvRestoreMultipleIncrementalBackups {
optional uint64 TxId = 1;
optional NKikimrProto.TPathID PathId = 2; // Table being restored

message TIncrementalBackup {
optional string BackupPath = 1;
optional uint64 BackupStep = 2;
optional uint64 BackupTxId = 3;
optional string BackupTrimmedName = 4;
}

repeated TIncrementalBackup IncrementalBackups = 3;
}

message TEvRestoreMultipleIncrementalBackupsResponse {
optional uint64 TxId = 1;
optional NKikimrProto.TPathID PathId = 2;
optional uint64 TabletId = 3;

enum EStatus {
UNKNOWN = 0;
SUCCESS = 1;
SCHEME_ERROR = 2;
BAD_REQUEST = 3;
OVERLOADED = 4;
OPERATION_NOT_FOUND = 5;
ERROR = 6;
}

optional EStatus Status = 4;
repeated Ydb.Issue.IssueMessage Issues = 5;
optional uint64 ProcessedRows = 6;
optional uint64 ProcessedBytes = 7;
}
5 changes: 5 additions & 0 deletions ydb/core/tx/datashard/change_sender_incr_restore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,14 @@ class TIncrRestoreChangeSenderMain
, TargetTablePathId(targetPathId)
, TargetTableVersion(0)
{
LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::DATASHARD_BACKUP,
"INCREMENTAL_DEBUG: TIncrRestoreChangeSenderMain created for userTableId=" << userTableId
<< " targetPathId=" << targetPathId << " dataShard=" << dataShard.TabletId);
}

void Bootstrap() {
LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::DATASHARD_BACKUP,
"INCREMENTAL_DEBUG: TIncrRestoreChangeSenderMain::Bootstrap called");
ResolveUserTable();
}

Expand Down
155 changes: 155 additions & 0 deletions ydb/core/tx/datashard/create_datashard_streaming_unit.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#include "create_datashard_streaming_unit.h"
#include "change_sender_incr_restore.h"
#include "execution_unit_ctors.h"

#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/tablet_flat/flat_scan_spent.h>

#define STREAMING_LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[DataShardStreaming] [" << LogPrefix() << "] " << stream)
#define STREAMING_LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[DataShardStreaming] [" << LogPrefix() << "] " << stream)
#define STREAMING_LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[DataShardStreaming] [" << LogPrefix() << "] " << stream)
#define STREAMING_LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[DataShardStreaming] [" << LogPrefix() << "] " << stream)
#define STREAMING_LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "[DataShardStreaming] [" << LogPrefix() << "] " << stream)

namespace NKikimr {
namespace NDataShard {

using namespace NKikimrTxDataShard;

bool TCreateDataShardStreamingUnit::IsReadyToExecute(TOperation::TPtr op) const {
if (IsWaiting(op)) {
return false;
}

return !DataShard.IsAnyChannelYellowStop();
}

void TCreateDataShardStreamingUnit::Abort(TOperation::TPtr op, const TActorContext& ctx, const TString& error) {
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_ENSURE(tx, "cannot cast operation of kind " << op->GetKind());

STREAMING_LOG_E(error);

BuildResult(op)->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, error);
ResetWaiting(op);

Cancel(tx, ctx);
}

THolder<NTable::IScan> TCreateDataShardStreamingUnit::CreateDataShardStreamingScan(
const ::NKikimrSchemeOp::TCreateDataShardStreaming& streaming,
ui64 txId)
{
TPathId sourcePathId = TPathId::FromProto(streaming.GetSourcePathId());
TPathId targetPathId = TPathId::FromProto(streaming.GetTargetPathId());
const ui64 tableId = streaming.GetSourcePathId().GetLocalId();

// Create a scan that will use the change exchange infrastructure
// to stream changes to the target DataShard
return CreateIncrementalRestoreScan(
DataShard.SelfId(),
[=, tabletID = DataShard.TabletID(), generation = DataShard.Generation(), tabletActor = DataShard.SelfId()]
(const TActorContext& ctx, TActorId parent) {
// Create a specialized change sender for DataShard-to-DataShard streaming
return ctx.Register(
CreateDataShardStreamingChangeSender(
parent,
NDataShard::TDataShardId{
.TabletId = tabletID,
.Generation = generation,
.ActorId = tabletActor,
},
sourcePathId,
targetPathId,
streaming.GetStreamingConfig()));
},
sourcePathId,
DataShard.GetUserTables().at(tableId),
targetPathId,
txId,
{} // Use default limits for now
);
}

EExecutionStatus TCreateDataShardStreamingUnit::Execute(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) {
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_ENSURE(tx, "cannot cast operation of kind " << op->GetKind());

Y_ENSURE(tx->GetSchemeTx().HasCreateDataShardStreaming());
const auto& streaming = tx->GetSchemeTx().GetCreateDataShardStreaming();

const ui64 tableId = streaming.GetSourcePathId().GetLocalId();
if (!DataShard.GetUserTables().contains(tableId)) {
Abort(op, ctx, TStringBuilder() << "Table not found: " << tableId);
return EExecutionStatus::Executed;
}

const ui32 localTableId = DataShard.GetUserTables().at(tableId)->LocalTid;
if (!txc.DB.GetScheme().GetTableInfo(localTableId)) {
Abort(op, ctx, TStringBuilder() << "Table schema not found: " << localTableId);
return EExecutionStatus::Executed;
}

if (DataShard.IsAnyChannelYellowStop()) {
SetWaiting(op);
return EExecutionStatus::Continue;
}

if (!op->IsWaitingForScan()) {
// Create and start the streaming scan
auto scan = CreateDataShardStreamingScan(streaming, tx->GetTxId());
if (!scan) {
Abort(op, ctx, "Failed to create DataShard streaming scan");
return EExecutionStatus::Executed;
}

DataShard.QueueScan(localTableId, std::move(scan), tx->GetTxId(), TRowVersion::Min());
SetWaiting(op);

STREAMING_LOG_I("Started DataShard streaming scan"
<< " from " << streaming.GetSourcePathId().ShortDebugString()
<< " to " << streaming.GetTargetPathId().ShortDebugString()
<< " txId: " << tx->GetTxId());
}

// Check if scan is completed
if (op->IsWaitingForScan()) {
return EExecutionStatus::Continue;
}

ResetWaiting(op);

STREAMING_LOG_I("DataShard streaming completed successfully"
<< " txId: " << tx->GetTxId());

return EExecutionStatus::Executed;
}

void TCreateDataShardStreamingUnit::Complete(TOperation::TPtr op, const TActorContext& ctx) {
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_ENSURE(tx, "cannot cast operation of kind " << op->GetKind());

STREAMING_LOG_D("DataShard streaming unit completed"
<< " txId: " << tx->GetTxId());
}

// Factory function for creating the change sender specialized for DataShard streaming
IActor* CreateDataShardStreamingChangeSender(
const TActorId& changeServerActor,
const TDataShardId& dataShard,
const TPathId& sourcePathId,
const TPathId& targetPathId,
const TString& streamingConfig)
{
// For now, reuse the incremental restore change sender as the base
// This can be extended later with DataShard-specific streaming logic
return CreateIncrRestoreChangeSender(changeServerActor, dataShard, sourcePathId, targetPathId);
}

// Factory function for creating the execution unit
THolder<TExecutionUnit> CreateDataShardStreamingUnit(TDataShard& dataShard, TPipeline& pipeline) {
return MakeHolder<TCreateDataShardStreamingUnit>(dataShard, pipeline);
}

} // namespace NDataShard
} // namespace NKikimr
12 changes: 12 additions & 0 deletions ydb/core/tx/datashard/create_datashard_streaming_unit_factory.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#include "create_datashard_streaming_unit.h"
#include "execution_unit_ctors.h"

namespace NKikimr {
namespace NDataShard {

THolder<TExecutionUnit> CreateDataShardStreamingUnit(TDataShard &dataShard, TPipeline &pipeline) {
return MakeHolder<TCreateDataShardStreamingUnit>(dataShard, pipeline);
}

} // namespace NDataShard
} // namespace NKikimr
20 changes: 19 additions & 1 deletion ydb/core/tx/datashard/create_incremental_restore_src_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ using namespace NExportScan;
class TCreateIncrementalRestoreSrcUnit : public TExecutionUnit {
protected:
bool IsRelevant(TActiveTransaction* tx) const {
return tx->GetSchemeTx().HasCreateIncrementalRestoreSrc();
bool hasIncrRestoreSrc = tx->GetSchemeTx().HasCreateIncrementalRestoreSrc();
LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::DATASHARD_BACKUP,
"INCREMENTAL_DEBUG: TCreateIncrementalRestoreSrcUnit::IsRelevant called for txId="
<< tx->GetTxId() << " hasCreateIncrementalRestoreSrc=" << hasIncrRestoreSrc);
return hasIncrRestoreSrc;
}

bool IsWaiting(TOperation::TPtr op) const {
Expand Down Expand Up @@ -65,6 +69,11 @@ class TCreateIncrementalRestoreSrcUnit : public TExecutionUnit {
TPathId dstTablePathId = TPathId::FromProto(incrBackup.GetDstPathId());
const ui64 tableId = incrBackup.GetSrcPathId().GetLocalId();

LOG_INFO_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP,
"INCREMENTAL_DEBUG: CreateScan called for txId=" << txId
<< " srcPathId=" << tablePathId << " dstPathId=" << dstTablePathId
<< " tableId=" << tableId);

return CreateIncrementalRestoreScan(
DataShard.SelfId(),
[=, tabletID = DataShard.TabletID(), generation = DataShard.Generation(), tabletActor = DataShard.SelfId()](const TActorContext& ctx, TActorId parent) {
Expand Down Expand Up @@ -93,6 +102,11 @@ class TCreateIncrementalRestoreSrcUnit : public TExecutionUnit {
Y_ENSURE(tx->GetSchemeTx().HasCreateIncrementalRestoreSrc());
const auto& restoreSrc = tx->GetSchemeTx().GetCreateIncrementalRestoreSrc();

LOG_INFO_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP,
"INCREMENTAL_DEBUG: TCreateIncrementalRestoreSrcUnit::Run called for txId=" << op->GetTxId()
<< " srcPathId=" << restoreSrc.GetSrcPathId().DebugString()
<< " dstPathId=" << restoreSrc.GetDstPathId().DebugString());

const ui64 tableId = restoreSrc.GetSrcPathId().GetLocalId();
Y_ENSURE(DataShard.GetUserTables().contains(tableId));

Expand All @@ -101,6 +115,8 @@ class TCreateIncrementalRestoreSrcUnit : public TExecutionUnit {

Y_ENSURE(restoreSrc.HasDstPathId());

LOG_INFO_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP,
"INCREMENTAL_DEBUG: Creating scan for tableId=" << tableId << " localTableId=" << localTableId);
THolder<NTable::IScan> scan{CreateScan(restoreSrc, op->GetTxId())};

auto* appData = AppData(ctx);
Expand All @@ -124,6 +140,8 @@ class TCreateIncrementalRestoreSrcUnit : public TExecutionUnit {
.SetReadPrio(TScanOptions::EReadPrio::Low)
));

LOG_INFO_S(*TlsActivationContext, NKikimrServices::DATASHARD_BACKUP,
"INCREMENTAL_DEBUG: Scan queued successfully for txId=" << op->GetTxId());
return true;
}

Expand Down
Loading