Skip to content

Commit bc6d961

Browse files
authored
Reuse common space checks for S3 upload rows (#26353)
1 parent b4d20ec commit bc6d961

File tree

8 files changed

+107
-51
lines changed

8 files changed

+107
-51
lines changed

ydb/core/tx/datashard/datashard.cpp

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4471,25 +4471,6 @@ void TDataShard::Handle(TEvDataShard::TEvStoreS3DownloadInfo::TPtr& ev, const TA
44714471
Execute(new TTxStoreS3DownloadInfo(this, ev), ctx);
44724472
}
44734473

4474-
void TDataShard::Handle(TEvDataShard::TEvS3UploadRowsRequest::TPtr& ev, const TActorContext& ctx)
4475-
{
4476-
if (ShouldDelayOperation(ev)) {
4477-
return;
4478-
}
4479-
4480-
const float rejectProbabilty = Executor()->GetRejectProbability();
4481-
if (rejectProbabilty > 0) {
4482-
const float rnd = AppData(ctx)->RandomProvider->GenRandReal2();
4483-
if (rnd < rejectProbabilty) {
4484-
DelayedS3UploadRows.emplace_back().Reset(ev.Release());
4485-
IncCounter(COUNTER_BULK_UPSERT_OVERLOADED);
4486-
return;
4487-
}
4488-
}
4489-
4490-
Execute(new TTxS3UploadRows(this, ev), ctx);
4491-
}
4492-
44934474
void TDataShard::ScanComplete(NTable::EStatus,
44944475
TAutoPtr<IDestructable> prod,
44954476
ui64 cookie,

ydb/core/tx/datashard/datashard.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,15 @@ namespace TEvDataShard {
920920
Record.SetTabletID(tabletId);
921921
Record.SetStatus(status);
922922
}
923+
924+
bool IsRetriableError() const {
925+
Y_ASSERT(Record.GetStatus() != NKikimrTxDataShard::TError::OK);
926+
return
927+
Record.GetStatus() == NKikimrTxDataShard::TError::WRONG_SHARD_STATE // = Ydb::StatusIds::OVERLOADED
928+
|| Record.GetStatus() == NKikimrTxDataShard::TError::SHARD_IS_BLOCKED // = Ydb::StatusIds::OVERLOADED
929+
|| Record.GetStatus() == NKikimrTxDataShard::TError::SCHEME_CHANGED // Ydb::StatusIds::GENERIC_ERROR
930+
;
931+
}
923932
};
924933

925934
struct TEvOverloadReady
@@ -1434,6 +1443,13 @@ namespace TEvDataShard {
14341443
Record.SetStatus(status);
14351444
}
14361445

1446+
TEvS3UploadRowsResponse() {}
1447+
1448+
bool IsRetriableError() const {
1449+
return TEvUploadRowsResponse(Record.GetTabletID(), Record.GetStatus())
1450+
.IsRetriableError();
1451+
}
1452+
14371453
TString ToString() const override {
14381454
return TStringBuilder() << ToStringHeader() << " {"
14391455
<< " Record: " << Record.ShortDebugString()

ydb/core/tx/datashard/datashard__op_rows.cpp

Lines changed: 78 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "datashard_impl.h"
22
#include "datashard_direct_transaction.h"
3+
#include "datashard_txs.h"
34

45
namespace NKikimr {
56
namespace NDataShard {
@@ -112,48 +113,90 @@ class TDataShard::TTxEraseRows : public TTxDirectBase<TEvDataShard::TEvEraseRows
112113
TTxType GetTxType() const override { return TXTYPE_ERASE_ROWS; }
113114
};
114115

115-
static void OutOfSpace(NKikimrTxDataShard::TEvUploadRowsResponse& response) {
116-
response.SetStatus(NKikimrTxDataShard::TError::OUT_OF_SPACE);
116+
static void OutOfSpace(TEvDataShard::TEvUploadRowsResponse& response) {
117+
response.Record.SetStatus(NKikimrTxDataShard::TError::OUT_OF_SPACE);
117118
}
118119

119-
static void DiskSpaceExhausted(NKikimrTxDataShard::TEvUploadRowsResponse& response) {
120-
response.SetStatus(NKikimrTxDataShard::TError::DISK_SPACE_EXHAUSTED);
120+
static void DiskSpaceExhausted(TEvDataShard::TEvUploadRowsResponse& response) {
121+
response.Record.SetStatus(NKikimrTxDataShard::TError::DISK_SPACE_EXHAUSTED);
121122
}
122123

123-
static void WrongShardState(NKikimrTxDataShard::TEvUploadRowsResponse& response) {
124-
response.SetStatus(NKikimrTxDataShard::TError::WRONG_SHARD_STATE);
124+
static void WrongShardState(TEvDataShard::TEvUploadRowsResponse& response) {
125+
response.Record.SetStatus(NKikimrTxDataShard::TError::WRONG_SHARD_STATE);
125126
}
126127

127-
static void Replicated(NKikimrTxDataShard::TEvUploadRowsResponse& response) {
128-
response.SetStatus(NKikimrTxDataShard::TError::READONLY);
128+
static void Replicated(TEvDataShard::TEvUploadRowsResponse& response) {
129+
response.Record.SetStatus(NKikimrTxDataShard::TError::READONLY);
129130
}
130131

131-
static void Overloaded(NKikimrTxDataShard::TEvUploadRowsResponse& response) {
132-
response.SetStatus(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED);
132+
static void Overloaded(TEvDataShard::TEvUploadRowsResponse& response) {
133+
response.Record.SetStatus(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED);
133134
}
134135

135136
ECumulativeCounters OverloadedCounter(TEvDataShard::TEvUploadRowsRequest::TPtr&) {
136137
return COUNTER_BULK_UPSERT_OVERLOADED;
137138
}
138139

139-
static void WrongShardState(NKikimrTxDataShard::TEvEraseRowsResponse& response) {
140-
response.SetStatus(NKikimrTxDataShard::TEvEraseRowsResponse::WRONG_SHARD_STATE);
140+
static bool TryDelayS3UploadRows(TDataShard*, TEvDataShard::TEvUploadRowsRequest::TPtr&, ERejectReasons) {
141+
// not a S3 upload rows
142+
return false;
143+
}
144+
145+
static void OutOfSpace(TEvDataShard::TEvS3UploadRowsResponse& response) {
146+
response.Record.SetStatus(NKikimrTxDataShard::TError::OUT_OF_SPACE);
147+
}
148+
149+
static void DiskSpaceExhausted(TEvDataShard::TEvS3UploadRowsResponse& response) {
150+
response.Record.SetStatus(NKikimrTxDataShard::TError::DISK_SPACE_EXHAUSTED);
151+
}
152+
153+
static void WrongShardState(TEvDataShard::TEvS3UploadRowsResponse& response) {
154+
response.Record.SetStatus(NKikimrTxDataShard::TError::WRONG_SHARD_STATE);
155+
}
156+
157+
static void Replicated(TEvDataShard::TEvS3UploadRowsResponse& response) {
158+
response.Record.SetStatus(NKikimrTxDataShard::TError::READONLY);
141159
}
142160

143-
static void Replicated(NKikimrTxDataShard::TEvEraseRowsResponse& response) {
144-
response.SetStatus(NKikimrTxDataShard::TEvEraseRowsResponse::EXEC_ERROR);
161+
static void Overloaded(TEvDataShard::TEvS3UploadRowsResponse& response) {
162+
response.Record.SetStatus(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED);
145163
}
146164

147-
static void Overloaded(NKikimrTxDataShard::TEvEraseRowsResponse& response) {
148-
response.SetStatus(NKikimrTxDataShard::TEvEraseRowsResponse::SHARD_OVERLOADED);
165+
ECumulativeCounters OverloadedCounter(TEvDataShard::TEvS3UploadRowsRequest::TPtr&) {
166+
return COUNTER_BULK_UPSERT_OVERLOADED;
167+
}
168+
169+
static bool TryDelayS3UploadRows(TDataShard* self, TEvDataShard::TEvS3UploadRowsRequest::TPtr& ev, ERejectReasons rejectReasons) {
170+
if (rejectReasons == ERejectReasons::OverloadByProbability) {
171+
self->DelayS3UploadRows(ev);
172+
return true;
173+
}
174+
return false;
175+
}
176+
177+
static void WrongShardState(TEvDataShard::TEvEraseRowsResponse& response) {
178+
response.Record.SetStatus(NKikimrTxDataShard::TEvEraseRowsResponse::WRONG_SHARD_STATE);
179+
}
180+
181+
static void Replicated(TEvDataShard::TEvEraseRowsResponse& response) {
182+
response.Record.SetStatus(NKikimrTxDataShard::TEvEraseRowsResponse::EXEC_ERROR);
183+
}
184+
185+
static void Overloaded(TEvDataShard::TEvEraseRowsResponse& response) {
186+
response.Record.SetStatus(NKikimrTxDataShard::TEvEraseRowsResponse::SHARD_OVERLOADED);
149187
}
150188

151189
ECumulativeCounters OverloadedCounter(TEvDataShard::TEvEraseRowsRequest::TPtr&) {
152190
return COUNTER_ERASE_ROWS_OVERLOADED;
153191
}
154192

193+
static bool TryDelayS3UploadRows(TDataShard*, TEvDataShard::TEvEraseRowsRequest::TPtr&, ERejectReasons) {
194+
// not a S3 upload rows
195+
return false;
196+
}
197+
155198
template <typename TEvResponse>
156-
using TSetStatusFunc = void(*)(typename TEvResponse::ProtoRecordType&);
199+
using TSetStatusFunc = void(*)(TEvResponse&);
157200

158201
template <typename TEvResponse, typename TEvRequest>
159202
static void Reject(TDataShard* self, TEvRequest& ev, const TString& txDesc,
@@ -166,7 +209,7 @@ static void Reject(TDataShard* self, TEvRequest& ev, const TString& txDesc,
166209
<< ", error# " << rejectDescription);
167210

168211
auto response = MakeHolder<TEvResponse>();
169-
setStatusFunc(response->Record);
212+
setStatusFunc(*response);
170213
response->Record.SetTabletID(self->TabletID());
171214
response->Record.SetErrorDescription(rejectDescription);
172215

@@ -191,6 +234,13 @@ static bool MaybeReject(TDataShard* self, TEvRequest& ev, const TActorContext& c
191234
NKikimrTxDataShard::TEvProposeTransactionResult::EStatus rejectStatus;
192235
if (self->CheckDataTxReject(txDesc, ctx, rejectStatus, rejectReasons, rejectDescription)) {
193236
self->IncCounter(OverloadedCounter(ev));
237+
238+
// FIXME: it seems that s3 upload should work via TUploadRowsInternal
239+
// this will deduplicate code and support overload subscribed
240+
if (TryDelayS3UploadRows(self, ev, rejectReasons)) {
241+
return true;
242+
}
243+
194244
Reject<TEvResponse, TEvRequest>(self, ev, txDesc, rejectReasons, rejectDescription, &WrongShardState, ctx, logThrottlerType);
195245
return true;
196246
}
@@ -232,6 +282,16 @@ void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TAct
232282
}
233283
}
234284

285+
void TDataShard::Handle(TEvDataShard::TEvS3UploadRowsRequest::TPtr& ev, const TActorContext& ctx) {
286+
if (ShouldDelayOperation(ev)) {
287+
return;
288+
}
289+
290+
if (!MaybeReject<TEvDataShard::TEvS3UploadRowsResponse, true>(this, ev, ctx, "s3 bulk upsert", TDataShard::ELogThrottlerType::S3UploadRows_Reject)) {
291+
Executor()->Execute(new TTxS3UploadRows(this, ev), ctx);
292+
}
293+
}
294+
235295
void TDataShard::Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActorContext& ctx) {
236296
if (ShouldDelayOperation(ev)) {
237297
return;

ydb/core/tx/datashard/datashard_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1838,6 +1838,7 @@ class TDataShard
18381838
void ScanComplete(NTable::EStatus status, TAutoPtr<IDestructable> prod, ui64 cookie, const TActorContext &ctx) override;
18391839
bool ReassignChannelsEnabled() const override;
18401840
void OnYellowChannelsChanged() override;
1841+
void DelayS3UploadRows(TEvDataShard::TEvS3UploadRowsRequest::TPtr& ev);
18411842
void OnRejectProbabilityRelaxed() override;
18421843
void OnFollowersCountChanged() override;
18431844
ui64 GetMemoryUsage() const override;
@@ -2189,6 +2190,7 @@ class TDataShard
21892190
FinishProposeUnit_UpdateCounters,
21902191
UploadRows_Reject,
21912192
EraseRows_Reject,
2193+
S3UploadRows_Reject,
21922194

21932195
LAST
21942196
};

ydb/core/tx/datashard/datashard_overload.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ void TDataShard::OnYellowChannelsChanged() {
88
}
99
}
1010

11+
void TDataShard::DelayS3UploadRows(TEvDataShard::TEvS3UploadRowsRequest::TPtr& ev) {
12+
DelayedS3UploadRows.emplace_back().Reset(ev.Release());
13+
}
14+
1115
void TDataShard::OnRejectProbabilityRelaxed() {
1216
NotifyOverloadSubscribers(ERejectReason::OverloadByProbability);
1317
for (auto& ev : DelayedS3UploadRows) {

ydb/core/tx/datashard/import_s3.cpp

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -814,16 +814,13 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
814814
IMPORT_LOG_D("Handle " << ev->Get()->ToString());
815815

816816
const auto& record = ev->Get()->Record;
817-
switch (record.GetStatus()) {
818-
case NKikimrTxDataShard::TError::OK:
819-
return ProcessDownloadInfo(ev->Get()->Info, TStringBuf("UploadResponse"));
820-
821-
case NKikimrTxDataShard::TError::SCHEME_ERROR:
822-
case NKikimrTxDataShard::TError::BAD_ARGUMENT:
823-
return Finish(false, record.GetErrorDescription());
824817

825-
default:
818+
if (record.GetStatus() == NKikimrTxDataShard::TError::OK) {
819+
return ProcessDownloadInfo(ev->Get()->Info, TStringBuf("UploadResponse"));
820+
} else if (ev->Get()->IsRetriableError()) {
826821
return RetryOrFinish(record.GetErrorDescription());
822+
} else {
823+
return Finish(false, record.GetErrorDescription());
827824
}
828825
}
829826

ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2375,7 +2375,7 @@ value {
23752375
env.TestWaitNotification(runtime, txId);
23762376

23772377
const ui32 expected = data.Data.size() / batchSize + ui32(bool(data.Data.size() % batchSize));
2378-
UNIT_ASSERT(requests > expected);
2378+
UNIT_ASSERT_C(requests > expected, TStringBuilder() << "Expected to get more than " << expected << " requests, but got only " << requests);
23792379
UNIT_ASSERT_VALUES_EQUAL(responses, expected);
23802380

23812381
auto content = ReadTable(runtime, TTestTxConfig::FakeHiveTablets, "Table", {"key"}, {"key", "value"});

ydb/core/tx/tx_proxy/upload_rows_common_impl.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1262,11 +1262,7 @@ class TUploadRowsBase : public TActorBootstrapped<TUploadRowsBase<DerivedActivit
12621262
}
12631263
}
12641264

1265-
bool isRetryableError = shardResponse.GetStatus() == NKikimrTxDataShard::TError::WRONG_SHARD_STATE ||
1266-
shardResponse.GetStatus() == NKikimrTxDataShard::TError::SHARD_IS_BLOCKED ||
1267-
shardResponse.GetStatus() == NKikimrTxDataShard::TError::SCHEME_CHANGED;
1268-
1269-
if (!isRetryableError || !Backoff.HasMore()) {
1265+
if (!ev->Get()->IsRetriableError() || !Backoff.HasMore()) {
12701266
return ReplyWithError(
12711267
TUploadStatus(static_cast<NKikimrTxDataShard::TError::EKind>(shardResponse.GetStatus()),
12721268
shardResponse.GetErrorDescription()), ctx);

0 commit comments

Comments
 (0)