Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -450,3 +450,4 @@ oracle = { version = "0.6.3", features = ["chrono"] }
rumqttc = { version = "0.24.0", features = ["use-native-tls"]}
strum = { version = "0.27", features = ["derive"] }
strum_macros = "^0"
prost = "0.13.5"
9 changes: 9 additions & 0 deletions backend/migrations/20260107000000_job_otel_traces.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Reverts the job_otel_traces migration.
-- Drop indexes first. Note: DROP TABLE below would remove them implicitly,
-- but the explicit IF EXISTS drops are harmless and keep the teardown symmetric
-- with the up migration.
DROP INDEX IF EXISTS idx_job_otel_traces_job_workspace;
DROP INDEX IF EXISTS idx_job_otel_traces_created_at;
DROP INDEX IF EXISTS idx_job_otel_traces_trace_id;
DROP INDEX IF EXISTS idx_job_otel_traces_workspace_id;
DROP INDEX IF EXISTS idx_job_otel_traces_job_id;

-- Drop table
DROP TABLE IF EXISTS job_otel_traces;
28 changes: 28 additions & 0 deletions backend/migrations/20260107000000_job_otel_traces.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- Add table to store OTel traces from auto-instrumented scripts
-- Add table to store OTel traces from auto-instrumented scripts.
-- One row per span; spans are exported by the worker-side loader and queried
-- back per job by the API (see get_job_otel_traces in windmill-api).
CREATE TABLE IF NOT EXISTS job_otel_traces (
    id BIGSERIAL PRIMARY KEY,
    job_id UUID NOT NULL,                 -- job the span belongs to (no FK: presumably traces may outlive job rows — TODO confirm)
    workspace_id VARCHAR(50) NOT NULL,
    trace_id VARCHAR(32) NOT NULL,        -- 16-byte trace id as 32 hex chars
    span_id VARCHAR(16) NOT NULL,         -- 8-byte span id as 16 hex chars
    parent_span_id VARCHAR(16),           -- NULL for root spans
    operation_name VARCHAR(255) NOT NULL,
    service_name VARCHAR(255),
    start_time_unix_nano BIGINT NOT NULL, -- nanoseconds since Unix epoch
    end_time_unix_nano BIGINT NOT NULL,
    -- Derived duration; computed once at write time (STORED), never set by inserts.
    duration_ns BIGINT GENERATED ALWAYS AS (end_time_unix_nano - start_time_unix_nano) STORED,
    status_code SMALLINT DEFAULT 0,       -- OTel status: 0=UNSET, 1=OK, 2=ERROR
    status_message TEXT,
    attributes JSONB DEFAULT '{}',        -- span attributes (OTLP key/value list, stored as JSON)
    events JSONB DEFAULT '[]',            -- span events (e.g. recorded exceptions)
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Create indexes for efficient querying
CREATE INDEX IF NOT EXISTS idx_job_otel_traces_job_id ON job_otel_traces(job_id);
CREATE INDEX IF NOT EXISTS idx_job_otel_traces_workspace_id ON job_otel_traces(workspace_id);
CREATE INDEX IF NOT EXISTS idx_job_otel_traces_trace_id ON job_otel_traces(trace_id);
CREATE INDEX IF NOT EXISTS idx_job_otel_traces_created_at ON job_otel_traces(created_at);

-- Composite index for common query patterns
CREATE INDEX IF NOT EXISTS idx_job_otel_traces_job_workspace ON job_otel_traces(job_id, workspace_id);
81 changes: 81 additions & 0 deletions backend/windmill-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ pub fn workspace_unauthed_service() -> Router {
get(get_completed_job_logs_tail),
)
.route("/get_args/:id", get(get_args))
.route("/get_otel_traces/:id", get(get_job_otel_traces))
.route("/queue/get_started_at_by_ids", post(get_started_at_by_ids))
.route("/get_flow_debug_info/:id", get(get_flow_job_debug_info))
.route("/completed/get/:id", get(get_completed_job))
Expand Down Expand Up @@ -1694,6 +1695,86 @@ async fn get_args(
}
}

