Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/worker/.dev.vars.sample
Original file line number Diff line number Diff line change
@@ -1 +1 @@
otel.headers.x-honeycomb-team=<honeycomb_api_key>
otel.exporter.headers.x-honeycomb-team=<honeycomb_api_key>
3 changes: 2 additions & 1 deletion examples/worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
"@cloudflare/workers-types": "^4.20230321.0",
"@opentelemetry/api": "^1.4.1",
"typescript": "^5.0.2",
"wrangler": "2.13.0"
"wrangler": "2.13.0",
"ws": "^8.16.0"
},
"private": true,
"scripts": {
Expand Down
39 changes: 39 additions & 0 deletions examples/worker/src/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,51 @@ const handleRest = async (request: Request, env: Env, ctx: ExecutionContext): Pr
ctx.waitUntil(fetch('https://workers.dev'))
return new Response(`${greeting}!`)
}
let count = 0
async function handleSession(websocket: WebSocket, env: Env) {
websocket.accept()

websocket.addEventListener('message', async ({ data }) => {
if (data === 'CLICK') {
count += 1
await new Promise((resolve) => setTimeout(resolve, 1000))
websocket.send(JSON.stringify({ count, tz: new Date() }))
await env.OTEL_TEST.get('non-existant')
// if I add this line, the trace for message is not sent? :(
// websocket.close()
} else {
// An unknown message came into the server. Send back an error message
websocket.send(JSON.stringify({ error: 'Unknown message received', tz: new Date() }))
}
})

websocket.addEventListener('close', (evt) => {
// Handle when a client closes the WebSocket connection
})
}

async function handleWS(request: Request, env: Env): Promise<Response> {
const upgradeHeader = request.headers.get('Upgrade')
if (upgradeHeader !== 'websocket') {
return new Response('Expected websocket', { status: 400 })
}

const [client, server] = Object.values(new WebSocketPair())
await handleSession(server, env)

return new Response(null, {
status: 101,
webSocket: client,
})
}

export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
const pathname = new URL(request.url).pathname
if (pathname === '/do') {
return handleDO(request, env)
} else if (pathname === '/ws') {
return handleWS(request, env)
} else if (pathname === '/error') {
throw new Error('You asked for it!')
} else {
Expand Down
3 changes: 2 additions & 1 deletion examples/worker/wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ compatibility_date = "2023-03-27"
compatibility_flags = [ "nodejs_compat" ]

kv_namespaces = [
{ binding = "OTEL_TEST", id = "f124c9696873443da0a277ddb75000ca", preview_id = "3569aab8617645d9b8ed4bd1d45c8d96" }
# { binding = "OTEL_TEST", id = "f124c9696873443da0a277ddb75000ca", preview_id = "3569aab8617645d9b8ed4bd1d45c8d96" }
{ binding = "OTEL_TEST", id = "b47e660f2c1940329adf9a7aa2d93804", preview_id = "9fb9bceb46064170adadcefb7f801810" }
]

[durable_objects]
Expand Down
13 changes: 13 additions & 0 deletions examples/worker/ws-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import WebSocket from 'ws'

const ws = new WebSocket('ws://0.0.0.0:8787/ws')

ws.on('error', console.error)

ws.on('open', function open() {
ws.send('CLICK')
})

ws.on('message', function message(data) {
console.log('received: %s', data)
})
8 changes: 7 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@ import { ResolvedTraceConfig, Trigger } from './types.js'
const configSymbol = Symbol('Otel Workers Tracing Configuration')

export type Initialiser = (env: Record<string, unknown>, trigger: Trigger) => ResolvedTraceConfig
let fallbackConfig: ResolvedTraceConfig

export function setConfig(config: ResolvedTraceConfig, ctx = context.active()) {
// I could not get the context to work properly, so I'm using a global fallback.
// I suspect this is because I am not initialising the config properly in the WS instrumentation.
if (!fallbackConfig) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is actually a bug in this library and/or with context combined with websockets. I tried instrumenting WS on my own and I get the same thing as here: #14

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey I think it's an async local storage issue. Happy to jump on a call and work on this together if you would like ☺️

fallbackConfig = config
}
return ctx.setValue(configSymbol, config)
}

