Skip to content

Commit 828babd

Browse files
authored
Jsonata: Support tracing (#68)
Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
1 parent de31c89 commit 828babd

File tree

4 files changed

+331
-9
lines changed

4 files changed

+331
-9
lines changed

transform-jsonata/README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ pnpm dev
2020
# or pnpm dev-kubevirt to inject a different example transformation
2121
```
2222

23+
### Tracing
24+
25+
```shell
26+
docker run --rm -d -p 9411:9411 openzipkin/zipkin
27+
28+
pnpm dev-zipkin
29+
```
30+
2331
## Building
2432

2533
Assuming current working directory is `transform-jsonata`

transform-jsonata/jsonata.js

Lines changed: 172 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,32 @@ const fs = require('node:fs');
55
const fsPromises = require('node:fs').promises;
66
const {buffer} = require('node:stream/consumers');
77

8+
const {
9+
trace,
10+
context,
11+
SpanStatusCode,
12+
propagation,
13+
DiagConsoleLogger,
14+
DiagLogLevel,
15+
diag
16+
} = require('@opentelemetry/api');
17+
const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node');
18+
const {Resource} = require('@opentelemetry/resources');
19+
const {
20+
ATTR_SERVICE_NAME,
21+
ATTR_SERVER_ADDRESS,
22+
ATTR_SERVER_PORT,
23+
ATTR_URL_SCHEME
24+
} = require('@opentelemetry/semantic-conventions');
25+
const {
26+
BatchSpanProcessor,
27+
ParentBasedSampler,
28+
TraceIdRatioBasedSampler, ConsoleSpanExporter
29+
} = require('@opentelemetry/sdk-trace-base');
30+
const {ZipkinExporter} = require('@opentelemetry/exporter-zipkin');
31+
const {W3CTraceContextPropagator, CompositePropagator} = require("@opentelemetry/core");
32+
const {B3InjectEncoding, B3Propagator} = require("@opentelemetry/propagator-b3");
33+
834
const port = process.env.PORT = process.env.PORT || 8080;
935
const k_sink = process.env.K_SINK || undefined;
1036
const jsonata_transform_file_name = process.env.JSONATA_TRANSFORM_FILE_NAME || undefined;
@@ -14,6 +40,8 @@ const jsonata_response_transform_file_name = process.env.JSONATA_RESPONSE_TRANSF
1440

1541
const jsonata_discard_response_body = process.env.JSONATA_DISCARD_RESPONSE_BODY === "true" || false;
1642

43+
const jsonata_config_tracing = JSON.parse(process.env.JSONATA_CONFIG_TRACING || '{}')
44+
1745
const oidc_token_file = process.env.OIDC_TOKEN_FILE || undefined
1846
if (oidc_token_file && !fs.existsSync(oidc_token_file)) {
1947
console.info(`${oidc_token_file} file doesn't exist, token will not be forwarded to K_SINK endpoint (if specified)`);
@@ -57,7 +85,98 @@ function logDebug(...inputs) {
5785
}
5886
}
5987

