Skip to content

Commit d1a110b

Browse files
committed
Implement WFP instrumentation
1 parent 988bd77 commit d1a110b

File tree

3 files changed

+90
-1
lines changed

3 files changed

+90
-1
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,4 +296,4 @@ Bindings:
296296
- [ ] mTLS
297297
- [ ] Vectorize
298298
- [ ] Hyperdrive
299-
- [ ] Workers for Platforms Dispatch
299+
- [x] Workers for Platforms Dispatch
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import { Attributes, SpanKind, SpanOptions, trace } from '@opentelemetry/api'
2+
import { SemanticAttributes } from '@opentelemetry/semantic-conventions'
3+
import { passthroughGet, wrap } from '../wrap.js'
4+
import { instrumentClientFetch } from './fetch.js'
5+
6+
type ExtraAttributeFn = (argArray: any[], result: any) => Attributes
7+
8+
const WFPAttributes: Record<string | symbol, ExtraAttributeFn> = {
9+
get(argArray) {
10+
const attrs: Attributes = {}
11+
const name = argArray[0]
12+
const opts = argArray[2]
13+
attrs['wfp.script_name'] = name
14+
if (typeof opts === 'object') {
15+
const limits = opts.limits
16+
if (typeof limits === 'object') {
17+
const { cpuMs, subRequests } = limits
18+
if (typeof cpuMs === 'number') {
19+
attrs['wfp.limits.cpuMs'] = cpuMs
20+
}
21+
if (typeof subRequests === 'number') {
22+
attrs['wfp.limits.subRequests'] = subRequests
23+
}
24+
}
25+
}
26+
return attrs
27+
}
28+
}
29+
30+
function instrumentWFPFn(fn: Function, name: string, operation: string) {
31+
const tracer = trace.getTracer('WorkersForPlatforms')
32+
const fnHandler: ProxyHandler<any> = {
33+
apply: (target, thisArg, argArray) => {
34+
const attributes = {
35+
binding_type: 'WorkersForPlatforms',
36+
[SemanticAttributes.CODE_NAMESPACE]: name
37+
}
38+
const options: SpanOptions = {
39+
kind: SpanKind.INTERNAL,
40+
attributes,
41+
}
42+
return tracer.startActiveSpan(`${name} ${operation}`, options, async (span) => {
43+
const result: Fetcher = await Reflect.apply(target, thisArg, argArray)
44+
const extraAttrs = WFPAttributes[operation] ? WFPAttributes[operation](argArray, result) : {}
45+
span.setAttributes(extraAttrs)
46+
span.end()
47+
return instrumentUserWorkerFetcher(result, name, argArray[0])
48+
})
49+
},
50+
}
51+
return wrap(fn, fnHandler)
52+
}
53+
54+
export function instrumentDispatchNamespace(dataset: DispatchNamespace, name: string): DispatchNamespace {
55+
const datasetHandler: ProxyHandler<DispatchNamespace> = {
56+
get: (target, prop, receiver) => {
57+
const operation = String(prop)
58+
const fn = Reflect.get(target, prop, receiver)
59+
return instrumentWFPFn(fn, name, operation)
60+
},
61+
}
62+
return wrap(dataset, datasetHandler)
63+
}
64+
65+
export function instrumentUserWorkerFetcher(fetcher: Fetcher, dispatch_namespace: string, worker_name: string): Fetcher {
66+
const fetcherHandler: ProxyHandler<Fetcher> = {
67+
get(target, prop) {
68+
if (prop === 'fetch') {
69+
const fetcher = Reflect.get(target, prop)
70+
const attrs = {
71+
dispatch_namespace,
72+
worker_name
73+
}
74+
return instrumentClientFetch(fetcher, () => ({ includeTraceContext: false }), attrs)
75+
} else {
76+
return passthroughGet(target, prop)
77+
}
78+
},
79+
}
80+
return wrap(fetcher, fetcherHandler)
81+
}

src/instrumentation/env.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { instrumentKV } from './kv.js'
44
import { instrumentQueueSender } from './queue.js'
55
import { instrumentServiceBinding } from './service.js'
66
import { instrumentAnalyticsEngineDataset } from './analytics-engine'
7+
import { instrumentDispatchNamespace } from './dispatch-namespace'
78

89
const isKVNamespace = (item?: unknown): item is KVNamespace => {
910
return !!(item as KVNamespace)?.getWithMetadata
@@ -26,6 +27,11 @@ const isAnalyticsEngineDataset = (item?: unknown): item is AnalyticsEngineDatase
2627
return !!(item as AnalyticsEngineDataset)?.writeDataPoint
2728
}
2829

30+
const isDispatchNamespace = (item?: unknown): item is DispatchNamespace => {
31+
// KV Namespaces and R2 buckets also have .get, but also .put
32+
return !!(item as DispatchNamespace)?.get && !(item as KVNamespace & R2Bucket & DurableObjectState & DurableObjectNamespace)?.put
33+
}
34+
2935
const instrumentEnv = (env: Record<string, unknown>): Record<string, unknown> => {
3036
const envHandler: ProxyHandler<Record<string, unknown>> = {
3137
get: (target, prop, receiver) => {
@@ -43,6 +49,8 @@ const instrumentEnv = (env: Record<string, unknown>): Record<string, unknown> =>
4349
return instrumentServiceBinding(item, String(prop))
4450
} else if (isAnalyticsEngineDataset(item)) {
4551
return instrumentAnalyticsEngineDataset(item, String(prop))
52+
} else if (isDispatchNamespace(item)) {
53+
return instrumentDispatchNamespace(item, String(prop))
4654
} else {
4755
return item
4856
}

0 commit comments

Comments
 (0)