Skip to content

Commit d4cffde

Browse files
committed
[o11y] Consolidate STW instrumentation test tail workers
1 parent 3755984 commit d4cffde

File tree

8 files changed

+108
-167
lines changed

8 files changed

+108
-167
lines changed

src/workerd/api/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ wd_test(
434434
data = [
435435
"kv-instrumentation-test.js",
436436
"kv-test.js",
437+
"tests/instrumentation-tail-worker.js",
437438
],
438439
)
439440

@@ -449,6 +450,7 @@ wd_test(
449450
data = [
450451
"r2-instrumentation-test.js",
451452
"r2-test.js",
453+
"tests/instrumentation-tail-worker.js",
452454
],
453455
)
454456

@@ -892,5 +894,6 @@ wd_test(
892894
data = [
893895
"actor-kv-test.js",
894896
"actor-kv-test-tail.js",
897+
"tests/instrumentation-tail-worker.js",
895898
],
896899
)
Lines changed: 10 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
import * as assert from 'node:assert';
2+
import {
3+
invocationPromises,
4+
spans,
5+
testTailHandler,
6+
} from 'test:instumentation-tail';
27

3-
let invocationPromises = [];
4-
let spans = new Map();
8+
// Use shared instrumentation test tail worker
9+
export default testTailHandler;
510

