Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api
224 changes: 120 additions & 104 deletions bd-api/src/api.rs

Large diffs are not rendered by default.

165 changes: 106 additions & 59 deletions bd-api/src/api_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ use crate::upload::Tracked;
use anyhow::anyhow;
use assert_matches::assert_matches;
use bd_client_common::{
ClientConfigurationUpdate,
ConfigurationUpdate,
HANDSHAKE_FLAG_CONFIG_UP_TO_DATE,
HANDSHAKE_FLAG_RUNTIME_UP_TO_DATE,
MockConfigurationUpdate,
MockClientConfigurationUpdate,
};
use bd_client_stats_store::Collector;
use bd_client_stats_store::test::StatsHelper;
use bd_grpc_codec::code::Code;
use bd_grpc_codec::{Decompression, Encoder, OptimizeFor};
use bd_internal_logging::{LogFields, LogLevel, LogType};
use bd_metadata::{Metadata, Platform};
Expand All @@ -32,11 +34,11 @@ use bd_proto::protos::client::api::{
ErrorShutdown,
HandshakeRequest,
HandshakeResponse,
RateLimited,
RuntimeUpdate,
StatsUploadRequest,
};
use bd_runtime::runtime::{ConfigLoader, FeatureFlag};
use bd_shutdown::ComponentShutdownTrigger;
use bd_stats_common::labels;
use bd_test_helpers::make_mut;
use bd_time::{OffsetDateTimeExt, TimeDurationExt, TimeProvider, ToProtoDuration};
Expand Down Expand Up @@ -168,7 +170,6 @@ struct Setup {
data_tx: Sender<DataUpload>,
send_data_rx: Receiver<Vec<u8>>,
start_stream_rx: Receiver<()>,
shutdown_trigger: ComponentShutdownTrigger,
collector: Collector,
requests_decoder: bd_grpc_codec::Decoder<ApiRequest>,
time_provider: Arc<bd_time::TestTimeProvider>,
Expand All @@ -184,7 +185,7 @@ impl Setup {
Self::new_with_config_updater(Self::make_nice_mock_updater())
}

fn new_with_config_updater(updater: Arc<dyn ConfigurationUpdate>) -> Self {
fn new_with_config_updater(updater: Arc<dyn ClientConfigurationUpdate>) -> Self {
let sdk_directory = tempfile::TempDir::with_prefix("sdk").unwrap();

let (start_stream_tx, start_stream_rx) = channel(1);
Expand All @@ -195,7 +196,6 @@ impl Setup {
send_data_tx,
current_stream_tx.clone(),
));
let shutdown_trigger = ComponentShutdownTrigger::default();
let (data_tx, data_rx) = channel(1);
let (trigger_upload_tx, _trigger_upload_rx) = channel(1);
let (sleep_mode_active_tx, sleep_mode_active_rx) = watch::channel(false);
Expand All @@ -211,7 +211,6 @@ impl Setup {
sdk_directory.path().to_path_buf(),
api_key.clone(),
manager,
shutdown_trigger.make_shutdown(),
data_rx,
trigger_upload_tx,
Arc::new(EmptyMetadata),
Expand All @@ -235,7 +234,6 @@ impl Setup {
start_stream_rx,
data_tx,
send_data_rx,
shutdown_trigger,
collector,
time_provider,
requests_decoder: bd_grpc_codec::Decoder::new(
Expand All @@ -250,10 +248,6 @@ impl Setup {
}

async fn restart(&mut self) {
let old_shutdown = std::mem::take(&mut self.shutdown_trigger);
old_shutdown.shutdown().await;
self.api_task.take().unwrap().await.unwrap().unwrap();

let (start_stream_tx, start_stream_rx) = channel(1);
let (send_data_tx, send_data_rx) = channel(1);
let current_stream_tx = Arc::new(Mutex::new(None));
Expand All @@ -272,7 +266,6 @@ impl Setup {
self.sdk_directory.path().to_path_buf(),
self.api_key.clone(),
manager,
self.shutdown_trigger.make_shutdown(),
data_rx,
trigger_upload_tx,
Arc::new(EmptyMetadata),
Expand All @@ -292,21 +285,54 @@ impl Setup {
self.current_stream_tx = current_stream_tx;
}

fn make_handshake(
configuration_update_status: u32,
stream_settings: Option<StreamSettings>,
) -> ApiResponse {
ApiResponse {
response_type: Some(Response_type::Handshake(HandshakeResponse {
stream_settings: stream_settings.into(),
configuration_update_status,
..Default::default()
})),
..Default::default()
}
}

async fn handshake_response(
&self,
configuration_update_status: u32,
stream_settings: Option<StreamSettings>,
) {
let response = ApiResponse {
response_type: Some(Response_type::Handshake(HandshakeResponse {
stream_settings: stream_settings.into(),
self
.send_response(Self::make_handshake(
configuration_update_status,
stream_settings,
))
.await;
}

fn make_error_shutdown(code: Code, message: &str, retry_after: Option<Duration>) -> ApiResponse {
ApiResponse {
response_type: Some(Response_type::ErrorShutdown(ErrorShutdown {
grpc_status: code.to_int(),
grpc_message: message.to_string(),
rate_limited: retry_after
.map(|d| RateLimited {
retry_after: d.into_proto(),
..Default::default()
})
.into(),
..Default::default()
})),
..Default::default()
};
}
}

self.send_response(response).await;
async fn error_shutdown(&self, code: Code, message: &str, retry_after: Option<Duration>) {
self
.send_response(Self::make_error_shutdown(code, message, retry_after))
.await;
}

async fn send_request(&self, data: DataUpload) {
Expand All @@ -327,6 +353,25 @@ impl Setup {
tx.send(StreamEvent::Data(encoded.to_vec())).await.unwrap();
}

async fn send_multiple_messages(&self, responses: Vec<ApiResponse>) {
let tx = self
.current_stream_tx
.lock()
.unwrap()
.as_ref()
.unwrap()
.clone();

let mut encoder = Encoder::new(None);
let mut encoded = Vec::new();
for response in responses {
let data = encoder.encode(&response).unwrap();
encoded.extend(data);
}

tx.send(StreamEvent::Data(encoded)).await.unwrap();
}

async fn close_stream(&self) {
let tx = self
.current_stream_tx
Expand Down Expand Up @@ -382,8 +427,8 @@ impl Setup {
self.decode(&data)
}

fn make_nice_mock_updater() -> Arc<MockConfigurationUpdate> {
let mut mock_updater = Arc::new(MockConfigurationUpdate::new());
fn make_nice_mock_updater() -> Arc<MockClientConfigurationUpdate> {
let mut mock_updater = Arc::new(MockClientConfigurationUpdate::new());
make_mut(&mut mock_updater)
.expect_fill_handshake()
.times(..)
Expand All @@ -407,7 +452,7 @@ impl Setup {

#[tokio::test(start_paused = true)]
async fn api_retry_stream() {
let mut mock_updater = Arc::new(MockConfigurationUpdate::new());
let mut mock_updater = Arc::new(MockClientConfigurationUpdate::new());
make_mut(&mut mock_updater)
.expect_fill_handshake()
.times(..)
Expand Down Expand Up @@ -520,8 +565,6 @@ async fn api_retry_stream() {
NetworkQuality::Online,
setup.network_quality_provider.get_network_quality()
);

setup.shutdown_trigger.shutdown().await;
}

#[tokio::test(start_paused = true)]
Expand Down Expand Up @@ -622,12 +665,30 @@ async fn api_retry_stream_runtime_override() {
setup.close_stream().await;
assert!(setup.next_stream(1500.milliseconds()).await.is_some());
}
}

#[tokio::test(start_paused = true)]
async fn multiple_handshake_messages_with_error() {
let mut setup = Setup::new();

setup.shutdown_trigger.shutdown().await;
assert!(setup.next_stream(1.seconds()).await.is_some());
let now = Instant::now();
setup
.send_multiple_messages(vec![
Setup::make_handshake(
HANDSHAKE_FLAG_CONFIG_UP_TO_DATE | HANDSHAKE_FLAG_RUNTIME_UP_TO_DATE,
None,
),
Setup::make_error_shutdown(Code::Internal, "some message", Some(5.minutes())),
])
.await;

assert!(setup.next_stream(6.minutes()).await.is_some());
assert!(now.elapsed() >= 5.minutes());
}

#[tokio::test(start_paused = true)]
async fn error_response() {
async fn error_response_with_retry_after() {
let mut setup = Setup::new();

assert!(setup.next_stream(1.seconds()).await.is_some());
Expand All @@ -638,23 +699,13 @@ async fn error_response() {
)
.await;

let now = Instant::now();
setup
.send_response(ApiResponse {
response_type: Some(Response_type::ErrorShutdown(ErrorShutdown {
grpc_status: 1,
grpc_message: "some message".to_string(),
..Default::default()
})),
..Default::default()
})
.error_shutdown(Code::Internal, "some message", Some(1.minutes()))
.await;

// Processing the error message has no side effects, so we just make sure that we process is to
// provide code coverage. To do so, we close the stream and wait for the next one. Since the close
// event is processed via the same channel as the response, we know that the response must have
// been processed.
setup.close_stream().await;
assert!(setup.next_stream(1.seconds()).await.is_some());
assert!(setup.next_stream(2.minutes()).await.is_some());
assert!(now.elapsed() >= 1.minutes());

setup
.collector
Expand All @@ -668,21 +719,9 @@ async fn error_response_before_handshake() {
assert!(setup.next_stream(1.seconds()).await.is_some());

setup
.send_response(ApiResponse {
response_type: Some(Response_type::ErrorShutdown(ErrorShutdown {
grpc_status: 1,
grpc_message: "some message".to_string(),
..Default::default()
})),
..Default::default()
})
.error_shutdown(Code::Internal, "some message", None)
.await;

// Processing the error message has no side effects, so we just make sure that we process is to
// provide code coverage. To do so, we close the stream and wait for the next one. Since the close
// event is processed via the same channel as the response, we know that the response must have
// been processed.
setup.close_stream().await;
assert!(setup.next_stream(1.seconds()).await.is_some());

setup
Expand All @@ -693,21 +732,29 @@ async fn error_response_before_handshake() {
.assert_counter_eq(1, "api:remote_connect_failure", labels! {});
}

#[tokio::test(start_paused = true)]
async fn rate_limited_response_before_handshake() {
let mut setup = Setup::new();

assert!(setup.next_stream(1.seconds()).await.is_some());

let now = Instant::now();
setup
.error_shutdown(Code::ResourceExhausted, "rate limited", Some(1.minutes()))
.await;

assert!(setup.next_stream(2.minutes()).await.is_some());
assert!(now.elapsed() >= 1.minutes());
}

#[tokio::test(start_paused = true)]
async fn unauthenticated_response_before_handshake() {
let mut setup = Setup::new();

assert!(setup.next_stream(1.seconds()).await.is_some());

setup
.send_response(ApiResponse {
response_type: Some(Response_type::ErrorShutdown(ErrorShutdown {
grpc_status: 16,
grpc_message: "some message".to_string(),
..Default::default()
})),
..Default::default()
})
.error_shutdown(Code::Unauthenticated, "some message", None)
.await;

// The unauthenticated response will kill the client for the default 1 day period. Make sure that
Expand Down
2 changes: 1 addition & 1 deletion bd-artifact-upload/src/uploader_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl Setup {
let config_loader = TestConfigLoader::new().await;

config_loader
.update_snapshot(&bd_test_helpers::runtime::make_update(
.update_snapshot(bd_test_helpers::runtime::make_update(
vec![(
artifact_upload::MaxPendingEntries::path(),
ValueKind::Int(max_entries),
Expand Down
39 changes: 33 additions & 6 deletions bd-client-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
clippy::unwrap_used
)]

use bd_proto::protos::client::api::{ApiRequest, ApiResponse, HandshakeRequest};
use bd_proto::protos::client::api::{self, ApiRequest, HandshakeRequest};
use mockall::mock;
use std::future::{Future, pending};
use tokio::time::Interval;

Expand Down Expand Up @@ -71,11 +72,6 @@ pub const HANDSHAKE_FLAG_RUNTIME_UP_TO_DATE: u32 = 0x2;
#[mockall::automock]
#[async_trait::async_trait]
pub trait ConfigurationUpdate: Send + Sync {
/// Attempt to apply a new inbound configuration. Returns None if the response does not apply
/// to this configuration type, otherwise returns the ack/nack after attempting to apply the
/// config.
async fn try_apply_config(&self, response: &ApiResponse) -> Option<ApiRequest>;

/// Attempts to load persisted config from disk if supported by the configuration type.
async fn try_load_persisted_config(&self);

Expand All @@ -88,3 +84,34 @@ pub trait ConfigurationUpdate: Send + Sync {
/// Unconditionally mark any cached config as "safe" to use.
async fn mark_safe(&self);
}

// TODO(mattklein123): Move this trait and the client config code into its own crate to break a
// a circular dependency. At the same time potentially just get rid of the ConfigurationUpdate
// trait altogether as it's not clear we need it.
#[async_trait::async_trait]
pub trait ClientConfigurationUpdate: ConfigurationUpdate {
async fn try_apply_config(
&self,
configuration_update: api::ConfigurationUpdate,
) -> Option<ApiRequest>;
}

mock! {
pub ClientConfigurationUpdate {}

#[async_trait::async_trait]
impl ConfigurationUpdate for ClientConfigurationUpdate {
async fn try_load_persisted_config(&self);
fn fill_handshake(&self, handshake: &mut HandshakeRequest);
async fn on_handshake_complete(&self, configuration_update_status: u32);
async fn mark_safe(&self);
}

#[async_trait::async_trait]
impl ClientConfigurationUpdate for ClientConfigurationUpdate {
async fn try_apply_config(
&self,
configuration_update: api::ConfigurationUpdate,
) -> Option<ApiRequest>;
}
}
Loading
Loading