Skip to content

Commit aff2df7

Browse files
authored
issue-1751: [Filestore] WriteBackCache - refactor/rewrite main state (#5292)
1 parent c8f0ee3 commit aff2df7

20 files changed

+1290
-646
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
#include "node_flush_state.h"
2+
3+
#include <cloud/storage/core/libs/common/error.h>
4+
5+
#include <util/generic/algorithm.h>
6+
7+
namespace NCloud::NFileStore::NFuse::NWriteBackCache {
8+
9+
////////////////////////////////////////////////////////////////////////////////
10+
11+
// Creates the flush state for the node |nodeId|.
//
// Each request from |requests| is moved into an entry of |WriteDataRequests|
// that also holds the error of the corresponding response.
// |affectedUnflushedRequestCount| is stored as is.
//
// Aborts if |requests| is empty or |affectedUnflushedRequestCount| is zero.
TNodeFlushState::TNodeFlushState(
        ui64 nodeId,
        TVector<std::shared_ptr<NProto::TWriteDataRequest>> requests,
        ui64 affectedUnflushedRequestCount)
    : NodeId(nodeId)
    , AffectedUnflushedRequestCount(affectedUnflushedRequestCount)
{
    Y_ABORT_UNLESS(!requests.empty());
    Y_ABORT_UNLESS(affectedUnflushedRequestCount > 0);

    // The final size is known up front: allocate once instead of letting the
    // vector grow while the requests are moved in
    WriteDataRequests.reserve(requests.size());

    for (auto& request: requests) {
        WriteDataRequests.emplace_back(std::move(request));
    }
}
25+
26+
// Starts a flush: returns the stored requests in storage order and marks all
// of them as in-flight. The position of a request in the returned vector is
// the index that OnWriteDataRequestCompleted uses to address the same entry.
//
// Aborts if some requests are still in flight or if there is nothing to flush.
TVector<std::shared_ptr<NProto::TWriteDataRequest>>
TNodeFlushState::BeginFlush()
{
    Y_ABORT_UNLESS(InFlightWriteDataRequestCount.load() == 0);
    Y_ABORT_UNLESS(!WriteDataRequests.empty(), "Nothing to flush");

    const auto requestCount = WriteDataRequests.size();

    TVector<std::shared_ptr<NProto::TWriteDataRequest>> requests(
        Reserve(requestCount));
    for (const auto& entry: WriteDataRequests) {
        requests.push_back(entry.Request);
    }

    // The counter is armed only after the result has been built
    InFlightWriteDataRequestCount.store(requestCount);

    return requests;
}
43+
44+
// Records the error from |response| for the request at |index| and decrements
// the in-flight counter.
//
// Returns CollectFlushResult to the caller that made the final decrement and
// ContinueExecution to every other caller.
//
// Aborts if |index| is out of range or if no requests were in flight.
EWriteDataRequestCompletedAction TNodeFlushState::OnWriteDataRequestCompleted(
    size_t index,
    const NProto::TWriteDataResponse& response)
{
    Y_ABORT_UNLESS(index < WriteDataRequests.size());

    auto& entry = WriteDataRequests[index];
    entry.Error = response.GetError();

    // The error is stored before the release decrement
    const auto inFlightBefore =
        InFlightWriteDataRequestCount.fetch_sub(1, std::memory_order_release);
    Y_ABORT_UNLESS(inFlightBefore > 0);

    const bool isFinalDecrement = (inFlightBefore == 1);
    if (!isFinalDecrement) {
        return EWriteDataRequestCompletedAction::ContinueExecution;
    }

    // The thread that has made the final decrement is the one that processes
    // the results stored by the other threads, so it re-reads the counter with
    // acquire ordering first
    const auto inFlightNow =
        InFlightWriteDataRequestCount.load(std::memory_order_acquire);
    Y_ABORT_UNLESS(inFlightNow == 0);

    return EWriteDataRequestCompletedAction::CollectFlushResult;
}
67+
68+
// Drops the requests that completed without an error and reports the outcome.
//
// Returns a default-constructed error if no failed requests remain, otherwise
// the error of the first remaining request. Failed requests stay in
// |WriteDataRequests|, so the next BeginFlush call returns them again.
//
// Aborts if some requests are still in flight.
NCloud::NProto::TError TNodeFlushState::CollectFlushResult()
{
    Y_ABORT_UNLESS(InFlightWriteDataRequestCount.load() == 0);

    // Keep only the failed requests
    EraseIf(
        WriteDataRequests,
        [](const auto& entry) { return !HasError(entry.Error); });

    if (WriteDataRequests.empty()) {
        return NCloud::NProto::TError();
    }

    return WriteDataRequests.front().Error;
}
79+
80+
} // namespace NCloud::NFileStore::NFuse::NWriteBackCache
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#pragma once
2+
3+
#include <cloud/filestore/public/api/protos/data.pb.h>
4+
5+
#include <util/generic/vector.h>
6+
7+
#include <atomic>
8+
9+
namespace NCloud::NFileStore::NFuse::NWriteBackCache {
10+
11+
////////////////////////////////////////////////////////////////////////////////
12+
13+
struct TWriteDataRequestWithResponse
14+
{
15+
std::shared_ptr<NProto::TWriteDataRequest> Request;
16+
NCloud::NProto::TError Error;
17+
18+
explicit TWriteDataRequestWithResponse(
19+
std::shared_ptr<NProto::TWriteDataRequest> request)
20+
: Request(std::move(request))
21+
{}
22+
};
23+
24+
////////////////////////////////////////////////////////////////////////////////
25+
26+
// Tells the caller of TNodeFlushState::OnWriteDataRequestCompleted what to do
// next
enum class EWriteDataRequestCompletedAction
{
    // Other requests are still executing: the caller has nothing to collect
    // and should continue executing other requests
    ContinueExecution,

    // The caller was the last executing thread and should collect flush result
    CollectFlushResult,
};
36+
37+
////////////////////////////////////////////////////////////////////////////////
38+
39+
class TNodeFlushState
40+
{
41+
private:
42+
const ui64 NodeId;
43+
44+
// WriteData requests generated during Flush that are executed in parallel
45+
TVector<TWriteDataRequestWithResponse> WriteDataRequests;
46+
47+
// The amount of unflushed requests that will become flushed when all
48+
// requests from |WriteDataRequests| are completed
49+
ui64 AffectedUnflushedRequestCount = 0;
50+
51+
// The number of remaining executing requests from |WriteDataRequests|
52+
std::atomic<ui64> InFlightWriteDataRequestCount = 0;
53+
54+
public:
55+
TNodeFlushState(
56+
ui64 nodeId,
57+
TVector<std::shared_ptr<NProto::TWriteDataRequest>> requests,
58+
ui64 affectedUnflushedRequestCount);
59+
60+
ui64 GetNodeId() const
61+
{
62+
return NodeId;
63+
}
64+
65+
ui64 GetAffectedUnflushedRequestCount() const
66+
{
67+
return AffectedUnflushedRequestCount;
68+
}
69+
70+
size_t GetWriteDataRequestCount() const
71+
{
72+
return WriteDataRequests.size();
73+
}
74+
75+
// The caller should execute the requests in parallel and call
76+
// OnWriteDataRequestCompleted for each response with the index
77+
// corresponding to the request in the returned vector
78+
TVector<std::shared_ptr<NProto::TWriteDataRequest>> BeginFlush();
79+
80+
// The method is thread-safe
81+
EWriteDataRequestCompletedAction OnWriteDataRequestCompleted(
82+
size_t index,
83+
const NProto::TWriteDataResponse& response);
84+
85+
// When error is returned, the caller should retry flush
86+
NCloud::NProto::TError CollectFlushResult();
87+
};
88+
89+
} // namespace NCloud::NFileStore::NFuse::NWriteBackCache

cloud/filestore/libs/vfs_fuse/write_back_cache/node_state.h

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,16 @@ namespace NCloud::NFileStore::NFuse::NWriteBackCache {
1313

1414
////////////////////////////////////////////////////////////////////////////////
1515

16-
struct TNodeFlushState
16+
enum class ENodeFlushStatus
1717
{
18-
TVector<std::shared_ptr<NProto::TWriteDataRequest>> WriteRequests;
19-
TVector<std::shared_ptr<NProto::TWriteDataRequest>> FailedWriteRequests;
20-
size_t AffectedWriteDataEntriesCount = 0;
21-
size_t InFlightWriteRequestsCount = 0;
22-
bool Executing = false;
18+
// Node has no unflushed requests
19+
NothingToFlush,
20+
21+
// Node has unflushed requests but flush has not been requested yet
22+
ReadyToFlush,
23+
24+
// Flush has been requested for a node
25+
FlushRequested
2326
};
2427

2528
////////////////////////////////////////////////////////////////////////////////
@@ -41,58 +44,53 @@ struct TFlushRequest
4144

4245
struct TNodeState
4346
{
44-
const ui64 NodeId;
45-
4647
// Holds pending, unflushed and flushed requests
4748
// Tracks cached data parts
4849
TNodeCache Cache;
4950

5051
// Prevent from concurrent read and write requests with overlapping ranges
5152
TReadWriteRangeLock RangeLock;
5253

53-
TNodeFlushState FlushState;
54+
ENodeFlushStatus FlushStatus = ENodeFlushStatus::NothingToFlush;
5455

5556
// Flush requests are fulfilled when there are no pending or unflushed
5657
// requests with SequenceId less or equal than |TFlushRequest::SequenceId|.
5758
// Flush requests are stored in chronological order: SequenceId values are
5859
// strictly increasing so newer flush requests have larger SequenceId.
5960
TDeque<TFlushRequest> FlushRequests;
6061

61-
// All entries with RequestId <= |AutomaticFlushRequestId| are to be flushed
62-
ui64 AutomaticFlushRequestId = 0;
63-
6462
// Cached data extends the node size but until the data is flushed,
6563
// the changes are not visible to the tablet. FileSystem requests that
6664
// return node attributes or rely on it (GetAttr, Lookup, Read, ReadDir)
6765
// should have the node size adjusted to this value.
6866
ui64 CachedNodeSize = 0;
6967

70-
explicit TNodeState(ui64 nodeId)
71-
: NodeId(nodeId)
72-
{}
73-
7468
bool CanBeDeleted() const
7569
{
7670
if (Cache.Empty() && RangeLock.Empty()) {
77-
Y_ABORT_UNLESS(!FlushState.Executing);
71+
Y_ABORT_UNLESS(FlushRequests.empty());
72+
Y_ABORT_UNLESS(FlushStatus == ENodeFlushStatus::NothingToFlush);
7873
return true;
7974
}
8075
return false;
8176
}
8277

83-
bool ShouldFlush(ui64 maxFlushAllRequestId) const
78+
ENodeFlushStatus GetExpectedFlushStatus(ui64 flushAllSequenceId) const
8479
{
80+
if (FlushStatus == ENodeFlushStatus::FlushRequested) {
81+
// Once Flush has been scheduled, the status can be changed only in
82+
// FlushSucceeded and FlushFailed calls
83+
return ENodeFlushStatus::FlushRequested;
84+
}
8585
if (!Cache.HasUnflushedRequests()) {
86-
return false;
86+
return ENodeFlushStatus::NothingToFlush;
8787
}
88-
89-
if (!FlushRequests.empty()) {
90-
return true;
88+
if (!FlushRequests.empty() ||
89+
Cache.GetMinUnflushedSequenceId() <= flushAllSequenceId)
90+
{
91+
return ENodeFlushStatus::FlushRequested;
9192
}
92-
93-
const ui64 minRequestId = Cache.GetMinUnflushedSequenceId();
94-
return minRequestId <= maxFlushAllRequestId ||
95-
minRequestId <= AutomaticFlushRequestId;
93+
return ENodeFlushStatus::ReadyToFlush;
9694
}
9795
};
9896

cloud/filestore/libs/vfs_fuse/write_back_cache/node_state_holder.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ TNodeStateHolder::TNodeStateHolder(IWriteBackCacheStatsPtr stats)
1010

1111
TNodeState& TNodeStateHolder::GetOrCreateNodeState(ui64 nodeId)
1212
{
13-
auto [it, inserted] = NodeStates.try_emplace(nodeId, TNodeState(nodeId));
13+
auto [it, inserted] = NodeStates.try_emplace(nodeId);
1414
if (inserted) {
1515
Stats->IncrementNodeCount();
1616
}

cloud/filestore/libs/vfs_fuse/write_back_cache/node_state_holder.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,11 @@ class TNodeStateHolder
5353
// Return a pointer to the node state for nodeId or nullptr if it does not
5454
// exist.
5555
//
56-
// If includeDeleted is false (default), logically deleted states are
57-
// treated as absent and nullptr may be returned. If includeDeleted is
58-
// true, the function will return states that were deleted but are still
59-
// retained due to active pins.
60-
TNodeState* GetNodeState(ui64 nodeId, bool includeDeleted = false);
56+
// If includeDeleted is false, logically deleted states are treated as
57+
// absent and nullptr may be returned. If includeDeleted is true, the
58+
// function will return states that were deleted but are still retained due
59+
// to active pins.
60+
TNodeState* GetNodeState(ui64 nodeId, bool includeDeleted);
6161

6262
const TNodeState* GetNodeState(
6363
ui64 nodeId,
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
#include "queued_operations.h"
2+
3+
#include <variant>
4+
5+
namespace NCloud::NFileStore::NFuse::NWriteBackCache {
6+
7+
namespace {
8+
9+
////////////////////////////////////////////////////////////////////////////////
10+
11+
struct TWriteDataPromiseCompletedEvent
12+
{
13+
NThreading::TPromise<NProto::TWriteDataResponse> Promise;
14+
15+
void Invoke()
16+
{
17+
Promise.SetValue({});
18+
}
19+
};
20+
21+
struct TFlushPromiseCompletedEvent
22+
{
23+
NThreading::TPromise<void> Promise;
24+
25+
void Invoke()
26+
{
27+
Promise.SetValue();
28+
}
29+
};
30+
31+
struct TScheduleFlushEvent
32+
{
33+
IQueuedOperationsProcessor& Processor;
34+
ui64 NodeId;
35+
36+
void Invoke()
37+
{
38+
Processor.ScheduleFlushNode(NodeId);
39+
}
40+
};
41+
42+
using TEventVariant = std::variant<
43+
TWriteDataPromiseCompletedEvent,
44+
TFlushPromiseCompletedEvent,
45+
TScheduleFlushEvent>;
46+
47+
} // namespace
48+
49+
////////////////////////////////////////////////////////////////////////////////
50+
51+
// A queued event: a variant over all supported deferred actions that inherits
// the constructors of the underlying variant
struct TQueuedOperations::TEvent
    : public TEventVariant
{
    using TEventVariant::TEventVariant;
};
55+
56+
////////////////////////////////////////////////////////////////////////////////
57+
58+
TQueuedOperations::TQueuedOperations(IQueuedOperationsProcessor& processor)
59+
: Processor(processor)
60+
{}
61+
62+
// Defaulted out of line, after TEvent has been defined in this file
TQueuedOperations::~TQueuedOperations() = default;
63+
64+
void TQueuedOperations::Acquire()
65+
{
66+
Lock.Acquire();
67+
}
68+
69+
void TQueuedOperations::Release()
70+
{
71+
auto events = std::exchange(Events, {});
72+
Lock.Release();
73+
for (auto& event: events) {
74+
std::visit([](auto& ev) { ev.Invoke(); }, event);
75+
}
76+
}
77+
78+
// Queues a request to schedule a flush for |nodeId|; the processor is called
// when the queued events are run in Release()
void TQueuedOperations::ScheduleFlushNode(ui64 nodeId)
{
    TScheduleFlushEvent scheduleFlush{Processor, nodeId};
    Events.push_back(std::move(scheduleFlush));
}
82+
83+
// Queues the completion of |promise|; the value is set when the queued events
// are run in Release()
void TQueuedOperations::CompleteWriteDataPromise(
    NThreading::TPromise<NProto::TWriteDataResponse> promise)
{
    TWriteDataPromiseCompletedEvent completion{std::move(promise)};
    Events.push_back(std::move(completion));
}
88+
89+
// Queues the completion of |promise|; the value is set when the queued events
// are run in Release()
void TQueuedOperations::CompleteFlushPromise(NThreading::TPromise<void> promise)
{
    TFlushPromiseCompletedEvent completion{std::move(promise)};
    Events.push_back(std::move(completion));
}
93+
94+
} // namespace NCloud::NFileStore::NFuse::NWriteBackCache

0 commit comments

Comments
 (0)