Skip to content

Commit 2ee0064

Browse files
committed
feat(wasi-observe): A WASI Observe host component
Signed-off-by: Caleb Schoepp <[email protected]>
1 parent 7417676 commit 2ee0064

File tree

54 files changed

+2319
-64
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+2319
-64
lines changed

Cargo.lock

Lines changed: 284 additions & 27 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ openssl = { version = "0.10" }
8181
anyhow = { workspace = true, features = ["backtrace"] }
8282
conformance = { path = "tests/conformance-tests" }
8383
conformance-tests = { workspace = true }
84+
fake-opentelemetry-collector = "0.21.1"
8485
hex = "0.4"
8586
http-body-util = { workspace = true }
8687
hyper = { workspace = true }

crates/factor-key-value/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ edition = { workspace = true }
88
anyhow = { workspace = true }
99
serde = { workspace = true }
1010
spin-core = { path = "../core" }
11+
spin-factor-otel = { path = "../factor-otel" }
1112
spin-factors = { path = "../factors" }
1213
spin-locked-app = { path = "../locked-app" }
1314
spin-resource-table = { path = "../table" }

crates/factor-key-value/src/host.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::{Cas, SwapError};
22
use anyhow::{Context, Result};
33
use spin_core::{async_trait, wasmtime::component::Resource};
4+
use spin_factor_otel::OtelContext;
45
use spin_resource_table::Table;
56
use spin_telemetry::traces::{self, Blame};
67
use spin_world::v2::key_value;
@@ -49,23 +50,26 @@ pub struct KeyValueDispatch {
4950
manager: Arc<dyn StoreManager>,
5051
stores: Table<Arc<dyn Store>>,
5152
compare_and_swaps: Table<Arc<dyn Cas>>,
53+
otel_context: Option<OtelContext>,
5254
}
5355

