1
1
#pragma once
2
2
3
3
#include < ydb/library/actors/core/actorsystem.h>
4
+ #include < ydb/library/actors/core/log.h>
5
+ #include < ydb/core/util/backoff.h>
6
+ #include < ydb/core/wrappers/retry_policy.h>
7
+ #include < util/system/mutex.h>
4
8
5
9
#include < ydb/core/protos/s3_settings.pb.h>
6
10
#include < ydb/core/wrappers/events/abstract.h>
@@ -41,6 +45,25 @@ struct TEvExternalStorage {
41
45
42
46
namespace NExternalStorage {
43
47
48
+ class TThreadSafeBackoff {
49
+ private:
50
+ mutable TMutex Mutex;
51
+ NKikimr::TBackoff Policy;
52
+ public:
53
+ TThreadSafeBackoff (size_t maxRetries = 100 , TDuration initial = TDuration::Seconds(3 ), TDuration max = TDuration::Seconds(10 ))
54
+ : Policy(maxRetries, initial, max) {}
55
+ void Reset () {
56
+ with_lock (Mutex) {
57
+ Policy.Reset ();
58
+ }
59
+ }
60
+ TDuration Next () {
61
+ with_lock (Mutex) {
62
+ return Policy.Next ();
63
+ }
64
+ }
65
+ };
66
+
44
67
class IReplyAdapter {
45
68
private:
46
69
std::optional<NActors::TActorId> CustomRecipient;
@@ -69,10 +92,12 @@ class IReplyAdapter {
69
92
class TReplyAdapterContainer {
70
93
private:
71
94
IReplyAdapter::TPtr Adapter;
95
+ std::shared_ptr<TThreadSafeBackoff> Backoff;
72
96
public:
73
97
TReplyAdapterContainer () = default ;
74
- TReplyAdapterContainer (IReplyAdapter::TPtr adapter)
75
- : Adapter(adapter) {
98
+ TReplyAdapterContainer (IReplyAdapter::TPtr adapter, std::shared_ptr<TThreadSafeBackoff> backoff = {})
99
+ : Adapter(adapter)
100
+ , Backoff(std::move(backoff)) {
76
101
77
102
}
78
103
@@ -95,10 +120,31 @@ class TReplyAdapterContainer {
95
120
96
121
template <class TBaseEventObject >
97
122
void Reply (const NActors::TActorId& recipientId, std::unique_ptr<TBaseEventObject>&& ev) const {
123
+ bool doBackoff = false ;
124
+ TDuration delay = TDuration::Zero ();
125
+
126
+ if (ev->IsSuccess ()) {
127
+
128
+ Backoff->Reset ();
129
+ } else if (NWrappers::ShouldBackoff (ev->GetError ())) {
130
+ AFL_VERIFY (Backoff);
131
+ delay = Backoff->Next ();
132
+ doBackoff = delay > TDuration::Zero ();
133
+ }
134
+
135
+ std::unique_ptr<NActors::IEventBase> finalEvent;
98
136
if (Adapter) {
99
- TlsActivationContext->ActorSystem ()->Send (Adapter->GetRecipient (recipientId), Adapter->RebuildReplyEvent (std::move (ev)).release ());
137
+ finalEvent = Adapter->RebuildReplyEvent (std::move (ev));
138
+ } else {
139
+ finalEvent.reset (ev.release ());
140
+ }
141
+
142
+ const NActors::TActorId recipient = Adapter ? Adapter->GetRecipient (recipientId) : recipientId;
143
+ if (doBackoff) {
144
+ auto * handle = new NActors::IEventHandle (recipient, NActors::TActorId (), finalEvent.release ());
145
+ TlsActivationContext->ActorSystem ()->Schedule (delay, handle);
100
146
} else {
101
- TlsActivationContext->ActorSystem ()->Send (recipientId, ev .release ());
147
+ TlsActivationContext->ActorSystem ()->Send (recipient, finalEvent .release ());
102
148
}
103
149
104
150
}
@@ -108,14 +154,15 @@ class TReplyAdapterContainer {
108
154
class IExternalStorageOperator {
109
155
protected:
110
156
TReplyAdapterContainer ReplyAdapter;
157
+ std::shared_ptr<TThreadSafeBackoff> BackoffPolicy = std::make_shared<TThreadSafeBackoff>();
111
158
virtual TString DoDebugString () const {
112
159
return " " ;
113
160
}
114
161
public:
115
162
using TPtr = std::shared_ptr<IExternalStorageOperator>;
116
163
void InitReplyAdapter (IReplyAdapter::TPtr adapter) {
117
164
Y_ABORT_UNLESS (!ReplyAdapter);
118
- ReplyAdapter = TReplyAdapterContainer (adapter);
165
+ ReplyAdapter = TReplyAdapterContainer (adapter, BackoffPolicy );
119
166
}
120
167
121
168
0 commit comments