Skip to content

Commit 0fdf2c0

Browse files
committed
wip
1 parent c650254 commit 0fdf2c0

File tree

5 files changed

+260
-19
lines changed

5 files changed

+260
-19
lines changed

configs/gateway.config.ts

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
/**
2+
* Hive Gateway Config with a Hive Console Specific Span Processor.
3+
*
4+
* ```sh
5+
* docker run --name hive-gateway --rm -p 4000:4000 \
6+
* -v $(pwd)/configs/gateway.config.ts:/gateway/gateway.config.ts \
7+
* -e HIVE_ORGANIZATION_ACCESS_TOKEN="<organization_access_key>" \
8+
* -e HIVE_TARGET_REF="<target_ref>" \
9+
* -e DEBUG=1 \
10+
* ghcr.io/graphql-hive/gateway supergraph \
11+
* "http://host.docker.internal:3001/artifacts/v1/<target_id>" \
12+
* --hive-cdn-key '<cdn_key>'
13+
* ```
14+
*/
15+
import { createOtlpHttpExporter, defineConfig, GatewayPlugin } from '@graphql-hive/gateway';
16+
import type { MeshFetchRequestInit } from '@graphql-mesh/types';
17+
import { Span, SpanProcessor } from '@opentelemetry/sdk-trace-base';
18+
19+
/**
 * Gateway plugin used by this configuration to observe upstream traffic and to
 * surface GraphQL error codes on the active tracing span.
 *
 * - `onFetch` records the URL and request headers of every upstream fetch.
 * - `onRequest` serves the recorded calls as JSON on `GET /upstream-fetch`.
 * - `onExecute` writes the distinct, non-empty string `extensions.code` values
 *   of the execution result's errors to the span attribute
 *   `hive.graphql.error.codes` as a comma-separated list.
 *
 * NOTE: the recorded list is only ever appended to, so it grows for the
 * lifetime of the process, and the headers are returned exactly as recorded.
 * NOTE(review): presumably acceptable because this config targets local
 * testing; confirm before using it anywhere long-running or exposed.
 */
const useOnFetchTracer = (): GatewayPlugin => {
  const upstreamCallHeaders: Array<{
    url: string;
    headers: MeshFetchRequestInit['headers'];
  }> = [];

  // Duck-typed check: anything exposing a `setAttribute` function is treated as a span.
  const isSpanLike = (value: unknown): value is Span =>
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { setAttribute?: unknown }).setAttribute === 'function';

  return {
    onFetch({ url, options }) {
      upstreamCallHeaders.push({ url, headers: options.headers });
    },
    onRequest({ request, url, endResponse, fetchAPI }) {
      if (url.pathname === '/upstream-fetch' && request.method === 'GET') {
        endResponse(fetchAPI.Response.json(upstreamCallHeaders));
        return;
      }
    },
    onExecute({ context }) {
      // The active context's value map is read directly. It may be missing
      // entirely, or may hold entries that are not spans, so look for the
      // first span-like value instead of trusting the first entry blindly.
      const contextValues: Iterable<unknown> =
        context.opentelemetry?.activeContext?.()?._currentContext?.values?.() ?? [];

      let span: Span | undefined;
      for (const candidate of contextValues) {
        if (isSpanLike(candidate)) {
          span = candidate;
          break;
        }
      }

      if (!span) {
        return;
      }
      const activeSpan = span;

      return {
        onExecuteDone(ctx) {
          // A Set de-duplicates codes while keeping first-seen order.
          const errors = new Set<string>();
          for (const err of ctx.result?.errors ?? []) {
            const code = err.extensions?.code;
            if (typeof code === 'string' && code) {
              errors.add(code);
            }
          }
          if (errors.size) {
            activeSpan.setAttribute('hive.graphql.error.codes', Array.from(errors).join(','));
          }
        },
      };
    },
  };
};
63+
64+
/**
 * Span processor that copies GraphQL details onto the root span of each trace.
 *
 * While a trace is in flight the processor tracks its open spans, the id of
 * its root span (the span started without a parent) and the subgraph names
 * seen on `subgraph.execute*` spans. When a `graphql.execute` span ends, the
 * still-open root span is renamed to `<type> <name>` (or just `<type>` for
 * anonymous operations) and receives the `hive.graphql.*` and
 * `hive.subgraph.names` attributes. Per-trace state is dropped when the root
 * span ends or when no tracked span remains for the trace.
 */
class HiveTracingSpanProcessor implements SpanProcessor {
  /** Open spans, keyed by trace id and then by span id. */
  private activeSpans: Map<string, Map<string, Span>> = new Map();
  /** Span id of the root span of each tracked trace. */
  private rootSpanIds: Map<string, string> = new Map();
  /** Distinct subgraph names collected for each tracked trace. */
  private subgraphNames: Map<string, Set<string>> = new Map();

