Skip to content

Commit 919fcac

Browse files
calebschoepp authored and asteurer committed
feat(wasi-otel): A WASI OTel host component
Signed-off-by: Caleb Schoepp <[email protected]>

fix: update opentelemetry version
Signed-off-by: Andrew Steurer <[email protected]>

Fix out of date Cargo.lock from bad rebase
Signed-off-by: Caleb Schoepp <[email protected]>

feat(factor-otel): adding metrics
Signed-off-by: Andrew Steurer <[email protected]>

feat(factor-otel): refactoring otel conversions
Signed-off-by: Andrew Steurer <[email protected]>

fix(telemetry): update BatchLogProcessor to be async
Signed-off-by: Andrew Steurer <[email protected]>

feat(factor-otel): refactoring WIT and updating conversions
Signed-off-by: Andrew Steurer <[email protected]>

fix(factor-otel): updating WIT
Signed-off-by: Andrew Steurer <[email protected]>

feat(factor-otel): updating WIT and conversions
Signed-off-by: Andrew Steurer <[email protected]>

feat(factor-otel): small refactors
Signed-off-by: Andrew Steurer <[email protected]>

fix(factor-otel): rebasing wasi-otel branch on main
Signed-off-by: Andrew Steurer <[email protected]>
1 parent abc1d54 commit 919fcac

File tree

59 files changed

+4509
-1675
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+4509
-1675
lines changed

Cargo.lock

Lines changed: 1807 additions & 1390 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ openssl = { version = "0.10" }
8484
anyhow = { workspace = true, features = ["backtrace"] }
8585
conformance = { path = "tests/conformance-tests" }
8686
conformance-tests = { workspace = true }
87+
fake-opentelemetry-collector = "0.26"
8788
hex = "0.4"
8889
http-body-util = { workspace = true }
8990
hyper = { workspace = true }
@@ -144,6 +145,10 @@ hyper-util = { version = "0.1", features = ["tokio"] }
144145
indexmap = "2"
145146
itertools = "0.14"
146147
lazy_static = "1.5"
148+
opentelemetry = "0.28"
149+
# The default `reqwest-blocking-client` causes a runtime panic
150+
opentelemetry-otlp = { version = "0.28", default-features = false, features = ["http-proto", "reqwest-client", "logs"]}
151+
opentelemetry_sdk = {version = "0.28", features = ["experimental_metrics_periodicreader_with_async_runtime", "experimental_trace_batch_span_processor_with_async_runtime", "experimental_logs_batch_log_processor_with_async_runtime"]}
147152
path-absolutize = "3"
148153
pin-project-lite = "0.2.16"
149154
quote = "1"
@@ -172,6 +177,7 @@ toml_edit = "0.22"
172177
tower-service = "0.3.3"
173178
tracing = { version = "0.1.41", features = ["log"] }
174179
url = "2.5.7"
180+
tracing-opentelemetry = "0.29"
175181
walkdir = "2"
176182
wasm-encoder = "0.239.0"
177183
wasm-metadata = "0.239.0"

crates/factor-key-value/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ edition = { workspace = true }
88
anyhow = { workspace = true }
99
serde = { workspace = true }
1010
spin-core = { path = "../core" }
11+
spin-factor-otel = { path = "../factor-otel" }
1112
spin-factors = { path = "../factors" }
1213
spin-locked-app = { path = "../locked-app" }
1314
spin-resource-table = { path = "../table" }

crates/factor-key-value/src/host.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use super::{Cas, SwapError};
22
use anyhow::{Context, Result};
33
use spin_core::{async_trait, wasmtime::component::Resource};
4+
use spin_factor_otel::OtelContext;
45
use spin_resource_table::Table;
56
use spin_telemetry::traces::{self, Blame};
67
use spin_world::v2::key_value;
@@ -49,23 +50,26 @@ pub struct KeyValueDispatch {
4950
manager: Arc<dyn StoreManager>,
5051
stores: Table<Arc<dyn Store>>,
5152
compare_and_swaps: Table<Arc<dyn Cas>>,
53+
otel_context: Option<OtelContext>,
5254
}
5355