/// OTel trace span returned from the API.
///
/// Mirrors one row of the `job_otel_traces` table; serialized to JSON for the
/// `get_otel_traces/:id` endpoint.
#[derive(Debug, Serialize, Deserialize)]
pub struct OtelTraceSpan {
    /// Trace id as hex string (32 chars in the table schema).
    pub trace_id: String,
    /// Span id as hex string (16 chars in the table schema).
    pub span_id: String,
    /// Parent span id; `None` for root spans.
    pub parent_span_id: Option<String>,
    pub operation_name: String,
    pub service_name: Option<String>,
    /// Span start, nanoseconds since Unix epoch.
    pub start_time_unix_nano: i64,
    /// Span end, nanoseconds since Unix epoch.
    pub end_time_unix_nano: i64,
    /// end - start, computed by the database as a generated column.
    pub duration_ns: i64,
    /// OTel status code: 0=UNSET, 1=OK, 2=ERROR.
    pub status_code: i16,
    pub status_message: Option<String>,
    /// Span attributes as stored in the `attributes` JSONB column (defaults to `{}`).
    pub attributes: serde_json::Value,
    /// Span events as stored in the `events` JSONB column (defaults to `[]`).
    pub events: serde_json::Value,
}

/// Handler for `GET .../get_otel_traces/:id`: returns all OTel spans recorded
/// for job `id` in workspace `w_id`, ordered by span start time.
///
/// Errors: `NotFound` if the job does not exist in the workspace;
/// `BadRequest` if the caller is unauthenticated and the job was not created
/// by the "anonymous" user.
async fn get_job_otel_traces(
    OptAuthed(opt_authed): OptAuthed,
    Extension(db): Extension<DB>,
    Path((w_id, id)): Path<(String, Uuid)>,
) -> JsonResult<Vec<OtelTraceSpan>> {
    // Access check mirrors `get_args`: unauthenticated callers may only view
    // jobs created by anonymous users. Raw (unchecked) query since this is a
    // simple lookup.
    let job_record: Option<(String,)> = sqlx::query_as(
        "SELECT created_by FROM v2_job WHERE id = $1 AND workspace_id = $2",
    )
    .bind(id)
    .bind(&w_id)
    .fetch_optional(&db)
    .await?;

    let Some((created_by,)) = job_record else {
        return Err(Error::NotFound(format!("Job {} not found", id)));
    };
    if opt_authed.is_none() && created_by != "anonymous" {
        return Err(Error::BadRequest(
            "As a non logged in user, you can only see jobs ran by anonymous users".to_string(),
        ));
    }

    // Row shape returned by the trace query, named here so the positional
    // tuple columns are documented in one place.
    type TraceRow = (
        String,                    // trace_id
        String,                    // span_id
        Option<String>,            // parent_span_id
        String,                    // operation_name
        Option<String>,            // service_name
        i64,                       // start_time_unix_nano
        i64,                       // end_time_unix_nano
        Option<i64>,               // duration_ns (generated column)
        Option<i16>,               // status_code
        Option<String>,            // status_message
        Option<serde_json::Value>, // attributes
        Option<serde_json::Value>, // events
    );

    // Fetch OTel traces for this job using a raw SQL query (table is new, not
    // in the sqlx offline cache).
    let traces: Vec<TraceRow> = sqlx::query_as(
        r#"
        SELECT
            trace_id, span_id, parent_span_id, operation_name, service_name,
            start_time_unix_nano, end_time_unix_nano, duration_ns,
            status_code, status_message, attributes, events
        FROM job_otel_traces
        WHERE job_id = $1 AND workspace_id = $2
        ORDER BY start_time_unix_nano ASC
        "#,
    )
    .bind(id)
    .bind(&w_id)
    .fetch_all(&db)
    .await?;

    let spans: Vec<OtelTraceSpan> = traces
        .into_iter()
        .map(
            |(
                trace_id,
                span_id,
                parent_span_id,
                operation_name,
                service_name,
                start_time_unix_nano,
                end_time_unix_nano,
                duration_ns,
                status_code,
                status_message,
                attributes,
                events,
            )| {
                OtelTraceSpan {
                    trace_id,
                    span_id,
                    parent_span_id,
                    operation_name,
                    service_name,
                    start_time_unix_nano,
                    end_time_unix_nano,
                    // Generated column should never be NULL, but stay defensive.
                    duration_ns: duration_ns.unwrap_or(0),
                    // Already i16 from the query; no cast needed.
                    status_code: status_code.unwrap_or(0),
                    status_message,
                    // `unwrap_or_else` so the default JSON is only built when
                    // the column is actually NULL.
                    attributes: attributes.unwrap_or_else(|| serde_json::json!({})),
                    events: events.unwrap_or_else(|| serde_json::json!([])),
                }
            },
        )
        .collect();

    Ok(Json(spans))
}

