Skip to content

Commit 0d9d127

Browse files
committed
support io for fresh blocks writer
1 parent 2178f2d commit 0d9d127

17 files changed

+297
-8
lines changed

cloud/blockstore/libs/storage/api/fresh_blocks_writer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <cloud/blockstore/libs/kikimr/components.h>
66
#include <cloud/blockstore/libs/kikimr/events.h>
7+
#include <cloud/blockstore/libs/storage/model/channel_permissions.h>
78

89
namespace NCloud::NBlockStore::NStorage::NFreshBlocksWriter {
910

cloud/blockstore/libs/storage/fresh_blocks_writer/fresh_blocks_writer_actor.cpp

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include "fresh_blocks_writer_actor.h"
22

3+
#include "io_companion_client.h"
4+
35
#include <cloud/blockstore/libs/storage/core/config.h>
46
#include <cloud/blockstore/libs/storage/core/proto_helpers.h>
57
#include <cloud/blockstore/libs/storage/partition_common/events_private.h>
@@ -65,12 +67,17 @@ TFreshBlocksWriterActor::TFreshBlocksWriterActor(
6567
ui32 partitionIndex,
6668
ui32 siblingCount,
6769
NActors::TActorId partitionActorId,
70+
NActors::TActorId volumeActorId,
71+
TDiagnosticsConfigPtr diagnosticsConfig,
6872
ui64 partitionTabletId)
6973
: Config(std::move(config))
7074
, PartitionConfig(std::move(partitionConfig))
7175
, StorageAccessMode(storageAccessMode)
7276
, PartitionTabletID(partitionTabletId)
7377
, PartitionActorId(partitionActorId)
78+
, BlobCodec(NBlockCodecs::Codec(Config->GetBlobCompressionCodec()))
79+
, VolumeActorId(volumeActorId)
80+
, DiagnosticsConfig(std::move(diagnosticsConfig))
7481
, PoisonPillHelper(this)
7582
, LogTitle(
7683
GetCycleCount(),
@@ -102,6 +109,38 @@ void TFreshBlocksWriterActor::Bootstrap(const NActors::TActorContext& ctx)
102109
NCloud::Send(ctx, PartitionActorId, std::move(request));
103110
}
104111

112+
void TFreshBlocksWriterActor::ScheduleYellowStateUpdate(
113+
const NActors::TActorContext& ctx)
114+
{
115+
Y_UNUSED(ctx);
116+
Y_ABORT("Unimplemented");
117+
}
118+
119+
void TFreshBlocksWriterActor::UpdateYellowState(
120+
const NActors::TActorContext& ctx)
121+
{
122+
Y_UNUSED(ctx);
123+
Y_ABORT("Unimplemented");
124+
}
125+
126+
void TFreshBlocksWriterActor::ReassignChannelsIfNeeded(
127+
const NActors::TActorContext& ctx)
128+
{
129+
Y_UNUSED(ctx);
130+
Y_ABORT("Unimplemented");
131+
}
132+
133+
void TFreshBlocksWriterActor::UpdateChannelPermissions(
134+
const NActors::TActorContext& ctx,
135+
ui32 channel,
136+
EChannelPermissions permissions)
137+
{
138+
Y_UNUSED(ctx);
139+
140+
ChannelsState->UpdatePermissions(channel, permissions);
141+
//TODO(@vladstepanyuk): add partition notification.
142+
}
143+
105144
void TFreshBlocksWriterActor::HandlePartitionReady(
106145
const NPartition::TEvPartition::TEvWaitReadyResponse::TPtr& ev,
107146
const NActors::TActorContext& ctx)
@@ -144,12 +183,39 @@ void TFreshBlocksWriterActor::HandleFreshChannelsInfo(
144183
Config->GetReassignSystemChannelsImmediately(),
145184
Min(tabletChannelCount, configChannelCount));
146185

186+
for (ui32 channel = 0; channel < ChannelsState->GetChannelCount();
187+
++channel)
188+
{
189+
ChannelsState->UpdatePermissions(
190+
channel,
191+
msg->ChannelPermissions[channel]);
192+
}
193+
147194
CommitIdsState =
148195
std::make_unique<TCommitIdsState>(TabletGeneration, /*lastCommitId=*/0);
149196
FlushState = std::make_unique<TPartitionFlushState>();
150197
TrimFreshLogState =
151198
std::make_unique<TPartitionTrimFreshLogState>(*CommitIdsState);
152199

200+
IOCompanionClient = std::make_unique<TIOCompanionClient>(*this);
201+
202+
IOCompanion = std::make_unique<TIOCompanion>(
203+
Config,
204+
PartitionConfig,
205+
TabletStorageInfo,
206+
PartitionTabletID,
207+
BlobCodec,
208+
VolumeActorId,
209+
DiagnosticsConfig,
210+
StorageAccessMode,
211+
BSGroupOperationTimeTracker,
212+
BSGroupOperationId,
213+
*IOCompanionClient,
214+
*ChannelsState,
215+
LogTitle,
216+
msg->ResourceMetricsQueue,
217+
msg->GroupDowntimes,
218+
msg->PartCounters);
153219
Become(&TThis::StateWork);
154220

155221
StateLoaded = true;
@@ -191,6 +257,20 @@ void TFreshBlocksWriterActor::HandleWaitReady(
191257
NCloud::Reply(ctx, *ev, std::move(response));
192258
}
193259

260+
void TFreshBlocksWriterActor::HandleUpdateChannelPermissions(
261+
const TEvFreshBlocksWriter::TEvUpdateChannelPermissions::TPtr& ev,
262+
const NActors::TActorContext& ctx)
263+
{
264+
Y_UNUSED(ctx);
265+
266+
auto* msg = ev->Get();
267+
for (auto& channel: TabletStorageInfo->Channels) {
268+
ChannelsState->UpdatePermissions(
269+
channel.Channel,
270+
msg->Permissions[channel.Channel]);
271+
}
272+
}
273+
194274
bool TFreshBlocksWriterActor::HandleRequests(STFUNC_SIG)
195275
{
196276
switch (ev->GetTypeRewrite()) {
@@ -266,8 +346,14 @@ STFUNC(TFreshBlocksWriterActor::StateWork)
266346
HFunc(TEvents::TEvPoisonPill, PoisonPillHelper.HandlePoisonPill);
267347
HFunc(TEvents::TEvPoisonTaken, PoisonPillHelper.HandlePoisonTaken);
268348

349+
HFunc(
350+
TEvFreshBlocksWriter::TEvUpdateChannelPermissions,
351+
HandleUpdateChannelPermissions);
352+
269353
default:
270-
if (!HandleRequests(ev)) {
354+
if (!IOCompanion->HandleRequests(ev, this->ActorContext()) &&
355+
!HandleRequests(ev))
356+
{
271357
HandleUnexpectedEvent(
272358
ev,
273359
TBlockStoreComponents::PARTITION,

cloud/blockstore/libs/storage/fresh_blocks_writer/fresh_blocks_writer_actor.h

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <cloud/blockstore/libs/storage/model/log_title.h>
1010
#include <cloud/blockstore/libs/storage/partition/part_events_private.h>
1111
#include <cloud/blockstore/libs/storage/partition_common/events_private.h>
12+
#include <cloud/blockstore/libs/storage/partition_common/io_companion.h>
1213
#include <cloud/blockstore/libs/storage/partition_common/part_channels_state.h>
1314
#include <cloud/blockstore/libs/storage/partition_common/part_fresh_blocks_state.h>
1415

@@ -20,19 +21,25 @@ namespace NCloud::NBlockStore::NStorage::NFreshBlocksWriter {
2021

2122
////////////////////////////////////////////////////////////////////////////////
2223

24+
struct TIOCompanionClient;
25+
2326
class TFreshBlocksWriterActor final
2427
: public NActors::TActorBootstrapped<TFreshBlocksWriterActor>
2528
, public IMortalActor
2629
{
2730
using TBase = NActors::TActorBootstrapped<TFreshBlocksWriterActor>;
2831

32+
friend TIOCompanionClient;
33+
2934
private:
3035
const TStorageConfigPtr Config;
3136
const NProto::TPartitionConfig PartitionConfig;
3237
const EStorageAccessMode StorageAccessMode;
3338
const ui64 PartitionTabletID;
34-
3539
const NActors::TActorId PartitionActorId;
40+
const NBlockCodecs::ICodec* BlobCodec;
41+
const NActors::TActorId VolumeActorId;
42+
const TDiagnosticsConfigPtr DiagnosticsConfig;
3643

3744
NKikimr::TTabletStorageInfoPtr TabletStorageInfo;
3845

@@ -51,6 +58,12 @@ class TFreshBlocksWriterActor final
5158

5259
TLogTitle LogTitle;
5360

61+
ui64 BSGroupOperationId = 0;
62+
TBSGroupOperationTimeTracker BSGroupOperationTimeTracker;
63+
64+
std::unique_ptr<TIOCompanionClient> IOCompanionClient;
65+
std::unique_ptr<TIOCompanion> IOCompanion;
66+
5467
public:
5568
TFreshBlocksWriterActor(
5669
TStorageConfigPtr config,
@@ -59,6 +72,8 @@ class TFreshBlocksWriterActor final
5972
ui32 partitionIndex,
6073
ui32 siblingCount,
6174
NActors::TActorId partitionActorId,
75+
NActors::TActorId volumeActorId,
76+
TDiagnosticsConfigPtr diagnosticsConfig,
6277
ui64 partitionTabletId);
6378

6479
~TFreshBlocksWriterActor() override;
@@ -72,7 +87,16 @@ class TFreshBlocksWriterActor final
7287
NCloud::Send<NActors::TEvents::TEvPoisonPill>(ctx, ctx.SelfID);
7388
}
7489

75-
void FreshBlobsLoaded(const NActors::TActorContext& ctx);
90+
void ScheduleYellowStateUpdate(const NActors::TActorContext& ctx);
91+
92+
void UpdateYellowState(const NActors::TActorContext& ctx);
93+
94+
void ReassignChannelsIfNeeded(const NActors::TActorContext& ctx);
95+
96+
void UpdateChannelPermissions(
97+
const NActors::TActorContext& ctx,
98+
ui32 channel,
99+
EChannelPermissions permissions);
76100

77101
// IMortalActor overrides
78102

@@ -101,6 +125,10 @@ class TFreshBlocksWriterActor final
101125
ev,
102126
const NActors::TActorContext& ctx);
103127

128+
void HandleUpdateChannelPermissions(
129+
const TEvFreshBlocksWriter::TEvUpdateChannelPermissions::TPtr& ev,
130+
const NActors::TActorContext& ctx);
131+
104132
bool HandleRequests(STFUNC_SIG);
105133
bool RejectRequests(STFUNC_SIG);
106134

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#include "io_companion_client.h"
2+
3+
#include "fresh_blocks_writer_actor.h"
4+
5+
namespace NCloud::NBlockStore::NStorage::NFreshBlocksWriter {
6+
7+
////////////////////////////////////////////////////////////////////////////////
8+
9+
void TIOCompanionClient::ScheduleYellowStateUpdate(
10+
const NActors::TActorContext& ctx)
11+
{
12+
Owner.ScheduleYellowStateUpdate(ctx);
13+
}
14+
15+
void TIOCompanionClient::UpdateYellowState(
16+
const NActors::TActorContext& ctx)
17+
{
18+
Owner.UpdateYellowState(ctx);
19+
}
20+
21+
void TIOCompanionClient::ReassignChannelsIfNeeded(
22+
const NActors::TActorContext& ctx)
23+
{
24+
Owner.ReassignChannelsIfNeeded(ctx);
25+
}
26+
27+
void TIOCompanionClient::UpdateChannelPermissions(
28+
const NActors::TActorContext& ctx,
29+
ui32 channel,
30+
EChannelPermissions permissions)
31+
{
32+
Owner.UpdateChannelPermissions(ctx, channel, permissions);
33+
}
34+
35+
// IMortalActor implements
36+
37+
void TIOCompanionClient::Poison(const NActors::TActorContext& ctx)
38+
{
39+
Owner.Suicide(ctx);
40+
}
41+
42+
} // namespace NCloud::NBlockStore::NStorage::NFreshBlocksWriter
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#pragma once
2+
3+
#include <cloud/blockstore/libs/storage/partition_common/io_companion.h>
4+
5+
namespace NCloud::NBlockStore::NStorage::NFreshBlocksWriter {
6+
7+
class TFreshBlocksWriterActor;
8+
9+
struct TIOCompanionClient: public IIOCompanionClient
10+
{
11+
TFreshBlocksWriterActor& Owner;
12+
13+
explicit TIOCompanionClient(TFreshBlocksWriterActor& owner)
14+
: Owner(owner)
15+
{}
16+
17+
void ScheduleYellowStateUpdate(const NActors::TActorContext& ctx) override;
18+
19+
void UpdateYellowState(const NActors::TActorContext& ctx) override;
20+
21+
void ReassignChannelsIfNeeded(const NActors::TActorContext& ctx) override;
22+
23+
void UpdateChannelPermissions(
24+
const NActors::TActorContext& ctx,
25+
ui32 channel,
26+
EChannelPermissions permissions) override;
27+
28+
// IMortalActor implements
29+
30+
void Poison(const NActors::TActorContext& ctx) override;
31+
};
32+
33+
} // namespace NCloud::NBlockStore::NStorage::NFreshBlocksWriter

cloud/blockstore/libs/storage/fresh_blocks_writer/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ LIBRARY()
33
SRCS(
44
fresh_blocks_writer_actor_forward.cpp
55
fresh_blocks_writer_actor.cpp
6+
io_companion_client.cpp
67
)
78

89
PEERDIR(

cloud/blockstore/libs/storage/partition/model/group_downtimes.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,6 @@ class TGroupDowntimes {
2727
void RegisterSuccess(TInstant now, ui32 groupId);
2828
};
2929

30+
using TGroupDowntimesPtr = std::shared_ptr<TGroupDowntimes>;
31+
3032
} // namespace NCloud::NBlockStore::NStorage::NPartition

cloud/blockstore/libs/storage/partition/model/part_counters_wrapper.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,6 @@ class TThreadSafePartCounters
4141
TPartitionDiskCountersPtr Swap(TPartitionDiskCountersPtr counters);
4242
};
4343

44+
using TThreadSafePartCountersPtr = std::shared_ptr<TThreadSafePartCounters>;
45+
4446
} // namespace NCloud::NBlockStore::NStorage::NPartition

cloud/blockstore/libs/storage/partition/model/resource_metrics_updates_queue.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,6 @@ class TResourceMetricsQueue
106106
}
107107
};
108108

109+
using TResourceMetricsQueuePtr = std::shared_ptr<TResourceMetricsQueue>;
110+
109111
} // namespace NCloud::NBlockStore::NStorage::NPartition

0 commit comments

Comments
 (0)