Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void TDiskAgentBaseRequestActor::Done(

for (const auto& dr: DeviceRequests) {
completion.Body->RequestResults.push_back(
{.DeviceIndex = dr.DeviceIdx, .Error = {}});
{.DeviceIdx = dr.DeviceIdx, .Error = {}});
}

NCloud::Send(ctx, Part, std::move(completion.Event));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,10 @@ struct TEvNonreplPartitionPrivate

struct TDeviceRequestResult
{
// Index of device that participated in the request and the
// result of the request for that device.
ui32 DeviceIndex = 0;
// Index of the device in the partition config.
ui32 DeviceIdx = 0;

// Result of the request for that device.
NProto::TError Error;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,13 @@ TNonreplicatedPartitionRdmaActor::SendReadRequests(
const auto& r = deviceRequests[i];
auto ep = AgentId2Endpoint[r.Device.GetAgentId()];
Y_ABORT_UNLESS(ep);
auto dr = std::make_unique<TDeviceReadRequestContext>();
auto dr = std::make_unique<TDeviceReadRequestContext>(
r.DeviceIdx,
startBlockIndexOffset,
r.DeviceBlockRange.Size(),
i);

ui64 sz = r.DeviceBlockRange.Size() * PartConfig->GetBlockSize();
dr->StartIndexOffset = startBlockIndexOffset;
dr->BlockCount = r.DeviceBlockRange.Size();
dr->DeviceIdx = r.DeviceIdx;
dr->RequestIndex = i;
startBlockIndexOffset += r.DeviceBlockRange.Size();

sentRequestCtx.emplace_back(r.DeviceIdx);
Expand Down Expand Up @@ -368,7 +368,8 @@ TNonreplicatedPartitionRdmaActor::SendReadRequests(
flags,
deviceRequest);

requests.push_back({std::move(ep), std::move(req)});
requests.push_back(
{.Endpoint = std::move(ep), .ClientRequest = std::move(req)});
}

for (size_t i = 0; i < requests.size(); ++i) {
Expand Down Expand Up @@ -726,24 +727,23 @@ void TNonreplicatedPartitionRdmaActor::HandleAgentIsUnavailable(
const auto& requestCtx = requestInfo.Value;
bool needToCancel = AnyOf(
requestCtx,
[&](const auto& ctx)
[&](const TRunningRdmaRequestInfo& item)
{
return laggingRows.contains(ctx.DeviceIndex) &&
return laggingRows.contains(item.DeviceIdx) &&
(requestInfo.IsWrite ||
devices[ctx.DeviceIndex].GetAgentId() ==
laggingAgentId);
devices[item.DeviceIdx].GetAgentId() == laggingAgentId);
});

if (!needToCancel) {
continue;
}

for (auto [deviceIdx, rdmaRequestId]: requestCtx) {
Y_ABORT_UNLESS(deviceIdx < static_cast<ui64>(devices.size()));
auto agentId = devices[deviceIdx].GetAgentId();
for (const TRunningRdmaRequestInfo& item: requestCtx) {
Y_ABORT_UNLESS(item.DeviceIdx < static_cast<ui64>(devices.size()));
const auto& agentId = devices[item.DeviceIdx].GetAgentId();

auto& endpoint = AgentId2Endpoint[agentId];
endpoint->CancelRequest(rdmaRequestId);
endpoint->CancelRequest(item.SentRequestId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,28 @@ namespace NCloud::NBlockStore::NStorage {

struct TDeviceReadRequestContext: public TDeviceRequestRdmaContext
{
ui64 StartIndexOffset = 0;
ui64 BlockCount = 0;
ui32 RequestIndex = 0;
const ui64 StartIndexOffset = 0;
const ui64 BlockCount = 0;
const ui32 RequestIndex = 0;

TDeviceReadRequestContext(
ui32 deviceIdx,
ui64 startIndexOffset,
ui64 blockCount,
ui32 requestIndex)
: TDeviceRequestRdmaContext(deviceIdx)
, StartIndexOffset(startIndexOffset)
, BlockCount(blockCount)
, RequestIndex(requestIndex)
{}
};

////////////////////////////////////////////////////////////////////////////////

class TNonreplicatedPartitionRdmaActor final
: public NActors::TActorBootstrapped<TNonreplicatedPartitionRdmaActor>
{
struct TDeviceRequestContext
{
ui32 DeviceIndex = 0;
ui64 SentRequestId = 0;
};

using TRequestContext = TStackVec<TDeviceRequestContext, 2>;
using TRequestContext = TStackVec<TRunningRdmaRequestInfo, 2>;

private:
const TStorageConfigPtr Config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,17 @@ namespace {

struct TDeviceChecksumRequestContext: public TDeviceRequestRdmaContext
{
ui64 RangeStartIndex = 0;
ui32 RangeSize = 0;
const ui64 RangeStartIndex = 0;
const ui32 RangeSize = 0;

TDeviceChecksumRequestContext(
ui32 deviceIdx,
ui64 rangeStartIndex,
ui32 rangeSize)
: TDeviceRequestRdmaContext(deviceIdx)
, RangeStartIndex(rangeStartIndex)
, RangeSize(rangeSize)
{}
};

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -160,10 +169,10 @@ void TNonreplicatedPartitionRdmaActor::HandleChecksumBlocks(
for (auto& r: deviceRequests) {
auto ep = AgentId2Endpoint[r.Device.GetAgentId()];
Y_ABORT_UNLESS(ep);
auto dc = std::make_unique<TDeviceChecksumRequestContext>();
dc->RangeStartIndex = r.BlockRange.Start;
dc->RangeSize = r.DeviceBlockRange.Size() * PartConfig->GetBlockSize();
dc->DeviceIdx = r.DeviceIdx;
auto dc = std::make_unique<TDeviceChecksumRequestContext>(
r.DeviceIdx,
r.BlockRange.Start,
r.DeviceBlockRange.Size() * PartConfig->GetBlockSize());

sentRequestCtx.emplace_back(r.DeviceIdx);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,11 @@ void TNonreplicatedPartitionRdmaActor::HandleWriteBlocks(
PartConfig,
AssignIdToWriteAndZeroRequestsEnabled);

auto context = std::make_unique<TDeviceRequestRdmaContext>();
context->DeviceIdx = deviceRequest.DeviceIdx;
auto context = std::make_unique<TDeviceRequestRdmaContext>(
deviceRequest.DeviceIdx);

sentRequestCtx.emplace_back(deviceRequest.DeviceIdx);
sentRequestCtx.emplace_back(
TRunningRdmaRequestInfo{.DeviceIdx = deviceRequest.DeviceIdx});

auto [req, err] = ep->AllocateRequest(
requestResponseHandler,
Expand All @@ -271,12 +272,11 @@ void TNonreplicatedPartitionRdmaActor::HandleWriteBlocks(
ctx,
deviceRequest.Device.GetDeviceUUID());

using TResponse = TEvService::TEvWriteBlocksResponse;
NCloud::Reply(
ctx,
*requestInfo,
std::make_unique<TResponse>(std::move(err)));

std::make_unique<TEvService::TEvWriteBlocksResponse>(
std::move(err)));
return;
}

Expand All @@ -295,7 +295,8 @@ void TNonreplicatedPartitionRdmaActor::HandleWriteBlocks(
request,
sglist);

requests.push_back({std::move(ep), std::move(req)});
requests.push_back(
{.Endpoint = std::move(ep), .ClientRequest = std::move(req)});
}

for (size_t i = 0; i < requests.size(); ++i) {
Expand Down Expand Up @@ -419,8 +420,8 @@ void TNonreplicatedPartitionRdmaActor::HandleWriteBlocksLocal(
PartConfig,
AssignIdToWriteAndZeroRequestsEnabled);

auto context = std::make_unique<TDeviceRequestRdmaContext>();
context->DeviceIdx = deviceRequest.DeviceIdx;
auto context = std::make_unique<TDeviceRequestRdmaContext>(
deviceRequest.DeviceIdx);

auto [req, err] = ep->AllocateRequest(
requestResponseHandler,
Expand All @@ -443,11 +444,11 @@ void TNonreplicatedPartitionRdmaActor::HandleWriteBlocksLocal(
ctx,
deviceRequest.Device.GetDeviceUUID());

using TResponse = TEvService::TEvWriteBlocksLocalResponse;
NCloud::Reply(
ctx,
*requestInfo,
std::make_unique<TResponse>(std::move(err)));
std::make_unique<TEvService::TEvWriteBlocksLocalResponse>(
std::move(err)));

return;
}
Expand All @@ -468,7 +469,8 @@ void TNonreplicatedPartitionRdmaActor::HandleWriteBlocksLocal(

blocks += deviceRequest.DeviceBlockRange.Size();

requests.push_back({std::move(ep), std::move(req)});
requests.push_back(
{.Endpoint = std::move(ep), .ClientRequest = std::move(req)});
}

for (size_t i = 0; i < requests.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,11 @@ void TNonreplicatedPartitionRdmaActor::HandleMultiAgentWrite(
const auto sentRequestId =
ep->SendRequest(std::move(req), requestInfo->CallContext);

RequestsInProgress.AddWriteRequest(
requestId,
TRequestContext{TDeviceRequestContext{
.DeviceIndex = deviceRequest.DeviceIdx,
.SentRequestId = sentRequestId}});
TRequestContext sentRequestCtx{
{.DeviceIdx = deviceRequest.DeviceIdx,
.SentRequestId = sentRequestId}};

RequestsInProgress.AddWriteRequest(requestId, sentRequestCtx);
}

} // namespace NCloud::NBlockStore::NStorage
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,7 @@ void TNonreplicatedPartitionRdmaActor::HandleZeroBlocks(
msg->Record.GetHeaders().GetVolumeRequestId());
}

auto context = std::make_unique<TDeviceRequestRdmaContext>();
context->DeviceIdx = r.DeviceIdx;
auto context = std::make_unique<TDeviceRequestRdmaContext>(r.DeviceIdx);

auto [req, err] = ep->AllocateRequest(
requestContext,
Expand Down Expand Up @@ -160,7 +159,8 @@ void TNonreplicatedPartitionRdmaActor::HandleZeroBlocks(
0, // flags
deviceRequest);

requests.push_back({std::move(ep), std::move(req)});
requests.push_back(
{.Endpoint = std::move(ep), .ClientRequest = std::move(req)});
}

for (size_t i = 0; i < requests.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@ namespace NCloud::NBlockStore::NStorage {

struct TDeviceRequestRdmaContext: public NRdma::TNullContext
{
ui32 DeviceIdx = 0;
// Index of the device in the partition config.
const ui32 DeviceIdx;

TDeviceRequestRdmaContext() = default;
explicit TDeviceRequestRdmaContext(ui32 deviceIdx)
: DeviceIdx(deviceIdx)
{}
};

struct TRunningRdmaRequestInfo
{
// Index of the device in the partition config.
ui32 DeviceIdx = 0;
ui64 SentRequestId = 0;
};

////////////////////////////////////////////////////////////////////////////////

template <typename TDerived>
Expand Down
Loading