async fn get_started_at_by_ids(
Extension(db): Extension<DB>,
Json(mut ids): Json<Vec<Uuid>>,
Expand Down
1 change: 1 addition & 0 deletions backend/windmill-common/src/global_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub const DEV_INSTANCE_SETTING: &str = "dev_instance";
pub const JWT_SECRET_SETTING: &str = "jwt_secret";
pub const EMAIL_DOMAIN_SETTING: &str = "email_domain";
pub const OTEL_SETTING: &str = "otel";
// Global-setting key; presumably toggles OTel auto-instrumentation of script
// HTTP calls (the worker-side fetch-wrapping loader) — TODO confirm at the read site.
pub const OTEL_AUTO_INSTRUMENTATION_SETTING: &str = "otel_auto_instrumentation";
pub const APP_WORKSPACED_ROUTE_SETTING: &str = "app_workspaced_route";

pub const ENV_SETTINGS: &[&str] = &[
Expand Down
4 changes: 3 additions & 1 deletion backend/windmill-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ path = "src/lib.rs"
default = []
private = []
prometheus = ["dep:prometheus", "windmill-common/prometheus"]
enterprise = ["windmill-queue/enterprise", "windmill-git-sync/enterprise", "windmill-common/enterprise", "dep:pem", "dep:tokio-util"]
enterprise = ["windmill-queue/enterprise", "windmill-git-sync/enterprise", "windmill-common/enterprise", "dep:pem", "dep:tokio-util", "dep:prost", "dep:axum"]
mssql = ["dep:tiberius"]
bigquery = ["dep:gcp_auth"]
benchmark = ["windmill-queue/benchmark", "windmill-common/benchmark"]
Expand Down Expand Up @@ -138,6 +138,8 @@ libloading = { workspace = true, optional = true }
opentelemetry = { workspace = true, optional = true }
bollard = { workspace = true, optional = true }
oracle = { workspace = true, optional = true }
prost = { workspace = true, optional = true }
axum = { workspace = true, optional = true }

[build-dependencies]
deno_fetch = { workspace = true, optional = true }
Expand Down
191 changes: 191 additions & 0 deletions backend/windmill-worker/otel_bun_loader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// OpenTelemetry Auto-Instrumentation Loader for Bun/Node.js
// This file is loaded via -r flag when WINDMILL_OTEL_AUTO_INSTRUMENTATION=true
// It wraps the global fetch to automatically create OTel spans for every
// outgoing HTTP request and exports them, batched, to the OTLP endpoint
// using the OTLP/JSON encoding.

const OTEL_ENDPOINT = process.env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT;
const JOB_ID = process.env.WINDMILL_JOB_ID;
const WORKSPACE_ID = process.env.WINDMILL_WORKSPACE_ID;
const SERVICE_NAME = process.env.OTEL_SERVICE_NAME || 'windmill-script';

if (!OTEL_ENDPOINT || !JOB_ID || !WORKSPACE_ID) {
  // Skip instrumentation if env vars not set
  console.log('[OTel] Missing environment variables, skipping instrumentation');
} else {
  console.log(`[OTel] Auto-instrumentation enabled, sending traces to ${OTEL_ENDPOINT}`);

  // Random id generators (16-byte trace id / 8-byte span id, lowercase hex).
  function generateTraceId() {
    const bytes = new Uint8Array(16);
    crypto.getRandomValues(bytes);
    return Array.from(bytes).map(b => b.toString(16).padStart(2, '0')).join('');
  }

  function generateSpanId() {
    const bytes = new Uint8Array(8);
    crypto.getRandomValues(bytes);
    return Array.from(bytes).map(b => b.toString(16).padStart(2, '0')).join('');
  }

  // OTLP/JSON requires trace/span ids as base64-encoded bytes, not hex.
  function hexToBase64(hex) {
    const bytes = new Uint8Array(hex.match(/.{1,2}/g).map(byte => parseInt(byte, 16)));
    return btoa(String.fromCharCode(...bytes));
  }

  // Best-effort URL extraction for every fetch() input form: string, URL
  // instance, or Request-like object. Fix: the previous revision returned
  // `undefined` for URL instances and then crashed on `url.includes(...)`.
  function requestUrl(input) {
    if (typeof input === 'string') return input;
    if (input instanceof URL) return input.href;
    return (input && input.url) || '';
  }

  // Spans buffered for export, flushed on a short debounce timer.
  const pendingSpans = [];
  let flushTimeout = null;

  // Export all buffered spans as one OTLP/JSON POST to OTEL_ENDPOINT.
  // (Fix: a previous revision ran a no-op `.replace('/v1/traces', '/v1/traces')`
  // on the endpoint here; the endpoint is now used directly.)
  async function flushSpans() {
    if (pendingSpans.length === 0) return;

    const spansToSend = pendingSpans.splice(0, pendingSpans.length);

    const exportRequest = {
      resourceSpans: [{
        resource: {
          attributes: [
            { key: 'service.name', value: { stringValue: SERVICE_NAME } },
            { key: 'windmill.job_id', value: { stringValue: JOB_ID } },
            { key: 'windmill.workspace_id', value: { stringValue: WORKSPACE_ID } }
          ]
        },
        scopeSpans: [{
          scope: {
            name: 'windmill-otel-bun-loader',
            version: '1.0.0'
          },
          spans: spansToSend
        }]
      }]
    };

    try {
      // Use the original fetch to avoid recursing into the instrumentation.
      await originalFetch(OTEL_ENDPOINT, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json'
        },
        body: JSON.stringify(exportRequest)
      });
    } catch (e) {
      // Silently ignore export errors to not affect the script
      console.error('[OTel] Failed to export spans:', e.message);
    }
  }

  // Debounced flush: spans created within a 100ms window go out together.
  function scheduleFlush() {
    if (flushTimeout) return;
    flushTimeout = setTimeout(() => {
      flushTimeout = null;
      flushSpans();
    }, 100);
  }

  // Wrap the global fetch
  const originalFetch = globalThis.fetch;

  globalThis.fetch = async function instrumentedFetch(input, init) {
    const url = requestUrl(input);
    // init.method wins; otherwise a Request object's method; default GET.
    // (Guarded against `input === null`, for which `typeof` is also 'object'.)
    const method =
      init?.method ||
      (input && typeof input === 'object' ? input.method : undefined) ||
      'GET';

    // Don't instrument calls to the OTel endpoint to avoid recursion
    if (url.includes(OTEL_ENDPOINT) || url.includes('/v1/traces')) {
      return originalFetch(input, init);
    }

    const traceId = generateTraceId();
    const spanId = generateSpanId();
    // BigInt keeps nanosecond timestamps exact beyond Number's 2^53 range.
    const startTimeNano = BigInt(Date.now()) * 1000000n;

    let statusCode = 0; // UNSET
    let statusMessage = '';
    let responseStatus = 0;
    let error = null;

    try {
      const response = await originalFetch(input, init);
      responseStatus = response.status;

      if (response.ok) {
        statusCode = 1; // OK
      } else {
        statusCode = 2; // ERROR
        statusMessage = `HTTP ${response.status}`;
      }

      return response;
    } catch (e) {
      statusCode = 2; // ERROR
      statusMessage = e.message;
      error = e;
      throw e; // re-throw: instrumentation must not swallow script errors
    } finally {
      const endTimeNano = BigInt(Date.now()) * 1000000n;

      // Parse URL for attributes; fall back to a minimal shape for
      // relative or unparseable URLs.
      let parsedUrl;
      try {
        parsedUrl = new URL(url);
      } catch {
        parsedUrl = { hostname: '', pathname: url, protocol: '' };
      }

      const span = {
        traceId: hexToBase64(traceId),
        spanId: hexToBase64(spanId),
        name: `HTTP ${method}`,
        kind: 3, // CLIENT
        startTimeUnixNano: startTimeNano.toString(),
        endTimeUnixNano: endTimeNano.toString(),
        attributes: [
          { key: 'http.method', value: { stringValue: method } },
          { key: 'http.url', value: { stringValue: url } },
          { key: 'http.host', value: { stringValue: parsedUrl.hostname } },
          { key: 'http.target', value: { stringValue: parsedUrl.pathname } },
          { key: 'http.scheme', value: { stringValue: parsedUrl.protocol?.replace(':', '') || 'https' } }
        ],
        status: {
          code: statusCode,
          message: statusMessage
        }
      };

      if (responseStatus) {
        span.attributes.push({ key: 'http.status_code', value: { intValue: responseStatus.toString() } });
      }

      if (error) {
        span.events = [{
          timeUnixNano: endTimeNano.toString(),
          name: 'exception',
          attributes: [
            { key: 'exception.message', value: { stringValue: error.message } },
            { key: 'exception.type', value: { stringValue: error.name } }
          ]
        }];
      }

      pendingSpans.push(span);
      scheduleFlush();
    }
  };

  // Ensure spans are flushed before process exits. NOTE(review): the runtime
  // does not await async 'beforeExit' handlers, but the pending export fetch
  // keeps the event loop alive, which in practice lets the flush complete.
  process.on('beforeExit', async () => {
    if (flushTimeout) {
      clearTimeout(flushTimeout);
      flushTimeout = null;
    }
    await flushSpans();
  });

  process.on('exit', () => {
    // Synchronous - can't do async here, but beforeExit should have handled it
  });
}
Loading
Loading