Skip to content

Commit b170084

Browse files
committed
api: implement better rate limiting backoff
1) Remove shutdown handling in the API. It makes the code convoluted and doesn't do anything useful. Instead just move the shutdown wait to the top level future. 2) Add retry after handling in the API. 3) Move gRPC Code to the codec crate so it can be used in client code. 4) Some cleanups in order to remove a bunch of clones during config processing. Fixes BIT-5835 Signed-off-by: Matt Klein <mklein@bitdrift.io>
1 parent 9db7b41 commit b170084

File tree

30 files changed

+590
-432
lines changed

30 files changed

+590
-432
lines changed

api

bd-api/src/api.rs

Lines changed: 120 additions & 104 deletions
Large diffs are not rendered by default.

bd-api/src/api_test.rs

Lines changed: 106 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@ use crate::upload::Tracked;
1212
use anyhow::anyhow;
1313
use assert_matches::assert_matches;
1414
use bd_client_common::{
15+
ClientConfigurationUpdate,
1516
ConfigurationUpdate,
1617
HANDSHAKE_FLAG_CONFIG_UP_TO_DATE,
1718
HANDSHAKE_FLAG_RUNTIME_UP_TO_DATE,
18-
MockConfigurationUpdate,
19+
MockClientConfigurationUpdate,
1920
};
2021
use bd_client_stats_store::Collector;
2122
use bd_client_stats_store::test::StatsHelper;
23+
use bd_grpc_codec::code::Code;
2224
use bd_grpc_codec::{Decompression, Encoder, OptimizeFor};
2325
use bd_internal_logging::{LogFields, LogLevel, LogType};
2426
use bd_metadata::{Metadata, Platform};
@@ -32,11 +34,11 @@ use bd_proto::protos::client::api::{
3234
ErrorShutdown,
3335
HandshakeRequest,
3436
HandshakeResponse,
37+
RateLimited,
3538
RuntimeUpdate,
3639
StatsUploadRequest,
3740
};
3841
use bd_runtime::runtime::{ConfigLoader, FeatureFlag};
39-
use bd_shutdown::ComponentShutdownTrigger;
4042
use bd_stats_common::labels;
4143
use bd_test_helpers::make_mut;
4244
use bd_time::{OffsetDateTimeExt, TimeDurationExt, TimeProvider, ToProtoDuration};
@@ -168,7 +170,6 @@ struct Setup {
168170
data_tx: Sender<DataUpload>,
169171
send_data_rx: Receiver<Vec<u8>>,
170172
start_stream_rx: Receiver<()>,
171-
shutdown_trigger: ComponentShutdownTrigger,
172173
collector: Collector,
173174
requests_decoder: bd_grpc_codec::Decoder<ApiRequest>,
174175
time_provider: Arc<bd_time::TestTimeProvider>,
@@ -184,7 +185,7 @@ impl Setup {
184185
Self::new_with_config_updater(Self::make_nice_mock_updater())
185186
}
186187

187-
fn new_with_config_updater(updater: Arc<dyn ConfigurationUpdate>) -> Self {
188+
fn new_with_config_updater(updater: Arc<dyn ClientConfigurationUpdate>) -> Self {
188189
let sdk_directory = tempfile::TempDir::with_prefix("sdk").unwrap();
189190

190191
let (start_stream_tx, start_stream_rx) = channel(1);
@@ -195,7 +196,6 @@ impl Setup {
195196
send_data_tx,
196197
current_stream_tx.clone(),
197198
));
198-
let shutdown_trigger = ComponentShutdownTrigger::default();
199199
let (data_tx, data_rx) = channel(1);
200200
let (trigger_upload_tx, _trigger_upload_rx) = channel(1);
201201
let (sleep_mode_active_tx, sleep_mode_active_rx) = watch::channel(false);
@@ -211,7 +211,6 @@ impl Setup {
211211
sdk_directory.path().to_path_buf(),
212212
api_key.clone(),
213213
manager,
214-
shutdown_trigger.make_shutdown(),
215214
data_rx,
216215
trigger_upload_tx,
217216
Arc::new(EmptyMetadata),
@@ -235,7 +234,6 @@ impl Setup {
235234
start_stream_rx,
236235
data_tx,
237236
send_data_rx,
238-
shutdown_trigger,
239237
collector,
240238
time_provider,
241239
requests_decoder: bd_grpc_codec::Decoder::new(
@@ -250,10 +248,6 @@ impl Setup {
250248
}
251249

252250
async fn restart(&mut self) {
253-
let old_shutdown = std::mem::take(&mut self.shutdown_trigger);
254-
old_shutdown.shutdown().await;
255-
self.api_task.take().unwrap().await.unwrap().unwrap();
256-
257251
let (start_stream_tx, start_stream_rx) = channel(1);
258252
let (send_data_tx, send_data_rx) = channel(1);
259253
let current_stream_tx = Arc::new(Mutex::new(None));
@@ -272,7 +266,6 @@ impl Setup {
272266
self.sdk_directory.path().to_path_buf(),
273267
self.api_key.clone(),
274268
manager,
275-
self.shutdown_trigger.make_shutdown(),
276269
data_rx,
277270
trigger_upload_tx,
278271
Arc::new(EmptyMetadata),
@@ -292,21 +285,54 @@ impl Setup {
292285
self.current_stream_tx = current_stream_tx;
293286
}
294287

288+
fn make_handshake(
289+
configuration_update_status: u32,
290+
stream_settings: Option<StreamSettings>,
291+
) -> ApiResponse {
292+
ApiResponse {
293+
response_type: Some(Response_type::Handshake(HandshakeResponse {
294+
stream_settings: stream_settings.into(),
295+
configuration_update_status,
296+
..Default::default()
297+
})),
298+
..Default::default()
299+
}
300+
}
301+
295302
async fn handshake_response(
296303
&self,
297304
configuration_update_status: u32,
298305
stream_settings: Option<StreamSettings>,
299306
) {
300-
let response = ApiResponse {
301-
response_type: Some(Response_type::Handshake(HandshakeResponse {
302-
stream_settings: stream_settings.into(),
307+
self
308+
.send_response(Self::make_handshake(
303309
configuration_update_status,
310+
stream_settings,
311+
))
312+
.await;
313+
}
314+
315+
fn make_error_shutdown(code: Code, message: &str, retry_after: Option<Duration>) -> ApiResponse {
316+
ApiResponse {
317+
response_type: Some(Response_type::ErrorShutdown(ErrorShutdown {
318+
grpc_status: code.to_int(),
319+
grpc_message: message.to_string(),
320+
rate_limited: retry_after
321+
.map(|d| RateLimited {
322+
retry_after: d.into_proto(),
323+
..Default::default()
324+
})
325+
.into(),
304326
..Default::default()
305327
})),
306328
..Default::default()
307-
};
329+
}
330+
}
308331

309-
self.send_response(response).await;
332+
async fn error_shutdown(&self, code: Code, message: &str, retry_after: Option<Duration>) {
333+
self
334+
.send_response(Self::make_error_shutdown(code, message, retry_after))
335+
.await;
310336
}
311337

312338
async fn send_request(&self, data: DataUpload) {
@@ -327,6 +353,25 @@ impl Setup {
327353
tx.send(StreamEvent::Data(encoded.to_vec())).await.unwrap();
328354
}
329355

356+
async fn send_multiple_messages(&self, responses: Vec<ApiResponse>) {
357+
let tx = self
358+
.current_stream_tx
359+
.lock()
360+
.unwrap()
361+
.as_ref()
362+
.unwrap()
363+
.clone();
364+
365+
let mut encoder = Encoder::new(None);
366+
let mut encoded = Vec::new();
367+
for response in responses {
368+
let data = encoder.encode(&response).unwrap();
369+
encoded.extend(data);
370+
}
371+
372+
tx.send(StreamEvent::Data(encoded)).await.unwrap();
373+
}
374+
330375
async fn close_stream(&self) {
331376
let tx = self
332377
.current_stream_tx
@@ -382,8 +427,8 @@ impl Setup {
382427
self.decode(&data)
383428
}
384429

385-
fn make_nice_mock_updater() -> Arc<MockConfigurationUpdate> {
386-
let mut mock_updater = Arc::new(MockConfigurationUpdate::new());
430+
fn make_nice_mock_updater() -> Arc<MockClientConfigurationUpdate> {
431+
let mut mock_updater = Arc::new(MockClientConfigurationUpdate::new());
387432
make_mut(&mut mock_updater)
388433
.expect_fill_handshake()
389434
.times(..)
@@ -407,7 +452,7 @@ impl Setup {
407452

408453
#[tokio::test(start_paused = true)]
409454
async fn api_retry_stream() {
410-
let mut mock_updater = Arc::new(MockConfigurationUpdate::new());
455+
let mut mock_updater = Arc::new(MockClientConfigurationUpdate::new());
411456
make_mut(&mut mock_updater)
412457
.expect_fill_handshake()
413458
.times(..)
@@ -520,8 +565,6 @@ async fn api_retry_stream() {
520565
NetworkQuality::Online,
521566
setup.network_quality_provider.get_network_quality()
522567
);
523-
524-
setup.shutdown_trigger.shutdown().await;
525568
}
526569

527570
#[tokio::test(start_paused = true)]
@@ -622,12 +665,30 @@ async fn api_retry_stream_runtime_override() {
622665
setup.close_stream().await;
623666
assert!(setup.next_stream(1500.milliseconds()).await.is_some());
624667
}
668+
}
669+
670+
#[tokio::test(start_paused = true)]
671+
async fn multiple_handshake_messages_with_error() {
672+
let mut setup = Setup::new();
625673

626-
setup.shutdown_trigger.shutdown().await;
674+
assert!(setup.next_stream(1.seconds()).await.is_some());
675+
let now = Instant::now();
676+
setup
677+
.send_multiple_messages(vec![
678+
Setup::make_handshake(
679+
HANDSHAKE_FLAG_CONFIG_UP_TO_DATE | HANDSHAKE_FLAG_RUNTIME_UP_TO_DATE,
680+
None,
681+
),
682+
Setup::make_error_shutdown(Code::Internal, "some message", Some(5.minutes())),
683+
])
684+
.await;
685+
686+
assert!(setup.next_stream(6.minutes()).await.is_some());
687+
assert!(now.elapsed() >= 5.minutes());
627688
}
628689

629690
#[tokio::test(start_paused = true)]
630-
async fn error_response() {
691+
async fn error_response_with_retry_after() {
631692
let mut setup = Setup::new();
632693

633694
assert!(setup.next_stream(1.seconds()).await.is_some());
@@ -638,23 +699,13 @@ async fn error_response() {
638699
)
639700
.await;
640701

702+
let now = Instant::now();
641703
setup
642-
.send_response(ApiResponse {
643-
response_type: Some(Response_type::ErrorShutdown(ErrorShutdown {
644-
grpc_status: 1,
645-
grpc_message: "some message".to_string(),
646-
..Default::default()
647-
})),
648-
..Default::default()
649-
})
704+
.error_shutdown(Code::Internal, "some message", Some(1.minutes()))
650705
.await;
651706

652-
// Processing the error message has no side effects, so we just make sure that we process is to
653-
// provide code coverage. To do so, we close the stream and wait for the next one. Since the close
654-
// event is processed via the same channel as the response, we know that the response must have
655-
// been processed.
656-
setup.close_stream().await;
657-
assert!(setup.next_stream(1.seconds()).await.is_some());
707+
assert!(setup.next_stream(2.minutes()).await.is_some());
708+
assert!(now.elapsed() >= 1.minutes());
658709

659710
setup
660711
.collector
@@ -668,21 +719,9 @@ async fn error_response_before_handshake() {
668719
assert!(setup.next_stream(1.seconds()).await.is_some());
669720

670721
setup
671-
.send_response(ApiResponse {
672-
response_type: Some(Response_type::ErrorShutdown(ErrorShutdown {
673-
grpc_status: 1,
674-
grpc_message: "some message".to_string(),
675-
..Default::default()
676-
})),
677-
..Default::default()
678-
})
722+
.error_shutdown(Code::Internal, "some message", None)
679723
.await;
680724

681-
// Processing the error message has no side effects, so we just make sure that we process is to
682-
// provide code coverage. To do so, we close the stream and wait for the next one. Since the close
683-
// event is processed via the same channel as the response, we know that the response must have
684-
// been processed.
685-
setup.close_stream().await;
686725
assert!(setup.next_stream(1.seconds()).await.is_some());
687726

688727
setup
@@ -693,21 +732,29 @@ async fn error_response_before_handshake() {
693732
.assert_counter_eq(1, "api:remote_connect_failure", labels! {});
694733
}
695734

735+
#[tokio::test(start_paused = true)]
736+
async fn rate_limited_response_before_handshake() {
737+
let mut setup = Setup::new();
738+
739+
assert!(setup.next_stream(1.seconds()).await.is_some());
740+
741+
let now = Instant::now();
742+
setup
743+
.error_shutdown(Code::ResourceExhausted, "rate limited", Some(1.minutes()))
744+
.await;
745+
746+
assert!(setup.next_stream(2.minutes()).await.is_some());
747+
assert!(now.elapsed() >= 1.minutes());
748+
}
749+
696750
#[tokio::test(start_paused = true)]
697751
async fn unauthenticated_response_before_handshake() {
698752
let mut setup = Setup::new();
699753

700754
assert!(setup.next_stream(1.seconds()).await.is_some());
701755

702756
setup
703-
.send_response(ApiResponse {
704-
response_type: Some(Response_type::ErrorShutdown(ErrorShutdown {
705-
grpc_status: 16,
706-
grpc_message: "some message".to_string(),
707-
..Default::default()
708-
})),
709-
..Default::default()
710-
})
757+
.error_shutdown(Code::Unauthenticated, "some message", None)
711758
.await;
712759

713760
// The unauthenticated response will kill the client for the default 1 day period. Make sure that

bd-artifact-upload/src/uploader_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ impl Setup {
9494
let config_loader = TestConfigLoader::new().await;
9595

9696
config_loader
97-
.update_snapshot(&bd_test_helpers::runtime::make_update(
97+
.update_snapshot(bd_test_helpers::runtime::make_update(
9898
vec![(
9999
artifact_upload::MaxPendingEntries::path(),
100100
ValueKind::Int(max_entries),

bd-client-common/src/lib.rs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@
1414
clippy::unwrap_used
1515
)]
1616

17-
use bd_proto::protos::client::api::{ApiRequest, ApiResponse, HandshakeRequest};
17+
use bd_proto::protos::client::api::{self, ApiRequest, HandshakeRequest};
18+
use mockall::mock;
1819
use std::future::{Future, pending};
1920
use tokio::time::Interval;
2021

@@ -71,11 +72,6 @@ pub const HANDSHAKE_FLAG_RUNTIME_UP_TO_DATE: u32 = 0x2;
7172
#[mockall::automock]
7273
#[async_trait::async_trait]
7374
pub trait ConfigurationUpdate: Send + Sync {
74-
/// Attempt to apply a new inbound configuration. Returns None if the response does not apply
75-
/// to this configuration type, otherwise returns the ack/nack after attempting to apply the
76-
/// config.
77-
async fn try_apply_config(&self, response: &ApiResponse) -> Option<ApiRequest>;
78-
7975
/// Attempts to load persisted config from disk if supported by the configuration type.
8076
async fn try_load_persisted_config(&self);
8177

@@ -88,3 +84,34 @@ pub trait ConfigurationUpdate: Send + Sync {
8884
/// Unconditionally mark any cached config as "safe" to use.
8985
async fn mark_safe(&self);
9086
}
87+
88+
// TODO(mattklein123): Move this trait and the client config code into its own crate to break a
89+
// a circular dependency. At the same time potentially just get rid of the ConfigurationUpdate
90+
// trait altogether as it's not clear we need it.
91+
#[async_trait::async_trait]
92+
pub trait ClientConfigurationUpdate: ConfigurationUpdate {
93+
async fn try_apply_config(
94+
&self,
95+
configuration_update: api::ConfigurationUpdate,
96+
) -> Option<ApiRequest>;
97+
}
98+
99+
mock! {
100+
pub ClientConfigurationUpdate {}
101+
102+
#[async_trait::async_trait]
103+
impl ConfigurationUpdate for ClientConfigurationUpdate {
104+
async fn try_load_persisted_config(&self);
105+
fn fill_handshake(&self, handshake: &mut HandshakeRequest);
106+
async fn on_handshake_complete(&self, configuration_update_status: u32);
107+
async fn mark_safe(&self);
108+
}
109+
110+
#[async_trait::async_trait]
111+
impl ClientConfigurationUpdate for ClientConfigurationUpdate {
112+
async fn try_apply_config(
113+
&self,
114+
configuration_update: api::ConfigurationUpdate,
115+
) -> Option<ApiRequest>;
116+
}
117+
}

0 commit comments

Comments
 (0)