5456
impl KeyValueDispatch {
5557
pub fn new(allowed_stores: HashSet<String>, manager: Arc<dyn StoreManager>) -> Self {
56-
Self::new_with_capacity(allowed_stores, manager, DEFAULT_STORE_TABLE_CAPACITY)
58+
Self::new_with_capacity(allowed_stores, manager, DEFAULT_STORE_TABLE_CAPACITY, None)
5759
}
5860

5961
pub fn new_with_capacity(
6062
allowed_stores: HashSet<String>,
6163
manager: Arc<dyn StoreManager>,
6264
capacity: u32,
65+
otel_context: Option<OtelContext>,
6366
) -> Self {
6467
Self {
6568
allowed_stores,
6669
manager,
6770
stores: Table::new(capacity),
6871
compare_and_swaps: Table::new(capacity),
72+
otel_context,
6973
}
7074
}
7175

@@ -113,6 +117,9 @@ impl key_value::Host for KeyValueDispatch {}
113117
impl key_value::HostStore for KeyValueDispatch {
114118
#[instrument(name = "spin_key_value.open", skip(self), err, fields(otel.kind = "client", kv.backend=self.manager.summary(&name).unwrap_or("unknown".to_string())))]
115119
async fn open(&mut self, name: String) -> Result<Result<Resource<key_value::Store>, Error>> {
120+
if let Some(otel_context) = self.otel_context.as_ref() {
121+
otel_context.reparent_tracing_span()
122+
}
116123
Ok(async {
117124
if self.allowed_stores.contains(&name) {
118125
let store = self.manager.get(&name).await?;
@@ -135,6 +142,9 @@ impl key_value::HostStore for KeyValueDispatch {
135142
store: Resource<key_value::Store>,
136143
key: String,
137144
) -> Result<Result<Option<Vec<u8>>, Error>> {
145+
if let Some(otel_context) = self.otel_context.as_ref() {
146+
otel_context.reparent_tracing_span()
147+
}
138148
let store = self.get_store(store)?;
139149
Ok(store.get(&key).await.map_err(track_error_on_span))
140150
}
@@ -146,6 +156,9 @@ impl key_value::HostStore for KeyValueDispatch {
146156
key: String,
147157
value: Vec<u8>,
148158
) -> Result<Result<(), Error>> {
159+
if let Some(otel_context) = self.otel_context.as_ref() {
160+
otel_context.reparent_tracing_span()
161+
}
149162
let store = self.get_store(store)?;
150163
Ok(store.set(&key, &value).await.map_err(track_error_on_span))
151164
}
@@ -156,6 +169,9 @@ impl key_value::HostStore for KeyValueDispatch {
156169
store: Resource<key_value::Store>,
157170
key: String,
158171
) -> Result<Result<(), Error>> {
172+
if let Some(otel_context) = self.otel_context.as_ref() {
173+
otel_context.reparent_tracing_span()
174+
}
159175
let store = self.get_store(store)?;
160176
Ok(store.delete(&key).await.map_err(track_error_on_span))
161177
}
@@ -166,6 +182,9 @@ impl key_value::HostStore for KeyValueDispatch {
166182
store: Resource<key_value::Store>,
167183
key: String,
168184
) -> Result<Result<bool, Error>> {
185+
if let Some(otel_context) = self.otel_context.as_ref() {
186+
otel_context.reparent_tracing_span()
187+
}
169188
let store = self.get_store(store)?;
170189
Ok(store.exists(&key).await.map_err(track_error_on_span))
171190
}
@@ -175,6 +194,9 @@ impl key_value::HostStore for KeyValueDispatch {
175194
&mut self,
176195
store: Resource<key_value::Store>,
177196
) -> Result<Result<Vec<String>, Error>> {
197+
if let Some(otel_context) = self.otel_context.as_ref() {
198+
otel_context.reparent_tracing_span()
199+
}
178200
let store = self.get_store(store)?;
179201
Ok(store.get_keys().await.map_err(track_error_on_span))
180202
}

crates/factor-key-value/src/lib.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88
};
99

1010
use anyhow::ensure;
11+
use spin_factor_otel::OtelContext;
1112
use spin_factors::{
1213
ConfigureAppContext, Factor, FactorData, FactorInstanceBuilder, InitContext, PrepareContext,
1314
RuntimeFactors,
@@ -87,17 +88,19 @@ impl Factor for KeyValueFactor {
8788

8889
fn prepare<T: RuntimeFactors>(
8990
&self,
90-
ctx: PrepareContext<T, Self>,
91+
mut ctx: PrepareContext<T, Self>,
9192
) -> anyhow::Result<InstanceBuilder> {
9293
let app_state = ctx.app_state();
9394
let allowed_stores = app_state
9495
.component_allowed_stores
9596
.get(ctx.app_component().id())
9697
.expect("component should be in component_stores")
9798
.clone();
99+
let otel_context = OtelContext::from_prepare_context(&mut ctx)?;
98100
Ok(InstanceBuilder {
99101
store_manager: app_state.store_manager.clone(),
100102
allowed_stores,
103+
otel_context,
101104
})
102105
}
103106
}
@@ -177,6 +180,7 @@ pub struct InstanceBuilder {
177180
store_manager: Arc<AppStoreManager>,
178181
/// The allowed stores for this component instance.
179182
allowed_stores: HashSet<String>,
183+
otel_context: OtelContext,
180184
}
181185

182186
impl FactorInstanceBuilder for InstanceBuilder {
@@ -186,11 +190,13 @@ impl FactorInstanceBuilder for InstanceBuilder {
186190
let Self {
187191
store_manager,
188192
allowed_stores,
193+
otel_context,
189194
} = self;
190195
Ok(KeyValueDispatch::new_with_capacity(
191196
allowed_stores,
192197
store_manager,
193198
u32::MAX,
199+
Some(otel_context),
194200
))
195201
}
196202
}

crates/factor-llm/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ llm-cublas = ["llm", "spin-llm-local/cublas"]
1717
anyhow = { workspace = true }
1818
async-trait = { workspace = true }
1919
serde = { workspace = true }
20+
spin-factor-otel = { path = "../factor-otel" }
2021
spin-factors = { path = "../factors" }
2122
spin-llm-local = { path = "../llm-local", optional = true }
2223
spin-llm-remote-http = { path = "../llm-remote-http" }

crates/factor-llm/src/host.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ impl v2::Host for InstanceState {
1313
prompt: String,
1414
params: Option<v2::InferencingParams>,
1515
) -> Result<v2::InferencingResult, v2::Error> {
16+
self.otel_context.reparent_tracing_span();
17+
1618
if !self.allowed_models.contains(&model) {
1719
return Err(access_denied_error(&model));
1820
}
@@ -40,6 +42,8 @@ impl v2::Host for InstanceState {
4042
model: v1::EmbeddingModel,
4143
data: Vec<String>,
4244
) -> Result<v2::EmbeddingsResult, v2::Error> {
45+
self.otel_context.reparent_tracing_span();
46+
4347
if !self.allowed_models.contains(&model) {
4448
return Err(access_denied_error(&model));
4549
}

crates/factor-llm/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::collections::{HashMap, HashSet};
55
use std::sync::Arc;
66

77
use async_trait::async_trait;
8+
use spin_factor_otel::OtelContext;
89
use spin_factors::{
910
ConfigureAppContext, Factor, FactorData, PrepareContext, RuntimeFactors, SelfInstanceBuilder,
1011
};
@@ -73,7 +74,7 @@ impl Factor for LlmFactor {
7374

7475
fn prepare<T: RuntimeFactors>(
7576
&self,
76-
ctx: PrepareContext<T, Self>,
77+
mut ctx: PrepareContext<T, Self>,
7778
) -> anyhow::Result<Self::InstanceBuilder> {
7879
let allowed_models = ctx
7980
.app_state()
@@ -82,10 +83,12 @@ impl Factor for LlmFactor {
8283
.cloned()
8384
.unwrap_or_default();
8485
let engine = ctx.app_state().engine.clone();
86+
let otel_context = OtelContext::from_prepare_context(&mut ctx)?;
8587

8688
Ok(InstanceState {
8789
engine,
8890
allowed_models,
91+
otel_context,
8992
})
9093
}
9194
}
@@ -100,6 +103,7 @@ pub struct AppState {
100103
pub struct InstanceState {
101104
engine: Arc<Mutex<dyn LlmEngine>>,
102105
pub allowed_models: Arc<HashSet<String>>,
106+
otel_context: OtelContext,
103107
}
104108

105109
/// The runtime configuration for the LLM factor.

crates/factor-otel/Cargo.toml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
[package]
2+
name = "spin-factor-otel"
3+
version = { workspace = true }
4+
authors = { workspace = true }
5+
edition = { workspace = true }
6+
7+
[dependencies]
8+
anyhow = { workspace = true }
9+
indexmap = "2.2.6"
10+
opentelemetry = { workspace = true }
11+
opentelemetry_sdk = { workspace = true }
12+
opentelemetry-otlp = { version = "0.27", features = ["http-proto", "http", "reqwest-client"] }
13+
spin-core = { path = "../core" }
14+
spin-factors = { path = "../factors" }
15+
spin-resource-table = { path = "../table" }
16+
spin-telemetry = { path = "../telemetry" }
17+
spin-world = { path = "../world" }
18+
tracing = { workspace = true }
19+
tracing-opentelemetry = { workspace = true }
20+
21+
[dev-dependencies]
22+
toml = "0.5"
23+
24+
[lints]
25+
workspace = true

crates/factor-otel/src/host.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use anyhow::anyhow;
2+
use anyhow::Result;
3+
use opentelemetry::trace::TraceContextExt;
4+
use opentelemetry_sdk::trace::SpanProcessor;
5+
use spin_world::wasi::otel::tracing as wasi_otel;
6+
7+
use tracing_opentelemetry::OpenTelemetrySpanExt;
8+
9+
use crate::InstanceState;
10+
11+
impl wasi_otel::Host for InstanceState {
12+
async fn on_start(&mut self, context: wasi_otel::SpanContext) -> Result<()> {
13+
let mut state = self.state.write().unwrap();
14+
15+
// Before we do anything make sure we track the original host span ID for reparenting
16+
if state.original_host_span_id.is_none() {
17+
state.original_host_span_id = Some(
18+
tracing::Span::current()
19+
.context()
20+
.span()
21+
.span_context()
22+
.span_id(),
23+
);
24+
}
25+
26+
// Track the guest spans context in our ordered map
27+
let span_context: opentelemetry::trace::SpanContext = context.into();
28+
state
29+
.guest_span_contexts
30+
.insert(span_context.span_id(), span_context);
31+
32+
Ok(())
33+
}
34+
35+
async fn on_end(&mut self, span_data: wasi_otel::SpanData) -> Result<()> {
36+
let mut state = self.state.write().unwrap();
37+
38+
let span_context: opentelemetry::trace::SpanContext = span_data.span_context.clone().into();
39+
let span_id: opentelemetry::trace::SpanId = span_context.span_id();
40+
41+
if state.guest_span_contexts.shift_remove(&span_id).is_none() {
42+
Err(anyhow!("Trying to end a span that was not started"))?;
43+
}
44+
45+
self.processor.on_end(span_data.into());
46+
47+
Ok(())
48+
}
49+
50+
async fn outer_span_context(&mut self) -> Result<wasi_otel::SpanContext> {
51+
Ok(tracing::Span::current()
52+
.context()
53+
.span()
54+
.span_context()
55+
.clone()
56+
.into())
57+
}
58+
}

0 commit comments

Comments
 (0)