5456
impl KeyValueDispatch {
5557
pub fn new(allowed_stores: HashSet<String>, manager: Arc<dyn StoreManager>) -> Self {
56-
Self::new_with_capacity(allowed_stores, manager, DEFAULT_STORE_TABLE_CAPACITY)
58+
Self::new_with_capacity(allowed_stores, manager, DEFAULT_STORE_TABLE_CAPACITY, None)
5759
}
5860

5961
pub fn new_with_capacity(
6062
allowed_stores: HashSet<String>,
6163
manager: Arc<dyn StoreManager>,
6264
capacity: u32,
65+
otel_context: Option<OtelContext>,
6366
) -> Self {
6467
Self {
6568
allowed_stores,
6669
manager,
6770
stores: Table::new(capacity),
6871
compare_and_swaps: Table::new(capacity),
72+
otel_context,
6973
}
7074
}
7175

@@ -113,6 +117,9 @@ impl key_value::Host for KeyValueDispatch {}
113117
impl key_value::HostStore for KeyValueDispatch {
114118
#[instrument(name = "spin_key_value.open", skip(self), err, fields(otel.kind = "client", kv.backend=self.manager.summary(&name).unwrap_or("unknown".to_string())))]
115119
async fn open(&mut self, name: String) -> Result<Result<Resource<key_value::Store>, Error>> {
120+
if let Some(otel_context) = self.otel_context.as_ref() {
121+
otel_context.reparent_tracing_span()
122+
}
116123
Ok(async {
117124
if self.allowed_stores.contains(&name) {
118125
let store = self.manager.get(&name).await?;
@@ -135,6 +142,9 @@ impl key_value::HostStore for KeyValueDispatch {
135142
store: Resource<key_value::Store>,
136143
key: String,
137144
) -> Result<Result<Option<Vec<u8>>, Error>> {
145+
if let Some(otel_context) = self.otel_context.as_ref() {
146+
otel_context.reparent_tracing_span()
147+
}
138148
let store = self.get_store(store)?;
139149
Ok(store.get(&key).await.map_err(track_error_on_span))
140150
}
@@ -146,6 +156,9 @@ impl key_value::HostStore for KeyValueDispatch {
146156
key: String,
147157
value: Vec<u8>,
148158
) -> Result<Result<(), Error>> {
159+
if let Some(otel_context) = self.otel_context.as_ref() {
160+
otel_context.reparent_tracing_span()
161+
}
149162
let store = self.get_store(store)?;
150163
Ok(store.set(&key, &value).await.map_err(track_error_on_span))
151164
}
@@ -156,6 +169,9 @@ impl key_value::HostStore for KeyValueDispatch {
156169
store: Resource<key_value::Store>,
157170
key: String,
158171
) -> Result<Result<(), Error>> {
172+
if let Some(otel_context) = self.otel_context.as_ref() {
173+
otel_context.reparent_tracing_span()
174+
}
159175
let store = self.get_store(store)?;
160176
Ok(store.delete(&key).await.map_err(track_error_on_span))
161177
}
@@ -166,6 +182,9 @@ impl key_value::HostStore for KeyValueDispatch {
166182
store: Resource<key_value::Store>,
167183
key: String,
168184
) -> Result<Result<bool, Error>> {
185+
if let Some(otel_context) = self.otel_context.as_ref() {
186+
otel_context.reparent_tracing_span()
187+
}
169188
let store = self.get_store(store)?;
170189
Ok(store.exists(&key).await.map_err(track_error_on_span))
171190
}
@@ -175,6 +194,9 @@ impl key_value::HostStore for KeyValueDispatch {
175194
&mut self,
176195
store: Resource<key_value::Store>,
177196
) -> Result<Result<Vec<String>, Error>> {
197+
if let Some(otel_context) = self.otel_context.as_ref() {
198+
otel_context.reparent_tracing_span()
199+
}
178200
let store = self.get_store(store)?;
179201
Ok(store.get_keys().await.map_err(track_error_on_span))
180202
}

crates/factor-key-value/src/lib.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88
};
99

1010
use anyhow::ensure;
11+
use spin_factor_otel::OtelContext;
1112
use spin_factors::{
1213
ConfigureAppContext, Factor, FactorData, FactorInstanceBuilder, InitContext, PrepareContext,
1314
RuntimeFactors,
@@ -87,17 +88,19 @@ impl Factor for KeyValueFactor {
8788

8889
fn prepare<T: RuntimeFactors>(
8990
&self,
90-
ctx: PrepareContext<T, Self>,
91+
mut ctx: PrepareContext<T, Self>,
9192
) -> anyhow::Result<InstanceBuilder> {
9293
let app_state = ctx.app_state();
9394
let allowed_stores = app_state
9495
.component_allowed_stores
9596
.get(ctx.app_component().id())
9697
.expect("component should be in component_stores")
9798
.clone();
99+
let otel_context = OtelContext::from_prepare_context(&mut ctx)?;
98100
Ok(InstanceBuilder {
99101
store_manager: app_state.store_manager.clone(),
100102
allowed_stores,
103+
otel_context,
101104
})
102105
}
103106
}
@@ -177,6 +180,7 @@ pub struct InstanceBuilder {
177180
store_manager: Arc<AppStoreManager>,
178181
/// The allowed stores for this component instance.
179182
allowed_stores: HashSet<String>,
183+
otel_context: OtelContext,
180184
}
181185

182186
impl FactorInstanceBuilder for InstanceBuilder {
@@ -186,11 +190,13 @@ impl FactorInstanceBuilder for InstanceBuilder {
186190
let Self {
187191
store_manager,
188192
allowed_stores,
193+
otel_context,
189194
} = self;
190195
Ok(KeyValueDispatch::new_with_capacity(
191196
allowed_stores,
192197
store_manager,
193198
u32::MAX,
199+
Some(otel_context),
194200
))
195201
}
196202
}

crates/factor-llm/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ llm-cublas = ["llm", "spin-llm-local/cublas"]
1717
anyhow = { workspace = true }
1818
async-trait = { workspace = true }
1919
serde = { workspace = true }
20+
spin-factor-otel = { path = "../factor-otel" }
2021
spin-factors = { path = "../factors" }
2122
spin-llm-local = { path = "../llm-local", optional = true }
2223
spin-llm-remote-http = { path = "../llm-remote-http" }

crates/factor-llm/src/host.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ impl v2::Host for InstanceState {
1313
prompt: String,
1414
params: Option<v2::InferencingParams>,
1515
) -> Result<v2::InferencingResult, v2::Error> {
16+
self.otel_context.reparent_tracing_span();
17+
1618
if !self.allowed_models.contains(&model) {
1719
return Err(access_denied_error(&model));
1820
}
@@ -40,6 +42,8 @@ impl v2::Host for InstanceState {
4042
model: v1::EmbeddingModel,
4143
data: Vec<String>,
4244
) -> Result<v2::EmbeddingsResult, v2::Error> {
45+
self.otel_context.reparent_tracing_span();
46+
4347
if !self.allowed_models.contains(&model) {
4448
return Err(access_denied_error(&model));
4549
}

crates/factor-llm/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::collections::{HashMap, HashSet};
55
use std::sync::Arc;
66

77
use async_trait::async_trait;
8+
use spin_factor_otel::OtelContext;
89
use spin_factors::{
910
ConfigureAppContext, Factor, FactorData, PrepareContext, RuntimeFactors, SelfInstanceBuilder,
1011
};
@@ -73,7 +74,7 @@ impl Factor for LlmFactor {
7374

7475
fn prepare<T: RuntimeFactors>(
7576
&self,
76-
ctx: PrepareContext<T, Self>,
77+
mut ctx: PrepareContext<T, Self>,
7778
) -> anyhow::Result<Self::InstanceBuilder> {
7879
let allowed_models = ctx
7980
.app_state()
@@ -82,10 +83,12 @@ impl Factor for LlmFactor {
8283
.cloned()
8384
.unwrap_or_default();
8485
let engine = ctx.app_state().engine.clone();
86+
let otel_context = OtelContext::from_prepare_context(&mut ctx)?;
8587

8688
Ok(InstanceState {
8789
engine,
8890
allowed_models,
91+
otel_context,
8992
})
9093
}
9194
}
@@ -100,6 +103,7 @@ pub struct AppState {
100103
pub struct InstanceState {
101104
engine: Arc<Mutex<dyn LlmEngine>>,
102105
pub allowed_models: Arc<HashSet<String>>,
106+
otel_context: OtelContext,
103107
}
104108

105109
/// The runtime configuration for the LLM factor.

crates/factor-otel/Cargo.toml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
[package]
2+
name = "spin-factor-otel"
3+
version = { workspace = true }
4+
authors = { workspace = true }
5+
edition = { workspace = true }
6+
7+
[dependencies]
8+
anyhow = { workspace = true }
9+
indexmap = "2.2.6"
10+
opentelemetry = { workspace = true }
11+
opentelemetry_sdk = { workspace = true }
12+
opentelemetry-otlp = { workspace = true }
13+
spin-core = { path = "../core" }
14+
spin-factors = { path = "../factors" }
15+
spin-resource-table = { path = "../table" }
16+
spin-telemetry = { path = "../telemetry" }
17+
spin-world = { path = "../world" }
18+
tracing = { workspace = true }
19+
tracing-opentelemetry = { workspace = true }
20+
21+
[dev-dependencies]
22+
toml = "0.5"
23+
24+
[lints]
25+
workspace = true

crates/factor-otel/src/host.rs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
use crate::InstanceState;
2+
use anyhow::anyhow;
3+
use anyhow::Result;
4+
use opentelemetry::trace::TraceContextExt;
5+
use opentelemetry_sdk::error::OTelSdkError;
6+
use opentelemetry_sdk::metrics::exporter::PushMetricExporter;
7+
use opentelemetry_sdk::trace::SpanProcessor;
8+
use spin_world::wasi;
9+
use tracing_opentelemetry::OpenTelemetrySpanExt;
10+
11+
impl wasi::otel::tracing::Host for InstanceState {
12+
async fn on_start(&mut self, context: wasi::otel::tracing::SpanContext) -> Result<()> {
13+
let mut state = self.state.write().unwrap();
14+
15+
// Before we do anything make sure we track the original host span ID for reparenting
16+
if state.original_host_span_id.is_none() {
17+
state.original_host_span_id = Some(
18+
tracing::Span::current()
19+
.context()
20+
.span()
21+
.span_context()
22+
.span_id(),
23+
);
24+
}
25+
26+
// Track the guest spans context in our ordered map
27+
let span_context: opentelemetry::trace::SpanContext = context.into();
28+
state
29+
.guest_span_contexts
30+
.insert(span_context.span_id(), span_context);
31+
32+
Ok(())
33+
}
34+
35+
async fn on_end(&mut self, span_data: wasi::otel::tracing::SpanData) -> Result<()> {
36+
let mut state = self.state.write().unwrap();
37+
38+
let span_context: opentelemetry::trace::SpanContext = span_data.span_context.clone().into();
39+
let span_id: opentelemetry::trace::SpanId = span_context.span_id();
40+
41+
if state.guest_span_contexts.shift_remove(&span_id).is_none() {
42+
Err(anyhow!("Trying to end a span that was not started"))?;
43+
}
44+
45+
self.span_processor.on_end(span_data.into());
46+
47+
Ok(())
48+
}
49+
50+
async fn outer_span_context(&mut self) -> Result<wasi::otel::tracing::SpanContext> {
51+
Ok(tracing::Span::current()
52+
.context()
53+
.span()
54+
.span_context()
55+
.clone()
56+
.into())
57+
}
58+
}
59+
60+
impl wasi::otel::metrics::Host for InstanceState {
61+
async fn export(
62+
&mut self,
63+
metrics: wasi::otel::metrics::ResourceMetrics,
64+
) -> spin_core::wasmtime::Result<std::result::Result<(), wasi::otel::metrics::Error>> {
65+
let mut rm: opentelemetry_sdk::metrics::data::ResourceMetrics = metrics.into();
66+
match self.metric_exporter.export(&mut rm).await {
67+
Ok(_) => Ok(Ok(())),
68+
Err(e) => match e {
69+
OTelSdkError::AlreadyShutdown => {
70+
let msg = "Shutdown has already been invoked";
71+
tracing::error!(msg);
72+
Ok(Err(msg.to_string()))
73+
}
74+
OTelSdkError::InternalFailure(e) => {
75+
let detailed_msg = format!("Internal failure: {}", e);
76+
tracing::error!(detailed_msg);
77+
Ok(Err("Internal failure.".to_string()))
78+
}
79+
OTelSdkError::Timeout(d) => {
80+
let detailed_msg = format!("Operation timed out after {} seconds", d.as_secs());
81+
tracing::error!(detailed_msg);
82+
Ok(Err("Operation timed out.".to_string()))
83+
}
84+
},
85+
}
86+
}
87+
}

0 commit comments

Comments
 (0)