  /** Starts tracking `span` and records root / subgraph information it carries. */
  onStart(span: Span): void {
    const { traceId, spanId } = span.spanContext();

    let spansForTrace = this.activeSpans.get(traceId);
    if (!spansForTrace) {
      spansForTrace = new Map();
      this.activeSpans.set(traceId, spansForTrace);
    }
    spansForTrace.set(spanId, span);

    let namesForTrace = this.subgraphNames.get(traceId);
    if (!namesForTrace) {
      namesForTrace = new Set();
      this.subgraphNames.set(traceId, namesForTrace);
    }

    // A span without a parent is the root span of its trace.
    if (!span.parentSpanId) {
      this.rootSpanIds.set(traceId, spanId);
    }

    const subgraphName = this.subgraphNameOf(span);
    if (subgraphName) {
      namesForTrace.add(subgraphName);
    }
  }

  /**
   * Enriches the root span from the ending span (when applicable) and then
   * stops tracking the ending span. Spans of unknown traces are ignored.
   */
  onEnd(span: Span): void {
    const { traceId, spanId } = span.spanContext();

    const spansForTrace = this.activeSpans.get(traceId);
    if (!spansForTrace) {
      return;
    }

    const rootSpanId = this.rootSpanIds.get(traceId);
    const subgraphNamesForTrace = this.subgraphNames.get(traceId);

    // Only a root span that is still tracked and not yet ended can be modified.
    const trackedRoot = rootSpanId ? spansForTrace.get(rootSpanId) : undefined;
    const liveRoot = trackedRoot && !trackedRoot.ended ? trackedRoot : undefined;

    // TODO: can we have this fully type safe?
    if (span.name === 'graphql.execute' && liveRoot) {
      const operationType = span.attributes['graphql.operation.type'];
      const operationName = span.attributes['graphql.operation.name'];
      const errorCount = span.attributes['graphql.error.count'];
      const document = span.attributes['graphql.document'];

      if (operationType && document) {
        // Anonymous operations have no name; avoid producing "query undefined".
        liveRoot.updateName(
          operationName ? `${operationType} ${operationName}` : `${operationType}`,
        );

        liveRoot.setAttribute('hive.graphql.operation.type', operationType);
        liveRoot.setAttribute('hive.graphql.operation.name', operationName ?? '');
        liveRoot.setAttribute('hive.graphql.operation.document', document);

        if (errorCount !== undefined) {
          liveRoot.setAttribute('hive.graphql.error.count', errorCount);
        }

        if (subgraphNamesForTrace && subgraphNamesForTrace.size > 0) {
          this.writeSubgraphNames(liveRoot, subgraphNamesForTrace);
        }
      }
    }

    // The subgraph name is re-read here because it is only captured at start
    // when the attribute was already present on the span at that point.
    const endedSubgraphName = this.subgraphNameOf(span);
    if (endedSubgraphName && subgraphNamesForTrace) {
      subgraphNamesForTrace.add(endedSubgraphName);
      if (liveRoot) {
        this.writeSubgraphNames(liveRoot, subgraphNamesForTrace);
      }
    }

    spansForTrace.delete(spanId);

    // Drop all per-trace state once the root span ends or nothing is left to track.
    if (rootSpanId === spanId || spansForTrace.size === 0) {
      this.activeSpans.delete(traceId);
      this.rootSpanIds.delete(traceId);
      this.subgraphNames.delete(traceId);
      if (process.env.DEBUG === '1') {
        console.log('span attributes', span.attributes);
      }
    }
  }

  /**
   * Nothing is buffered for export by this processor, so there is nothing to
   * flush. Tracking state is deliberately kept: discarding it here would stop
   * traces that are still in flight from having their root span enriched.
   */
  async forceFlush(): Promise<void> {}

  /** Releases all tracking state when the processor is shut down. */
  async shutdown(): Promise<void> {
    this.activeSpans.clear();
    this.rootSpanIds.clear();
    this.subgraphNames.clear();
  }

  /** Returns the subgraph name of a `subgraph.execute*` span, if it carries a non-empty string one. */
  private subgraphNameOf(span: Span): string | undefined {
    if (!span.name || !span.name.startsWith('subgraph.execute')) {
      return undefined;
    }
    const subgraphName = span.attributes['gateway.upstream.subgraph.name'];
    return typeof subgraphName === 'string' && subgraphName ? subgraphName : undefined;
  }

  /** Writes the collected subgraph names to `rootSpan` as a comma-separated list. */
  private writeSubgraphNames(rootSpan: Span, names: Set<string>): void {
    rootSpan.setAttribute('hive.subgraph.names', Array.from(names).join(','));
  }
}
192+
/**
 * Async factory for the Hive span processor.
 *
 * @returns a promise resolving to a fresh `HiveTracingSpanProcessor`.
 */
async function createHiveTracingSpanProcessor(): Promise<HiveTracingSpanProcessor> {
  const processor = new HiveTracingSpanProcessor();
  return processor;
}
195+
196+
// Root-span enrichment for Hive Console; created first so the evaluation order
// of the two exporter entries matches their order in the `exporters` list.
const hiveSpanProcessor = createHiveTracingSpanProcessor();

