diff --git a/examples/worker/.dev.vars.sample b/examples/worker/.dev.vars.sample index 4ad1a3a3..50eff8a6 100644 --- a/examples/worker/.dev.vars.sample +++ b/examples/worker/.dev.vars.sample @@ -1 +1 @@ -otel.headers.x-honeycomb-team= +otel.exporter.headers.x-honeycomb-team= diff --git a/examples/worker/package.json b/examples/worker/package.json index cf0dccf7..9a82b084 100644 --- a/examples/worker/package.json +++ b/examples/worker/package.json @@ -5,7 +5,8 @@ "@cloudflare/workers-types": "^4.20230321.0", "@opentelemetry/api": "^1.4.1", "typescript": "^5.0.2", - "wrangler": "2.13.0" + "wrangler": "2.13.0", + "ws": "^8.16.0" }, "private": true, "scripts": { diff --git a/examples/worker/src/handler.ts b/examples/worker/src/handler.ts index 34ed21f7..1b263f8e 100644 --- a/examples/worker/src/handler.ts +++ b/examples/worker/src/handler.ts @@ -27,12 +27,51 @@ const handleRest = async (request: Request, env: Env, ctx: ExecutionContext): Pr ctx.waitUntil(fetch('https://workers.dev')) return new Response(`${greeting}!`) } +let count = 0 +async function handleSession(websocket: WebSocket, env: Env) { + websocket.accept() + + websocket.addEventListener('message', async ({ data }) => { + if (data === 'CLICK') { + count += 1 + await new Promise((resolve) => setTimeout(resolve, 1000)) + websocket.send(JSON.stringify({ count, tz: new Date() })) + await env.OTEL_TEST.get('non-existant') + // if I add this line, the trace for message is not sent? :( + // websocket.close() + } else { + // An unknown message came into the server. Send back an error message + websocket.send(JSON.stringify({ error: 'Unknown message received', tz: new Date() })) + } + }) + + websocket.addEventListener('close', (evt) => { + // Handle when a client closes the WebSocket connection + }) +} + +async function handleWS(request: Request, env: Env): Promise { + const upgradeHeader = request.headers.get('Upgrade') + if (upgradeHeader !== 'websocket') { + return new Response('Expected websocket', { status: 400 }) + } + + const [client, server] = Object.values(new WebSocketPair()) + await handleSession(server, env) + + return new Response(null, { + status: 101, + webSocket: client, + }) +} export default { async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise { const pathname = new URL(request.url).pathname if (pathname === '/do') { return handleDO(request, env) + } else if (pathname === '/ws') { + return handleWS(request, env) } else if (pathname === '/error') { throw new Error('You asked for it!') } else { diff --git a/examples/worker/wrangler.toml b/examples/worker/wrangler.toml index c0f56209..596da624 100644 --- a/examples/worker/wrangler.toml +++ b/examples/worker/wrangler.toml @@ -4,7 +4,8 @@ compatibility_date = "2023-03-27" compatibility_flags = [ "nodejs_compat" ] kv_namespaces = [ - { binding = "OTEL_TEST", id = "f124c9696873443da0a277ddb75000ca", preview_id = "3569aab8617645d9b8ed4bd1d45c8d96" } + # { binding = "OTEL_TEST", id = "f124c9696873443da0a277ddb75000ca", preview_id = "3569aab8617645d9b8ed4bd1d45c8d96" } + { binding = "OTEL_TEST", id = "b47e660f2c1940329adf9a7aa2d93804", preview_id = "9fb9bceb46064170adadcefb7f801810" } ] [durable_objects] diff --git a/examples/worker/ws-test.ts b/examples/worker/ws-test.ts new file mode 100644 index 00000000..af9efee0 --- /dev/null +++ b/examples/worker/ws-test.ts @@ -0,0 +1,13 @@ +import WebSocket from 'ws' + +const ws = new WebSocket('ws://0.0.0.0:8787/ws') + +ws.on('error', console.error) + +ws.on('open', function open() { + ws.send('CLICK') +}) + +ws.on('message', function message(data) { + console.log('received: %s', data) +}) diff --git a/src/config.ts b/src/config.ts index 129c959e..29bff395 100644 --- a/src/config.ts +++ b/src/config.ts @@ -4,12 +4,18 @@ import { ResolvedTraceConfig, Trigger } from './types.js' const configSymbol = Symbol('Otel Workers Tracing Configuration') export type Initialiser = (env: Record, trigger: Trigger) => ResolvedTraceConfig +let fallbackConfig: ResolvedTraceConfig export function setConfig(config: ResolvedTraceConfig, ctx = context.active()) { + // I could not get the context to work properly, so I'm using a global fallback. + // I suspect this is because I am not initialising the config properly in the WS instrumentation. + if (!fallbackConfig) { + fallbackConfig = config + } return ctx.setValue(configSymbol, config) } export function getActiveConfig(): ResolvedTraceConfig { const config = context.active().getValue(configSymbol) as ResolvedTraceConfig - return config + return config || fallbackConfig } diff --git a/src/instrumentation/fetch.ts b/src/instrumentation/fetch.ts index 8f3df4b7..84053ba7 100644 --- a/src/instrumentation/fetch.ts +++ b/src/instrumentation/fetch.ts @@ -178,7 +178,6 @@ export function createFetchHandler(fetchFn: FetchHandler, initialiser: Initialis const env = instrumentEnv(orig_env as Record) const { ctx, tracker } = proxyExecutionContext(orig_ctx) const context = setConfig(config) - try { const args: FetchHandlerArgs = [request, env, ctx] return await api_context.with(context, executeFetchHandler, undefined, target, args) @@ -201,6 +200,9 @@ export function instrumentClientFetch( const handler: ProxyHandler = { apply: (target, thisArg, argArray): ReturnType => { const request = new Request(argArray[0], argArray[1]) + if (request.url === 'https://api.honeycomb.io/v1/traces') { + return Reflect.apply(target, thisArg, argArray) + } if (!request.url.startsWith('http')) { return Reflect.apply(target, thisArg, argArray) } diff --git a/src/instrumentation/ws.ts b/src/instrumentation/ws.ts new file mode 100644 index 00000000..24678883 --- /dev/null +++ b/src/instrumentation/ws.ts @@ -0,0 +1,81 @@ +import { wrap } from '../wrap' +import { SpanKind, Tracer, trace } from '@opentelemetry/api' +import { setConfig } from '../config.js' +import { ResolvedTraceConfig } from '../types' + +function traceWebsocket(type: string, tracer: Tracer) { + return (messageCallback: any, scope: any, argArray: { data: string | ArrayBuffer }[] | undefined[]) => { + const attributes = argArray[0]?.data ? { data: argArray[0].data.toString() } : {} + // it would be nice to add links from the http request that makes the websocket connection but I can't figure out how to do that + const currentSpan = trace.getActiveSpan() + let links = currentSpan ? [{ context: currentSpan.spanContext() }] : [] + return tracer.startActiveSpan( + `websocket ${type}`, + { kind: SpanKind.SERVER, attributes, links, root: true }, + async (span) => { + try { + return await Reflect.apply(messageCallback, scope, argArray) + } catch (e) { + span.recordException(e as Error) + throw e + } finally { + span.end() + } + }, + ) + } +} + +function wrapWSServer(server: WebSocket, config: ResolvedTraceConfig) { + const handler: ProxyHandler = { + get: (target, p) => { + if (p === 'addEventListener') { + const fn = Reflect.get(target, p).bind(target) + return wrap(fn, { + apply: async (target, thisArg, argArray) => { + // WHERE DO I CALL SET CONFIG - I cannot figure out how to make this work, have done a hack in config.ts to compensate + setConfig(config) + const cb = argArray[1] + const wrappedCB = wrap(cb, { + apply: traceWebsocket(argArray[0], trace.getTracer('ws')), + }) + return Reflect.apply(target, thisArg, [argArray[0], wrappedCB, argArray[2]]) + }, + }) + } + const method = Reflect.get(target, p) + if (typeof method === 'function') { + return wrap(method.bind(target), { + apply: async (target, thisArg, argArray) => { + const span = trace.getActiveSpan() + span?.addEvent(p.toString(), { [p]: argArray[0] }) + return Reflect.apply(target, thisArg, argArray) + }, + }) + } + return Reflect.get(target, p) + }, + } + return wrap(server, handler) +} + +function instrumentWSPair(WSPair: typeof self.WebSocketPair, config: ResolvedTraceConfig) { + const handler: ProxyHandler = { + construct: (target) => { + // call class to get access to original clients + const { 0: client, 1: server } = new target() + + class InstrumentedWSPair { + 0 = client + 1 = wrapWSServer(server, config) + } + + return Reflect.construct(InstrumentedWSPair, []) + }, + } + return wrap(self.WebSocketPair, handler) +} + +export function patchWebsocketPair(config: ResolvedTraceConfig) { + self.WebSocketPair = instrumentWSPair(self.WebSocketPair, config) +} diff --git a/src/sdk.ts b/src/sdk.ts index aa3f686b..ce9f1c75 100644 --- a/src/sdk.ts +++ b/src/sdk.ts @@ -29,6 +29,7 @@ import { instrumentGlobalCache } from './instrumentation/cache.js' import { createQueueHandler } from './instrumentation/queue.js' import { DOClass, instrumentDOClass } from './instrumentation/do.js' import { createScheduledHandler } from './instrumentation/scheduled.js' +import { patchWebsocketPair } from './instrumentation/ws.js' type FetchHandler = ExportedHandlerFetchHandler type ScheduledHandler = ExportedHandlerScheduledHandler @@ -76,6 +77,7 @@ function init(config: ResolvedTraceConfig): void { if (!initialised) { instrumentGlobalCache() instrumentGlobalFetch() + patchWebsocketPair(config) propagation.setGlobalPropagator(config.propagator) const resource = createResource(config)