6-
export default {
11+
export const test = {
712
async test(ctrl, env, ctx) {
813
const expected = [
9-
{
10-
name: 'durable_object_storage_put',
11-
closed: true,
12-
subrequests: [{ name: 'durable_object_subrequest' }],
13-
},
14+
{ name: 'durable_object_storage_put', closed: true },
1415
{ name: 'durable_object_storage_put', closed: true },
1516
{ name: 'durable_object_storage_get', closed: true },
1617
{ name: 'durable_object_storage_get', closed: true },
@@ -22,6 +23,7 @@ export default {
2223
{ name: 'durable_object_storage_deleteAlarm', closed: true },
2324
{ name: 'durable_object_storage_transaction', closed: true },
2425
{ name: 'durable_object_storage_sync', closed: true },
26+
{ name: 'durable_object_subrequest', closed: true },
2527
];
2628

2729
await Promise.allSettled(invocationPromises);
@@ -31,49 +33,4 @@ export default {
3133
assert.deepStrictEqual(received, expected);
3234
return new Response('');
3335
},
34-
35-
tailStream(event, env, ctx) {
36-
let resolveFn;
37-
38-
invocationPromises.push(
39-
new Promise((resolve, reject) => {
40-
resolveFn = resolve;
41-
})
42-
);
43-
44-
return (event) => {
45-
switch (event.event.type) {
46-
case 'spanOpen':
47-
if (event.event.name === 'durable_object_subrequest') {
48-
let span = spans.get(event.event.spanId);
49-
span['subrequests'] = span['subrequests']
50-
? (span['subrequests'].push({ name: { ...event.name } }),
51-
span['subrequests'])
52-
: [{ name: event.event.name }];
53-
} else {
54-
// The span ids will change between tests, but Map preserves insertion order
55-
spans.set(event.event.spanId, { name: event.event.name });
56-
}
57-
break;
58-
case 'attributes': {
59-
let span = spans.get(event.event.spanId);
60-
for (let { name, value } of event.event.info) {
61-
span[name] = value;
62-
}
63-
spans.set(event.event.spanId, span);
64-
break;
65-
}
66-
case 'spanClose': {
67-
const spanId = event.spanContext.spanId;
68-
let span = spans.get(spanId);
69-
span['closed'] = true;
70-
spans.set(spanId, span);
71-
break;
72-
}
73-
case 'outcome':
74-
resolveFn();
75-
break;
76-
}
77-
};
78-
},
7936
};

src/workerd/api/actor-kv-test.wd-test

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ const config :Workerd.Config = (
66
(name = "TEST_TMPDIR", disk = (writable = true)),
77
(name = "tail", worker = .tailWorker),
88
],
9+
extensions = [ (
10+
modules = [
11+
( name = "test:instumentation-tail", esModule = embed "tests/instrumentation-tail-worker.js" ),
12+
]
13+
) ]
914
);
1015

1116
const mainWorker :Workerd.Worker = (

src/workerd/api/kv-instrumentation-test.js

Lines changed: 7 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -2,73 +2,14 @@
22
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
33
// https://opensource.org/licenses/Apache-2.0
44
import * as assert from 'node:assert';
5+
import {
6+
invocationPromises,
7+
spans,
8+
testTailHandler,
9+
} from 'test:instumentation-tail';
510

6-
// tailStream is going to be invoked multiple times, but we want to wait
7-
// to run the test until all executions are done. Collect promises for
8-
// each
9-
let invocationPromises = [];
10-
let spans = new Map();
11-
12-
export default {
13-
tailStream(event, env, ctx) {
14-
// For each "onset" event, store a promise which we will resolve when
15-
// we receive the equivalent "outcome" event
16-
let resolveFn;
17-
invocationPromises.push(
18-
new Promise((resolve, reject) => {
19-
resolveFn = resolve;
20-
})
21-
);
22-
23-
// Capture the top-level span ID from the onset event
24-
const topLevelSpanId = event.event.spanId;
25-
26-
// Accumulate the span info for easier testing
27-
return (event) => {
28-
// For spanOpen events, the new span ID is in event.event.spanId
29-
// For other events, they reference an existing span via event.spanContext.spanId
30-
let spanKey = event.invocationId + event.spanContext.spanId;
31-
switch (event.event.type) {
32-
case 'spanOpen':
33-
// spanOpen creates a new span with ID in event.event.spanId
34-
spanKey = event.invocationId + event.event.spanId;
35-
spans.set(spanKey, {
36-
name: event.event.name,
37-
});
38-
break;
39-
case 'attributes': {
40-
// Filter out top-level attributes events (jsRpcSession span)
41-
if (topLevelSpanId && event.spanContext.spanId === topLevelSpanId) {
42-
// Ignore attributes for the top-level span
43-
break;
44-
}
45-
46-
// attributes references an existing span via spanContext.spanId
47-
let span = spans.get(spanKey);
48-
if (!span) {
49-
throw new Error(`Attributes event for unknown span: ${spanKey}`);
50-
}
51-
for (let { name, value } of event.event.info) {
52-
span[name] = value;
53-
}
54-
break;
55-
}
56-
case 'spanClose': {
57-
// spanClose references an existing span via spanContext.spanId
58-
let span = spans.get(spanKey);
59-
if (!span) {
60-
throw new Error(`SpanClose event for unknown span: ${spanKey}`);
61-
}
62-
span['closed'] = true;
63-
break;
64-
}
65-
case 'outcome':
66-
resolveFn();
67-
break;
68-
}
69-
};
70-
},
71-
};
11+
// Use shared instrumentation test tail worker
12+
export default testTailHandler;
7213

7314
export const test = {
7415
async test() {

src/workerd/api/kv-test.wd-test

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ const unitTests :Workerd.Config = (
1616
),
1717
(name = "tail", worker = .tailWorker, ),
1818
],
19+
extensions = [ (
20+
modules = [
21+
( name = "test:instumentation-tail", esModule = embed "tests/instrumentation-tail-worker.js" ),
22+
]
23+
) ]
1924
);
2025

2126
const tailWorker :Workerd.Worker = (

src/workerd/api/r2-instrumentation-test.js

Lines changed: 7 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,55 +2,14 @@
22
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
33
// https://opensource.org/licenses/Apache-2.0
44
import * as assert from 'node:assert';
5+
import {
6+
invocationPromises,
7+
spans,
8+
testTailHandler,
9+
} from 'test:instumentation-tail';
510

6-
// tailStream is going to be invoked multiple times, but we want to wait
7-
// to run the test until all executions are done. Collect promises for
8-
// each
9-
let invocationPromises = [];
10-
let spans = new Map();
11-
12-
export default {
13-
tailStream(event, env, ctx) {
14-
// For each "onset" event, store a promise which we will resolve when
15-
// we receive the equivalent "outcome" event
16-
let resolveFn;
17-
invocationPromises.push(
18-
new Promise((resolve, reject) => {
19-
resolveFn = resolve;
20-
})
21-
);
22-
23-
// Accumulate the span info for easier testing
24-
return (event) => {
25-
// span ids are simple counters for tests, but invocation ID allows us to differentiate them
26-
let spanKey = event.invocationId + event.spanContext.spanId;
27-
switch (event.event.type) {
28-
case 'spanOpen':
29-
spans.set(event.invocationId + event.event.spanId, {
30-
name: event.event.name,
31-
});
32-
break;
33-
case 'attributes': {
34-
let span = spans.get(spanKey);
35-
for (let { name, value } of event.event.info) {
36-
span[name] = value;
37-
}
38-
spans.set(spanKey, span);
39-
break;
40-
}
41-
case 'spanClose': {
42-
let span = spans.get(spanKey);
43-
span['closed'] = true;
44-
spans.set(spanKey, span);
45-
break;
46-
}
47-
case 'outcome':
48-
resolveFn();
49-
break;
50-
}
51-
};
52-
},
53-
};
11+
// Use shared instrumentation test tail worker
12+
export default testTailHandler;
5413

5514
export const test = {
5615
async test() {

src/workerd/api/r2-test.wd-test

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ const unitTests :Workerd.Config = (
1818
),
1919
(name = "tail", worker = .tailWorker, ),
2020
],
21+
extensions = [ (
22+
modules = [
23+
( name = "test:instumentation-tail", esModule = embed "tests/instrumentation-tail-worker.js" ),
24+
]
25+
) ]
2126
);
2227

2328
const tailWorker :Workerd.Worker = (
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
// tailStream is going to be invoked multiple times, but we want to wait
2+
// to run the test until all executions are done. Collect promises for
3+
// each
4+
export let spans = new Map();
5+
export let invocationPromises = [];
6+
7+
// tail stream handler function used in several STW instrumentation tests.
8+
export const testTailHandler = {
9+
tailStream(event, env, ctx) {
10+
// Capture the top-level span ID from the onset event
11+
const topLevelSpanId = event.event.spanId;
12+
13+
// For each "onset" event, store a promise which we will resolve when
14+
// we receive the equivalent "outcome" event
15+
let resolveFn;
16+
invocationPromises.push(
17+
new Promise((resolve, reject) => {
18+
resolveFn = resolve;
19+
})
20+
);
21+
22+
return (event) => {
23+
// For spanOpen events, the new span ID is in event.event.spanId
24+
// For other events, they reference an existing span via event.spanContext.spanId
25+
let spanKey = event.invocationId + event.spanContext.spanId;
26+
switch (event.event.type) {
27+
case 'spanOpen':
28+
// spanOpen creates a new span with ID in event.event.spanId
29+
spanKey = event.invocationId + event.event.spanId;
30+
spans.set(spanKey, {
31+
name: event.event.name,
32+
});
33+
break;
34+
case 'attributes': {
35+
// Filter out top-level attributes events (jsRpcSession span)
36+
if (event.spanContext.spanId === topLevelSpanId) {
37+
// Ignore attributes for the top-level span
38+
break;
39+
}
40+
41+
// attributes references an existing span via spanContext.spanId
42+
let span = spans.get(spanKey);
43+
if (!span) {
44+
throw new Error(`Attributes event for unknown span: ${spanKey}`);
45+
}
46+
for (let { name, value } of event.event.info) {
47+
span[name] = value;
48+
}
49+
break;
50+
}
51+
case 'spanClose': {
52+
// spanClose references an existing span via spanContext.spanId
53+
let span = spans.get(spanKey);
54+
if (!span) {
55+
throw new Error(`SpanClose event for unknown span: ${spanKey}`);
56+
}
57+
span['closed'] = true;
58+
break;
59+
}
60+
case 'outcome':
61+
resolveFn();
62+
break;
63+
}
64+
};
65+
},
66+
};

0 commit comments

Comments
 (0)