Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.PHONY: build
build: setup
SKIP_PROTO_GEN=1 cargo build --workspace
SKIP_PROTO_GEN=1 cargo build --workspace

.PHONY: setup
setup:
Expand Down
38 changes: 20 additions & 18 deletions bd-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use backoff::SystemClock;
use backoff::backoff::Backoff;
use backoff::exponential::ExponentialBackoff;
use bd_client_common::file::{read_compressed_protobuf, write_compressed_protobuf};
use bd_client_common::payload_conversion::{IntoRequest, MuxResponse, ResponseKind};
use bd_client_common::payload_conversion::IntoRequest;
use bd_client_common::zlib::DEFAULT_MOBILE_ZLIB_COMPRESSION_LEVEL;
use bd_client_common::{ClientConfigurationUpdate, maybe_await};
use bd_client_stats_store::{Counter, CounterWrapper, Scope};
Expand All @@ -38,6 +38,7 @@ use bd_grpc_codec::{
};
use bd_metadata::Metadata;
use bd_network_quality::{NetworkQuality, NetworkQualityProvider};
use bd_proto::protos::client::api::api_response::Response_type;
pub use bd_proto::protos::client::api::log_upload_intent_response::{
Decision as LogsUploadDecision,
Drop as LogsUploadDecisionDrop,
Expand Down Expand Up @@ -302,6 +303,7 @@ impl StreamState {
let req = self.upload_state_tracker.track_upload(tracked);
self.send_request(req).await
},
DataUpload::DebugData(request) => self.send_request(request).await,
}
}
}
Expand Down Expand Up @@ -780,12 +782,12 @@ impl Api {
stream_state: &mut StreamState,
) -> anyhow::Result<Option<StreamClosureInfo>> {
for response in responses {
match response.demux() {
Some(ResponseKind::Handshake(_)) => {
match response.response_type {
Some(Response_type::Handshake(_)) => {
anyhow::bail!("unexpected api response: spurious handshake")
},
Some(ResponseKind::Pong(_)) => stream_state.maybe_schedule_ping(),
Some(ResponseKind::ErrorShutdown(error)) => {
Some(Response_type::Pong(_)) => stream_state.maybe_schedule_ping(),
Some(Response_type::ErrorShutdown(error)) => {
log::debug!(
"close with status {:?}, message {:?}",
error.grpc_status,
Expand All @@ -801,7 +803,7 @@ impl Api {
.map(bd_time::ProtoDurationExt::to_time_duration),
}));
},
Some(ResponseKind::LogUpload(log_upload)) => {
Some(Response_type::LogUpload(log_upload)) => {
log::debug!(
"received ack for log upload {:?} ({} dropped), error: {:?}",
log_upload.upload_uuid,
Expand All @@ -813,12 +815,12 @@ impl Api {
.upload_state_tracker
.resolve_pending_upload(&log_upload.upload_uuid, &log_upload.error)?;
},
Some(ResponseKind::LogUploadIntent(intent)) => {
Some(Response_type::LogUploadIntent(intent)) => {
stream_state
.upload_state_tracker
.resolve_intent(&intent.intent_uuid, intent.decision)?;
},
Some(ResponseKind::StatsUpload(stats_upload)) => {
Some(Response_type::StatsUpload(stats_upload)) => {
log::debug!(
"received ack for stats upload {:?}, error: {:?}",
stats_upload.upload_uuid,
Expand All @@ -829,7 +831,7 @@ impl Api {
.upload_state_tracker
.resolve_pending_upload(&stats_upload.upload_uuid, &stats_upload.error)?;
},
Some(ResponseKind::FlushBuffers(flush_buffers)) => {
Some(Response_type::FlushBuffers(flush_buffers)) => {
let (tx, _rx) = tokio::sync::oneshot::channel();

self
Expand All @@ -838,7 +840,7 @@ impl Api {
.await
.map_err(|_| anyhow!("remote trigger upload tx"))?;
},
Some(ResponseKind::SankeyPathUpload(sankey_path_upload)) => {
Some(Response_type::SankeyDiagramUpload(sankey_path_upload)) => {
log::debug!(
"received ack for sankey path upload {:?}, error: {:?}",
sankey_path_upload.upload_uuid,
Expand All @@ -849,7 +851,7 @@ impl Api {
.upload_state_tracker
.resolve_pending_upload(&sankey_path_upload.upload_uuid, &sankey_path_upload.error)?;
},
Some(ResponseKind::SankeyPathUploadIntent(sankey_path_upload_intent)) => {
Some(Response_type::SankeyIntentResponse(sankey_path_upload_intent)) => {
log::debug!(
"received ack for sankey path upload intent {:?}, decision: {:?}",
sankey_path_upload_intent.intent_uuid,
Expand All @@ -861,7 +863,7 @@ impl Api {
sankey_path_upload_intent.decision,
)?;
},
Some(ResponseKind::ArtifactUpload(artifact_upload)) => {
Some(Response_type::ArtifactUpload(artifact_upload)) => {
log::debug!(
"received ack for artifact upload {:?}, error: {:?}",
artifact_upload.upload_uuid,
Expand All @@ -872,7 +874,7 @@ impl Api {
.upload_state_tracker
.resolve_pending_upload(&artifact_upload.upload_uuid, &artifact_upload.error)?;
},
Some(ResponseKind::ArtifactUploadIntent(artifact_upload_intent)) => {
Some(Response_type::ArtifactIntent(artifact_upload_intent)) => {
log::debug!(
"received ack for artifact upload intent {:?}, decision: {:?}",
artifact_upload_intent.intent_uuid,
Expand All @@ -884,12 +886,12 @@ impl Api {
artifact_upload_intent.decision,
)?;
},
Some(ResponseKind::ConfigurationUpdate(update)) => {
Some(Response_type::ConfigurationUpdate(update)) => {
if let Some(request) = self.config_updater.try_apply_config(update).await {
stream_state.send_request(request).await?;
}
},
Some(ResponseKind::RuntimeUpdate(update)) => {
Some(Response_type::RuntimeUpdate(update)) => {
if let Some(request) = self.runtime_loader.try_apply_config(update).await {
stream_state.send_request(request).await?;
}
Expand Down Expand Up @@ -946,16 +948,16 @@ impl Api {
// it and pass it back up as well as any other frames we decoded for immediate
// processing. The response decoder might still contain data, so we continue to use
// it for further decoding.
match first.demux() {
Some(ResponseKind::Handshake(mut h)) => {
match first.response_type {
Some(Response_type::Handshake(mut h)) => {
log::debug!("received handshake");
return Ok(HandshakeResult::Received {
stream_settings: h.stream_settings.take(),
configuration_update_status: h.configuration_update_status,
remaining_responses: responses.collect(),
});
},
Some(ResponseKind::ErrorShutdown(error)) => {
Some(Response_type::ErrorShutdown(error)) => {
// If we're provided with an invalid API key or otherwise fail to authenticate with
// the backend, log this as a warning to surface this to developers trying to set up
// the SDK.
Expand Down
4 changes: 4 additions & 0 deletions bd-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use backoff::SystemClock;
use backoff::exponential::{ExponentialBackoff, ExponentialBackoffBuilder};
use bd_proto::protos::client::api::{
DebugDataRequest,
LogUploadIntentRequest,
LogUploadRequest,
SankeyIntentRequest,
Expand Down Expand Up @@ -68,6 +69,9 @@ pub enum DataUpload {

/// An generic artifact upload request.
ArtifactUpload(TrackedUpload<UploadArtifactRequest>),

/// A request to upload debug data.
DebugData(DebugDataRequest),
}

//
Expand Down
104 changes: 6 additions & 98 deletions bd-client-common/src/payload_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,96 +15,30 @@
// them.

use bd_proto::protos::client::api::api_request::Request_type;
use bd_proto::protos::client::api::api_response::Response_type;
use bd_proto::protos::client::api::{
ApiRequest,
ApiResponse,
ConfigurationUpdate,
ConfigurationUpdateAck,
ErrorShutdown,
FlushBuffers,
DebugDataRequest,
HandshakeRequest,
HandshakeResponse,
LogUploadIntentRequest,
LogUploadIntentResponse,
LogUploadRequest,
LogUploadResponse,
PingRequest,
PongResponse,
RuntimeUpdate,
SankeyIntentRequest,
SankeyIntentResponse,
SankeyPathUploadRequest,
SankeyPathUploadResponse,
StatsUploadRequest,
StatsUploadResponse,
UploadArtifactIntentRequest,
UploadArtifactIntentResponse,
UploadArtifactRequest,
UploadArtifactResponse,
};

//
// ResponseKind
//

/// A transport independent representation of the possible multiplexed response types.
pub enum ResponseKind {
Handshake(HandshakeResponse),
ErrorShutdown(ErrorShutdown),
Pong(PongResponse),
LogUpload(LogUploadResponse),
LogUploadIntent(LogUploadIntentResponse),
StatsUpload(StatsUploadResponse),
FlushBuffers(FlushBuffers),
SankeyPathUpload(SankeyPathUploadResponse),
SankeyPathUploadIntent(SankeyIntentResponse),
ArtifactUploadIntent(UploadArtifactIntentResponse),
ArtifactUpload(UploadArtifactResponse),
ConfigurationUpdate(ConfigurationUpdate),
RuntimeUpdate(RuntimeUpdate),
}

//
// MuxResponse
//

/// Used to convert a response type into the transport independent `ResponseKind`.
pub trait MuxResponse {
fn demux(self) -> Option<ResponseKind>;
}

/// Used to allow the API mux operate against multiple different transport APIs. The code is able
/// to operate against the inner types that can be wrapped up into the appropriate mux type
/// depending on the use case.
pub trait IntoRequest {
fn into_request(self) -> ApiRequest;
}

//
// FromResponse
//

/// Used to unwrap a multiplexing `ResponseType` into an inner type.
pub trait FromResponse<ResponseType> {
fn from_response(response: &ResponseType) -> Option<&Self>;
}

pub struct RuntimeConfigurationUpdate(pub ConfigurationUpdateAck);
pub struct ClientConfigurationUpdate(pub ConfigurationUpdateAck);

macro_rules! unwrap_response {
($wrapper:ty, $inner:ty, $field:path) => {
impl crate::payload_conversion::FromResponse<$wrapper> for $inner {
fn from_response(response: &$wrapper) -> Option<&Self> {
match &response.response_type {
Some($field(inner)) => Some(inner),
_ => None,
}
}
}
};
}
pub struct RuntimeConfigurationUpdateAck(pub ConfigurationUpdateAck);
pub struct ClientConfigurationUpdateAck(pub ConfigurationUpdateAck);

// Helper macro for defining IntoRequest for a wrapper type where an inner request type is wrapped
// using the provided oneof branch.
Expand All @@ -121,13 +55,6 @@ macro_rules! into_api_request {
};
}

unwrap_response!(
ApiResponse,
ConfigurationUpdate,
Response_type::ConfigurationUpdate
);
unwrap_response!(ApiResponse, RuntimeUpdate, Response_type::RuntimeUpdate);

impl crate::payload_conversion::IntoRequest for ApiRequest {
fn into_request(self) -> Self {
self
Expand All @@ -143,8 +70,9 @@ into_api_request!(SankeyIntentRequest, Request_type::SankeyIntent);
into_api_request!(SankeyPathUploadRequest, Request_type::SankeyPathUpload);
into_api_request!(UploadArtifactRequest, Request_type::ArtifactUpload);
into_api_request!(UploadArtifactIntentRequest, Request_type::ArtifactIntent);
into_api_request!(DebugDataRequest, Request_type::DebugData);

impl IntoRequest for RuntimeConfigurationUpdate {
impl IntoRequest for RuntimeConfigurationUpdateAck {
fn into_request(self) -> ApiRequest {
ApiRequest {
request_type: Some(Request_type::RuntimeUpdateAck(self.0)),
Expand All @@ -153,31 +81,11 @@ impl IntoRequest for RuntimeConfigurationUpdate {
}
}

impl IntoRequest for ClientConfigurationUpdate {
impl IntoRequest for ClientConfigurationUpdateAck {
fn into_request(self) -> ApiRequest {
ApiRequest {
request_type: Some(Request_type::ConfigurationUpdateAck(self.0)),
..Default::default()
}
}
}

impl MuxResponse for ApiResponse {
fn demux(self) -> Option<ResponseKind> {
match self.response_type? {
Response_type::Handshake(handshake) => Some(ResponseKind::Handshake(handshake)),
Response_type::LogUpload(log_upload) => Some(ResponseKind::LogUpload(log_upload)),
Response_type::LogUploadIntent(intent) => Some(ResponseKind::LogUploadIntent(intent)),
Response_type::StatsUpload(stats_upload) => Some(ResponseKind::StatsUpload(stats_upload)),
Response_type::Pong(pong) => Some(ResponseKind::Pong(pong)),
Response_type::ErrorShutdown(e) => Some(ResponseKind::ErrorShutdown(e)),
Response_type::FlushBuffers(f) => Some(ResponseKind::FlushBuffers(f)),
Response_type::SankeyDiagramUpload(s) => Some(ResponseKind::SankeyPathUpload(s)),
Response_type::SankeyIntentResponse(s) => Some(ResponseKind::SankeyPathUploadIntent(s)),
Response_type::ConfigurationUpdate(u) => Some(ResponseKind::ConfigurationUpdate(u)),
Response_type::RuntimeUpdate(r) => Some(ResponseKind::RuntimeUpdate(r)),
Response_type::ArtifactUpload(u) => Some(ResponseKind::ArtifactUpload(u)),
Response_type::ArtifactIntent(u) => Some(ResponseKind::ArtifactUploadIntent(u)),
}
}
}
Loading
Loading