Skip to content

Commit 962e1d8

Browse files
committed
Slight refactor of how span reparenting works
Signed-off-by: Caleb Schoepp <[email protected]>
1 parent bb46919 commit 962e1d8

File tree

15 files changed

+66
-130
lines changed

15 files changed

+66
-130
lines changed

crates/factor-key-value/src/lib.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::{
99

1010
use anyhow::ensure;
1111
use host::KEY_VALUE_STORES_KEY;
12-
use spin_factor_observe::{ObserveContext, ObserveFactor};
12+
use spin_factor_observe::ObserveContext;
1313
use spin_factors::{
1414
ConfigureAppContext, Factor, FactorInstanceBuilder, InitContext, PrepareContext, RuntimeFactors,
1515
};
@@ -97,11 +97,7 @@ impl Factor for KeyValueFactor {
9797
.get(ctx.app_component().id())
9898
.expect("component should be in component_stores")
9999
.clone();
100-
let observe_context = match ctx.instance_builder::<ObserveFactor>() {
101-
Ok(factor) => Some(factor.get_observe_context()),
102-
Err(spin_factors::Error::NoSuchFactor(_)) => None,
103-
Err(e) => return Err(e.into()),
104-
};
100+
let observe_context = ObserveContext::from_prepare_context(&mut ctx)?;
105101
Ok(InstanceBuilder {
106102
store_manager: app_state.store_manager.clone(),
107103
allowed_stores,
@@ -154,7 +150,7 @@ pub struct InstanceBuilder {
154150
store_manager: Arc<AppStoreManager>,
155151
/// The allowed stores for this component instance.
156152
allowed_stores: HashSet<String>,
157-
observe_context: Option<ObserveContext>,
153+
observe_context: ObserveContext,
158154
}
159155

160156
impl FactorInstanceBuilder for InstanceBuilder {
@@ -167,7 +163,7 @@ impl FactorInstanceBuilder for InstanceBuilder {
167163
observe_context,
168164
} = self;
169165
let mut dispatch = KeyValueDispatch::new_with_capacity(u32::MAX);
170-
dispatch.init(allowed_stores, store_manager, observe_context);
166+
dispatch.init(allowed_stores, store_manager, Some(observe_context));
171167
Ok(dispatch)
172168
}
173169
}

crates/factor-llm/src/host.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@ impl v2::Host for InstanceState {
1515
prompt: String,
1616
params: Option<v2::InferencingParams>,
1717
) -> Result<v2::InferencingResult, v2::Error> {
18-
if let Some(observe_context) = self.observe_context.as_ref() {
19-
observe_context.reparent_tracing_span()
20-
}
18+
self.observe_context.reparent_tracing_span();
2119

2220
if !self.allowed_models.contains(&model) {
2321
return Err(access_denied_error(&model));
@@ -46,9 +44,7 @@ impl v2::Host for InstanceState {
4644
model: v1::EmbeddingModel,
4745
data: Vec<String>,
4846
) -> Result<v2::EmbeddingsResult, v2::Error> {
49-
if let Some(observe_context) = self.observe_context.as_ref() {
50-
observe_context.reparent_tracing_span()
51-
}
47+
self.observe_context.reparent_tracing_span();
5248

5349
if !self.allowed_models.contains(&model) {
5450
return Err(access_denied_error(&model));

crates/factor-llm/src/lib.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::collections::{HashMap, HashSet};
55
use std::sync::Arc;
66

77
use async_trait::async_trait;
8-
use spin_factor_observe::{ObserveContext, ObserveFactor};
8+
use spin_factor_observe::ObserveContext;
99
use spin_factors::{
1010
ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder,
1111
};
@@ -86,11 +86,7 @@ impl Factor for LlmFactor {
8686
.cloned()
8787
.unwrap_or_default();
8888
let engine = ctx.app_state().engine.clone();
89-
let observe_context = match ctx.instance_builder::<ObserveFactor>() {
90-
Ok(factor) => Some(factor.get_observe_context()),
91-
Err(spin_factors::Error::NoSuchFactor(_)) => None,
92-
Err(e) => return Err(e.into()),
93-
};
89+
let observe_context = ObserveContext::from_prepare_context(&mut ctx)?;
9490

9591
Ok(InstanceState {
9692
engine,
@@ -110,7 +106,7 @@ pub struct AppState {
110106
pub struct InstanceState {
111107
engine: Arc<Mutex<dyn LlmEngine>>,
112108
pub allowed_models: Arc<HashSet<String>>,
113-
observe_context: Option<ObserveContext>,
109+
observe_context: ObserveContext,
114110
}
115111

116112
/// The runtime configuration for the LLM factor.

crates/factor-observe/src/lib.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use opentelemetry::{
88
trace::{SpanId, TraceContextExt},
99
Context,
1010
};
11-
use spin_factors::{Factor, SelfInstanceBuilder};
11+
use spin_factors::{Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder};
1212
use tracing_opentelemetry::OpenTelemetrySpanExt;
1313

1414
#[derive(Default)]
@@ -65,15 +65,6 @@ pub struct InstanceState {
6565

6666
impl SelfInstanceBuilder for InstanceState {}
6767

68-
impl InstanceState {
69-
/// TODO comment
70-
pub fn get_observe_context(&self) -> ObserveContext {
71-
ObserveContext {
72-
state: self.state.clone(),
73-
}
74-
}
75-
}
76-
7768
/// Internal state of the ObserveFactor instance state.
7869
///
7970
/// This data lives here rather than directly on InstanceState so that we can have multiple things
@@ -104,16 +95,32 @@ pub struct GuestSpan {
10495

10596
/// TODO comment
10697
pub struct ObserveContext {
107-
pub(crate) state: Arc<RwLock<State>>,
98+
pub(crate) state: Option<Arc<RwLock<State>>>,
10899
}
109100

110101
impl ObserveContext {
102+
/// TODO: Comment
103+
pub fn from_prepare_context<T: RuntimeFactors, F: Factor>(
104+
prepare_context: &mut PrepareContext<T, F>,
105+
) -> anyhow::Result<Self> {
106+
let state = match prepare_context.instance_builder::<ObserveFactor>() {
107+
Ok(instance_state) => Some(instance_state.state.clone()),
108+
Err(spin_factors::Error::NoSuchFactor(_)) => None,
109+
Err(e) => return Err(e.into()),
110+
};
111+
Ok(Self { state })
112+
}
113+
111114
/// TODO comment
112115
/// Make sure to mention this should only be called from an instrumented function in a factor.
113116
/// Make sure this is called before any awaits
114117
pub fn reparent_tracing_span(&self) {
115-
// TODO: Move this duplicate logic into its own impl
116-
let state = self.state.read().unwrap();
118+
let state = if let Some(state) = self.state.as_ref() {
119+
state.read().unwrap()
120+
} else {
121+
return;
122+
};
123+
117124
if state.active_spans.is_empty() {
118125
return;
119126
}

crates/factor-outbound-http/src/lib.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use http::{
1010
uri::{Authority, Parts, PathAndQuery, Scheme},
1111
HeaderValue, Uri,
1212
};
13-
use spin_factor_observe::{ObserveContext, ObserveFactor};
13+
use spin_factor_observe::ObserveContext;
1414
use spin_factor_outbound_networking::{
1515
ComponentTlsConfigs, OutboundAllowedHosts, OutboundNetworkingFactor,
1616
};
@@ -75,11 +75,7 @@ impl Factor for OutboundHttpFactor {
7575
let outbound_networking = ctx.instance_builder::<OutboundNetworkingFactor>()?;
7676
let allowed_hosts = outbound_networking.allowed_hosts();
7777
let component_tls_configs = outbound_networking.component_tls_configs().clone();
78-
let observe_context = match ctx.instance_builder::<ObserveFactor>() {
79-
Ok(factor) => Some(factor.get_observe_context()),
80-
Err(spin_factors::Error::NoSuchFactor(_)) => None,
81-
Err(e) => return Err(e.into()),
82-
};
78+
let observe_context = ObserveContext::from_prepare_context(&mut ctx)?;
8379
Ok(InstanceState {
8480
wasi_http_ctx: WasiHttpCtx::new(),
8581
allowed_hosts,
@@ -102,7 +98,7 @@ pub struct InstanceState {
10298
request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
10399
// Connection-pooling client for 'fermyon:spin/http' interface
104100
spin_http_client: Option<reqwest::Client>,
105-
observe_context: Option<ObserveContext>,
101+
observe_context: ObserveContext,
106102
}
107103

108104
impl InstanceState {

crates/factor-outbound-http/src/spin.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ impl spin_http::Host for crate::InstanceState {
1313
fields(otel.kind = "client", url.full = Empty, http.request.method = Empty,
1414
http.response.status_code = Empty, otel.name = Empty, server.address = Empty, server.port = Empty))]
1515
async fn send_request(&mut self, req: Request) -> Result<Response, HttpError> {
16-
if let Some(observe_context) = self.observe_context.as_ref() {
17-
observe_context.reparent_tracing_span()
18-
}
16+
self.observe_context.reparent_tracing_span();
1917

2018
let span = Span::current();
2119
record_request_fields(&span, &req);

crates/factor-outbound-http/src/wasi.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,8 @@ impl<'a> WasiHttpView for WasiHttpImplInner<'a> {
8787
request: Request<wasmtime_wasi_http::body::HyperOutgoingBody>,
8888
config: wasmtime_wasi_http::types::OutgoingRequestConfig,
8989
) -> wasmtime_wasi_http::HttpResult<wasmtime_wasi_http::types::HostFutureIncomingResponse> {
90-
if let Some(observe_context) = self.state.observe_context.as_ref() {
91-
observe_context.reparent_tracing_span()
92-
}
90+
self.state.observe_context.reparent_tracing_span();
91+
9392
Ok(HostFutureIncomingResponse::Pending(
9493
wasmtime_wasi::runtime::spawn(
9594
send_request_impl(

crates/factor-outbound-mqtt/src/host.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ pub struct InstanceState {
1313
allowed_hosts: OutboundAllowedHosts,
1414
connections: table::Table<Arc<dyn MqttClient>>,
1515
create_client: Arc<dyn ClientCreator>,
16-
observe_context: Option<ObserveContext>,
16+
observe_context: ObserveContext,
1717
}
1818

1919
impl InstanceState {
2020
pub fn new(
2121
allowed_hosts: OutboundAllowedHosts,
2222
create_client: Arc<dyn ClientCreator>,
23-
observe_context: Option<ObserveContext>,
23+
observe_context: ObserveContext,
2424
) -> Self {
2525
Self {
2626
allowed_hosts,
@@ -80,9 +80,7 @@ impl v2::HostConnection for InstanceState {
8080
password: String,
8181
keep_alive_interval: u64,
8282
) -> Result<Resource<Connection>, Error> {
83-
if let Some(observe_context) = self.observe_context.as_ref() {
84-
observe_context.reparent_tracing_span()
85-
}
83+
self.observe_context.reparent_tracing_span();
8684

8785
if !self
8886
.is_address_allowed(&address)
@@ -117,9 +115,7 @@ impl v2::HostConnection for InstanceState {
117115
payload: Vec<u8>,
118116
qos: Qos,
119117
) -> Result<(), Error> {
120-
if let Some(observe_context) = self.observe_context.as_ref() {
121-
observe_context.reparent_tracing_span()
122-
}
118+
self.observe_context.reparent_tracing_span();
123119

124120
let conn = self.get_conn(connection).await.map_err(other_error)?;
125121

crates/factor-outbound-mqtt/src/lib.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use host::other_error;
77
use host::InstanceState;
88
use rumqttc::{AsyncClient, Event, Incoming, Outgoing, QoS};
99
use spin_core::async_trait;
10-
use spin_factor_observe::ObserveFactor;
10+
use spin_factor_observe::ObserveContext;
1111
use spin_factor_outbound_networking::OutboundNetworkingFactor;
1212
use spin_factors::{
1313
ConfigureAppContext, Factor, PrepareContext, RuntimeFactors, SelfInstanceBuilder,
@@ -54,11 +54,7 @@ impl Factor for OutboundMqttFactor {
5454
let allowed_hosts = ctx
5555
.instance_builder::<OutboundNetworkingFactor>()?
5656
.allowed_hosts();
57-
let observe_context = match ctx.instance_builder::<ObserveFactor>() {
58-
Ok(factor) => Some(factor.get_observe_context()),
59-
Err(spin_factors::Error::NoSuchFactor(_)) => None,
60-
Err(e) => return Err(e.into()),
61-
};
57+
let observe_context = ObserveContext::from_prepare_context(&mut ctx)?;
6258

6359
Ok(InstanceState::new(
6460
allowed_hosts,

crates/factor-outbound-mysql/src/host.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@ impl<C: Client> v2::Host for InstanceState<C> {}
4040
impl<C: Client> v2::HostConnection for InstanceState<C> {
4141
#[instrument(name = "spin_outbound_mysql.open_connection", skip(self), err(level = Level::INFO), fields(otel.kind = "client", db.system = "mysql"))]
4242
async fn open(&mut self, address: String) -> Result<Resource<Connection>, v2::Error> {
43-
if let Some(observe_context) = self.observe_context.as_ref() {
44-
observe_context.reparent_tracing_span()
45-
}
43+
self.observe_context.reparent_tracing_span();
4644

4745
if !self
4846
.is_address_allowed(&address)
@@ -63,9 +61,7 @@ impl<C: Client> v2::HostConnection for InstanceState<C> {
6361
statement: String,
6462
params: Vec<ParameterValue>,
6563
) -> Result<(), v2::Error> {
66-
if let Some(observe_context) = self.observe_context.as_ref() {
67-
observe_context.reparent_tracing_span()
68-
}
64+
self.observe_context.reparent_tracing_span();
6965

7066
Ok(self
7167
.get_client(connection)
@@ -81,9 +77,7 @@ impl<C: Client> v2::HostConnection for InstanceState<C> {
8177
statement: String,
8278
params: Vec<ParameterValue>,
8379
) -> Result<v2_types::RowSet, v2::Error> {
84-
if let Some(observe_context) = self.observe_context.as_ref() {
85-
observe_context.reparent_tracing_span()
86-
}
80+
self.observe_context.reparent_tracing_span();
8781

8882
Ok(self
8983
.get_client(connection)

0 commit comments

Comments
 (0)