// OTLP/HTTP exporter; the access token and target reference are read from the
// environment when this module is evaluated.
const otlpExporter = createOtlpHttpExporter(
  {
    url: 'http://host.docker.internal:4318/v1/traces',
    headers: {
      Authorization: `Bearer ${process.env.HIVE_ORGANIZATION_ACCESS_TOKEN}`,
      'X-Hive-Target-Ref': process.env.HIVE_TARGET_REF,
    },
  },
  // Batching config is set in order to make it easier to test.
  { scheduledDelayMillis: 1 },
);

/**
 * Hive Gateway configuration: OpenTelemetry tracing under the service name
 * `hive-gateway`, plus the fetch-tracing plugin.
 */
export const gatewayConfig = defineConfig({
  openTelemetry: {
    exporters: [hiveSpanProcessor, otlpExporter],
    serviceName: 'hive-gateway',
  },
  plugins: () => [useOnFetchTracer()],
});

docker/configs/otel-collector/builder-config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ processors:
1212
- gomod:
1313
github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor
1414
v0.122.0
15+
- gomod:
16+
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.122.0
1517

1618
exporters:
1719
- gomod: go.opentelemetry.io/collector/exporter/debugexporter v0.122.0

docker/configs/otel-collector/config.yaml

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,23 @@ processors:
2626
- key: hive.target_id
2727
from_context: auth.targetId
2828
action: insert
29+
filter/drop_missing_attributes:
30+
error_mode: ignore
31+
traces:
32+
span:
33+
# prettier-ignore
34+
- not (
35+
attributes["http.status"] != nil and
36+
attributes["http.host"] != nil and
37+
attributes["http.method"] != nil and
38+
attributes["http.route"] != nil and
39+
attributes["http.url"] != nil and
40+
(
41+
attributes["hive.graphql.operation.type"] == "query" or
42+
attributes["hive.graphql.operation.type"] == "mutation" or
43+
attributes["hive.graphql.operation.type"] == "subscription"
44+
)
45+
)
2946
exporters:
3047
# debug:
3148
# verbosity: detailed
@@ -61,5 +78,9 @@ service:
6178
pipelines:
6279
traces:
6380
receivers: [otlp]
64-
processors: [attributes, batch]
81+
processors: [
82+
attributes,
83+
# filter/drop_missing_attributes,
84+
batch,
85+
]
6586
exporters: [clickhouse]

packages/migrations/src/clickhouse-actions/015-otel-trace.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ export const action: Action = async exec => {
166166
, "SpanAttributes"['hive.graphql.operation.name'] AS "graphql_operation_name"
167167
, toLowCardinality("SpanAttributes"['hive.graphql.operation.type']) AS "graphql_operation_type"
168168
, "SpanAttributes"['hive.graphql.operation.document'] AS "graphql_operation_document"
169-
, "SpanAttributes"['hive.graphql.error.count'] AS "graphql_errors_count"
169+
, "SpanAttributes"['hive.graphql.error.count'] AS "graphql_error_count"
170170
, arrayMap(x -> toLowCardinality(x), splitByChar(',', "SpanAttributes"['hive.graphql.error.codes'])) AS "graphql_error_codes"
171171
, arrayMap(x -> toLowCardinality(x), splitByChar(',', "SpanAttributes"['hive.subgraph.names'])) AS "subgraph_names"
172172
FROM

packages/services/api/src/modules/operations/resolvers/Target.ts

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -152,22 +152,25 @@ export const Target: Pick<
152152
}>({
153153
query: sql`
154154
SELECT
155-
target_id,
156-
trace_id,
157-
span_id,
158-
timestamp,
159-
graphql_operation_name as operation_name,
160-
graphql_operation_type as operation_type,
161-
duration,
162-
subgraph_names,
163-
http_status_code,
164-
http_method,
165-
http_host,
166-
http_route,
167-
http_url
168-
FROM otel_traces_normalized
155+
"target_id"
156+
, "trace_id"
157+
, "span_id"
158+
, "timestamp"
159+
, "http_status_code"
160+
, "http_method"
161+
, "http_host"
162+
, "http_route"
163+
, "http_url"
164+
, "duration"
165+
, "graphql_operation_name" AS "operation_name"
166+
, upper("graphql_operation_type") AS "operation_type"
167+
, "subgraph_names"
168+
FROM
169+
"otel_traces_normalized"
169170
WHERE ${sql.join(ANDs, ' AND ')}
170-
ORDER BY timestamp DESC, trace_id DESC
171+
ORDER BY
172+
"timestamp" DESC
173+
, "trace_id" DESC
171174
LIMIT ${sql.raw(String(limit))}
172175
`,
173176
queryId: 'traces',
@@ -177,8 +180,6 @@ export const Target: Pick<
177180
const traces = tracesQuery.data;
178181
let hasNext = false;
179182

180-
console.log('AYAYAYAY', traces.length);
181-
182183
if (traces.length == limit) {
183184
hasNext = true;
184185
(traces as any).pop();

0 commit comments

Comments
 (0)