export function getActiveConfig(): ResolvedTraceConfig {
const config = context.active().getValue(configSymbol) as ResolvedTraceConfig
return config
return config || fallbackConfig
}
4 changes: 3 additions & 1 deletion src/instrumentation/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ export function createFetchHandler(fetchFn: FetchHandler, initialiser: Initialis
const env = instrumentEnv(orig_env as Record<string, unknown>)
const { ctx, tracker } = proxyExecutionContext(orig_ctx)
const context = setConfig(config)

try {
const args: FetchHandlerArgs = [request, env, ctx]
return await api_context.with(context, executeFetchHandler, undefined, target, args)
Expand All @@ -201,6 +200,9 @@ export function instrumentClientFetch(
const handler: ProxyHandler<typeof fetch> = {
apply: (target, thisArg, argArray): ReturnType<typeof fetch> => {
const request = new Request(argArray[0], argArray[1])
if (request.url === 'https://api.honeycomb.io/v1/traces') {
return Reflect.apply(target, thisArg, argArray)
}
if (!request.url.startsWith('http')) {
return Reflect.apply(target, thisArg, argArray)
}
Expand Down
81 changes: 81 additions & 0 deletions src/instrumentation/ws.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import { wrap } from '../wrap'
import { SpanKind, Tracer, trace } from '@opentelemetry/api'
import { setConfig } from '../config.js'
import { ResolvedTraceConfig } from '../types'

function traceWebsocket(type: string, tracer: Tracer) {
return (messageCallback: any, scope: any, argArray: { data: string | ArrayBuffer }[] | undefined[]) => {
const attributes = argArray[0]?.data ? { data: argArray[0].data.toString() } : {}
// it would be nice to add links from the http request that makes the websocket connection but I can't figure out how to do that
const currentSpan = trace.getActiveSpan()
let links = currentSpan ? [{ context: currentSpan.spanContext() }] : []
return tracer.startActiveSpan(
`websocket ${type}`,
{ kind: SpanKind.SERVER, attributes, links, root: true },
async (span) => {
try {
return await Reflect.apply(messageCallback, scope, argArray)
} catch (e) {
span.recordException(e as Error)
throw e
} finally {
span.end()
}
},
)
}
}

function wrapWSServer(server: WebSocket, config: ResolvedTraceConfig) {
const handler: ProxyHandler<WebSocket> = {
get: (target, p) => {
if (p === 'addEventListener') {
const fn = Reflect.get(target, p).bind(target)
return wrap(fn, {
apply: async (target, thisArg, argArray) => {
// WHERE DO I CALL SET CONFIG - I cannot figure out how to make this work, have done a hack in config.ts to compensate
setConfig(config)
const cb = argArray[1]
const wrappedCB = wrap(cb, {
apply: traceWebsocket(argArray[0], trace.getTracer('ws')),
})
return Reflect.apply(target, thisArg, [argArray[0], wrappedCB, argArray[2]])
},
})
}
const method = Reflect.get(target, p)
if (typeof method === 'function') {
return wrap(method.bind(target), {
apply: async (target, thisArg, argArray) => {
const span = trace.getActiveSpan()
span?.addEvent(p.toString(), { [p]: argArray[0] })
return Reflect.apply(target, thisArg, argArray)
},
})
}
return Reflect.get(target, p)
},
}
return wrap(server, handler)
}

function instrumentWSPair(WSPair: typeof self.WebSocketPair, config: ResolvedTraceConfig) {
const handler: ProxyHandler<typeof self.WebSocketPair> = {
construct: (target) => {
// call class to get access to original clients
const { 0: client, 1: server } = new target()

class InstrumentedWSPair {
0 = client
1 = wrapWSServer(server, config)
}

return Reflect.construct(InstrumentedWSPair, [])
},
}
return wrap(self.WebSocketPair, handler)
}

export function patchWebsocketPair(config: ResolvedTraceConfig) {
self.WebSocketPair = instrumentWSPair(self.WebSocketPair, config)
}
2 changes: 2 additions & 0 deletions src/sdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { instrumentGlobalCache } from './instrumentation/cache.js'
import { createQueueHandler } from './instrumentation/queue.js'
import { DOClass, instrumentDOClass } from './instrumentation/do.js'
import { createScheduledHandler } from './instrumentation/scheduled.js'
import { patchWebsocketPair } from './instrumentation/ws.js'

type FetchHandler = ExportedHandlerFetchHandler<unknown, unknown>
type ScheduledHandler = ExportedHandlerScheduledHandler<unknown>
Expand Down Expand Up @@ -76,6 +77,7 @@ function init(config: ResolvedTraceConfig): void {
if (!initialised) {
instrumentGlobalCache()
instrumentGlobalFetch()
patchWebsocketPair(config)
propagation.setGlobalPropagator(config.propagator)
const resource = createResource(config)

Expand Down