88+
const w3cPropagator = new W3CTraceContextPropagator();
89+
const propagator = new CompositePropagator({
90+
propagators: [
91+
w3cPropagator,
92+
new B3Propagator({
93+
injectEncoding: B3InjectEncoding.MULTI_HEADER
94+
}),
95+
new B3Propagator({
96+
injectEncoding: B3InjectEncoding.SINGLE_HEADER,
97+
})
98+
],
99+
})
100+
101+
if (process.env.NODE_ENV === "development") {
102+
// Enable OpenTelemetry debug logging
103+
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.DEBUG);
104+
}
105+
106+
logDebug(jsonata_config_tracing)
107+
108+
let exporter = undefined
109+
if ("zipkin-endpoint" in jsonata_config_tracing &&
110+
jsonata_config_tracing['zipkin-endpoint'] !== "" &&
111+
'backend' in jsonata_config_tracing &&
112+
jsonata_config_tracing.backend === 'zipkin') {
113+
114+
console.info("Using zipkin tracing exporter")
115+
exporter = new ZipkinExporter({
116+
url: jsonata_config_tracing['zipkin-endpoint'],
117+
serviceName: 'transform-jsonata',
118+
})
119+
} else {
120+
console.info("Using console tracing exporter")
121+
exporter = new ConsoleSpanExporter()
122+
}
123+
124+
const batchSpanProcessor = new BatchSpanProcessor(exporter, {
125+
maxQueueSize: 1000, // Maximum queue size. After this, spans are dropped
126+
maxExportBatchSize: 100, // Maximum batch size of every export
127+
scheduledDelayMillis: 5000, // Delay interval between two consecutive exports
128+
exportTimeoutMillis: 30000, // How long the export can run before it is cancelled
129+
});
130+
131+
let sampleRate = undefined
132+
if ('sample-rate' in jsonata_config_tracing) {
133+
sampleRate = Number.parseFloat(jsonata_config_tracing['sample-rate']);
134+
console.info(`Tracing sample rate is ${sampleRate}`)
135+
}
136+
137+
const traceProvider = new NodeTracerProvider({
138+
resource: new Resource({
139+
[ATTR_SERVICE_NAME]: 'transform-jsonata',
140+
}),
141+
spanProcessors: [batchSpanProcessor],
142+
sampler: new ParentBasedSampler({
143+
root: new TraceIdRatioBasedSampler(sampleRate ?? 0),
144+
}),
145+
});
146+
147+
traceProvider.register({propagator: propagator});
148+
149+
// Get a tracer
150+
const tracer = trace.getTracer('tracer');
151+
60152
const app = express()
153+
app.use((req, res, next) => {
154+
const parentContext = propagation.extract(context.active(), req.headers);
155+
156+
const span = tracer.startSpan(
157+
'http_request',
158+
{
159+
attributes: {
160+
'http.method': req.method,
161+
'http.url': req.url,
162+
'http.route': req.route?.path,
163+
},
164+
},
165+
parentContext
166+
);
167+
168+
// Store the span in the context
169+
return context.with(trace.setSpan(context.active(), span), () => {
170+
// Add a callback to end the span when the response is finished
171+
res.on('finish', () => {
172+
span.setAttributes({
173+
'http.status_code': res.statusCode,
174+
});
175+
span.end();
176+
});
177+
next();
178+
});
179+
});
61180
app.use(express.json());
62181
app.use(express.text());
63182
app.use(express.raw({type: '*/*'}));
@@ -75,7 +194,16 @@ app.use((req, res, next) => {
75194
next();
76195
});
77196

197+
const headerSetter = {
198+
set: (carrier, key, value) => {
199+
carrier[key] = value;
200+
}
201+
};
202+
78203
app.post("/", async (req, res) => {
204+
const processSpan = tracer.startSpan('process_request');
205+
const processSpanContext = trace.setSpan(context.active(), processSpan)
206+
79207
try {
80208
let input = null
81209
try {
@@ -119,12 +247,41 @@ app.post("/", async (req, res) => {
119247
}
120248
}
121249

122-
const response = await fetch(k_sink, {
123-
method: "POST",
124-
headers: k_sink_request_headers,
125-
body: JSON.stringify(transformed),
126-
redirect: 'error',
127-
})
250+
const k_sink_url = new URL(k_sink)
251+
const kSinkSendSpan = tracer.startSpan('k_sink_send', {
252+
attributes: {
253+
[ATTR_URL_SCHEME]: k_sink_url.protocol.endsWith(':') ? k_sink_url.protocol.substring(0, k_sink_url.protocol.length-1) : k_sink_url.protocol,
254+
[ATTR_SERVER_ADDRESS]: k_sink_url.hostname,
255+
[ATTR_SERVER_PORT]: k_sink_url.port,
256+
}
257+
}, processSpanContext);
258+
259+
const response = await context.with(
260+
trace.setSpan(context.active(), kSinkSendSpan),
261+
async () => {
262+
try {
263+
w3cPropagator.inject(context.active(), k_sink_request_headers, headerSetter)
264+
265+
const result = await fetch(k_sink, {
266+
method: "POST",
267+
headers: k_sink_request_headers,
268+
body: JSON.stringify(transformed),
269+
redirect: 'error',
270+
})
271+
kSinkSendSpan.setAttributes({
272+
'http.status_code': result.status,
273+
'http.response_content_length': result.headers.get('content-length'),
274+
});
275+
return result;
276+
} catch (error) {
277+
kSinkSendSpan.recordException(error);
278+
kSinkSendSpan.setStatus({code: SpanStatusCode.ERROR});
279+
throw error;
280+
} finally {
281+
kSinkSendSpan.end();
282+
}
283+
}
284+
);
128285

129286
if (jsonata_discard_response_body) {
130287
logDebug(`Received response from K_SINK, discarding response body and responding with ${response.status}`)
@@ -192,11 +349,15 @@ app.post("/", async (req, res) => {
192349
.send(JSON.stringify(transformed_response))
193350

194351
} catch (error) {
352+
processSpan.recordException(error)
353+
processSpan.setStatus({code: SpanStatusCode.ERROR});
195354
console.error(error);
196355
return res
197356
.header("Reason", error.toString())
198357
.status(500)
199358
.send()
359+
} finally {
360+
processSpan.end();
200361
}
201362
});
202363

@@ -256,7 +417,11 @@ function shutDownNow() {
256417

257418
server.close(() => {
258419
console.log('Closed out remaining connections');
259-
process.exit(0);
420+
421+
console.log('Shutting down tracing...');
422+
batchSpanProcessor.shutdown().then(() => {
423+
process.exit(0);
424+
});
260425
});
261426

262427
setTimeout(() => {

transform-jsonata/package.json

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,21 @@
33
"author": "Knative authors",
44
"scripts": {
55
"dev": "NODE_ENV=development OIDC_TOKEN_FILE=./examples/example-token.txt JSONATA_TRANSFORM_FILE_NAME=./examples/jsonata_transform_identity.jsonata nodemon ./jsonata.js",
6-
"dev-kubevirt": "NODE_ENV=development OIDC_TOKEN_FILE=./examples/example-token.txt JSONATA_TRANSFORM_FILE_NAME=./examples/ce_apiserversource_kubevirt.jsonata nodemon ./jsonata.js"
6+
"dev-kubevirt": "NODE_ENV=development OIDC_TOKEN_FILE=./examples/example-token.txt JSONATA_TRANSFORM_FILE_NAME=./examples/ce_apiserversource_kubevirt.jsonata nodemon ./jsonata.js",
7+
"dev-zipkin": "NODE_ENV=development JSONATA_CONFIG_TRACING='{\"backend\":\"zipkin\", \"zipkin-endpoint\": \"http://localhost:9411/api/v2/spans\", \"sample-rate\":\"1\"}' OIDC_TOKEN_FILE=./examples/example-token.txt JSONATA_TRANSFORM_FILE_NAME=./examples/jsonata_transform_identity.jsonata nodemon ./jsonata.js"
78
},
89
"dependencies": {
910
"jsonata": "^2.0.6",
1011
"cloudevents": "^8.0.2",
11-
"express": "^4.21.2"
12+
"express": "^4.21.2",
13+
"@opentelemetry/api": "1.9.0",
14+
"@opentelemetry/core": "^1.30.1",
15+
"@opentelemetry/sdk-trace-node": "^1.30.1",
16+
"@opentelemetry/resources": "^1.30.1",
17+
"@opentelemetry/semantic-conventions": "^1.30.0",
18+
"@opentelemetry/sdk-trace-base": "^1.30.1",
19+
"@opentelemetry/exporter-zipkin": "^1.30.1",
20+
"@opentelemetry/propagator-b3": "^1.30.1"
1221
},
1322
"devDependencies": {
1423
"nodemon": "^3.1.9",

0 commit comments

Comments
 (0)